boto3를 사용하여 버킷 내용 나열
S3의 버킷 내부를 확인하려면boto3
? (즉, 의"ls"
)?
다음을 수행합니다.
import boto3
s3 = boto3.resource('s3')
my_bucket = s3.Bucket('some/path/')
반환:
s3.Bucket(name='some/path/')
내용물을 어떻게 볼 수 있습니까?
내용을 표시하는 한 가지 방법은 다음과 같습니다.
for my_bucket_object in my_bucket.objects.all():
print(my_bucket_object)
이는 'ls'와 비슷하지만 접두사 폴더 규칙은 고려하지 않으며 버킷에 개체가 나열됩니다.키 이름의 일부인 접두사를 필터링하는 것은 독자에게 달려 있습니다.
Python 2의 경우:
from boto.s3.connection import S3Connection
conn = S3Connection() # assumes boto.cfg setup
bucket = conn.get_bucket('bucket_name')
for obj in bucket.get_all_keys():
print(obj.key)
Python 3의 경우:
from boto3 import client
conn = client('s3') # again assumes boto.cfg setup, assume AWS S3
for key in conn.list_objects(Bucket='bucket_name')['Contents']:
print(key['Key'])
인증은 별도로 설정했을 것으로 생각됩니다.
import boto3
s3 = boto3.resource('s3')
my_bucket = s3.Bucket('bucket_name')
for file in my_bucket.objects.all():
print(file.key)
s3 유틸리티 기능은 기본적으로 @Hephaestus의 답변에 최적화된 버전입니다.
import boto3
s3_paginator = boto3.client('s3').get_paginator('list_objects_v2')
def keys(bucket_name, prefix='/', delimiter='/', start_after=''):
prefix = prefix[1:] if prefix.startswith(delimiter) else prefix
start_after = (start_after or prefix) if prefix.endswith(delimiter) else start_after
for page in s3_paginator.paginate(Bucket=bucket_name, Prefix=prefix, StartAfter=start_after):
for content in page.get('Contents', ()):
yield content['Key']
테스트(boto3 1.9.84)에서는 동등한(단순한) 코드보다 훨씬 빠릅니다.
import boto3
def keys(bucket_name, prefix='/', delimiter='/'):
prefix = prefix[1:] if prefix.startswith(delimiter) else prefix
bucket = boto3.resource('s3').Bucket(bucket_name)
return (_.key for _ in bucket.objects.filter(Prefix=prefix))
S3는 UTF-8 바이너리 정렬 결과를 보증하므로start_after
최적화가 첫 번째 기능에 추가되었습니다.
큰 키 리스트(디렉토리 리스트가 1000을 넘는 경우)를 처리하기 위해서, 다음의 코드를 사용해 복수의 리스트가 있는 키 값(파일명)을 축적했습니다(위의 아멜리오에 의한 첫 번째 행).코드는 python3:
from boto3 import client
bucket_name = "my_bucket"
prefix = "my_key/sub_key/lots_o_files"
s3_conn = client('s3') # type: BaseClient ## again assumes boto.cfg setup, assume AWS S3
s3_result = s3_conn.list_objects_v2(Bucket=bucket_name, Prefix=prefix, Delimiter = "/")
if 'Contents' not in s3_result:
#print(s3_result)
return []
file_list = []
for key in s3_result['Contents']:
file_list.append(key['Key'])
print(f"List count = {len(file_list)}")
while s3_result['IsTruncated']:
continuation_key = s3_result['NextContinuationToken']
s3_result = s3_conn.list_objects_v2(Bucket=bucket_name, Prefix=prefix, Delimiter="/", ContinuationToken=continuation_key)
for key in s3_result['Contents']:
file_list.append(key['Key'])
print(f"List count = {len(file_list)}")
return file_list
ACCESS 키와 SECRET 키를 건네주는 경우(시큐어하지 않기 때문에, 사용하지 말아 주세요).
from boto3.session import Session
ACCESS_KEY='your_access_key'
SECRET_KEY='your_secret_key'
session = Session(aws_access_key_id=ACCESS_KEY,
aws_secret_access_key=SECRET_KEY)
s3 = session.resource('s3')
your_bucket = s3.Bucket('your_bucket')
for s3_file in your_bucket.objects.all():
print(s3_file.key)
#To print all filenames in a bucket
import boto3
s3 = boto3.client('s3')
def get_s3_keys(bucket):
"""Get a list of keys in an S3 bucket."""
resp = s3.list_objects_v2(Bucket=bucket)
for obj in resp['Contents']:
files = obj['Key']
return files
filename = get_s3_keys('your_bucket_name')
print(filename)
#To print all filenames in a certain directory in a bucket
import boto3
s3 = boto3.client('s3')
def get_s3_keys(bucket, prefix):
"""Get a list of keys in an S3 bucket."""
resp = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)
for obj in resp['Contents']:
files = obj['Key']
print(files)
return files
filename = get_s3_keys('your_bucket_name', 'folder_name/sub_folder_name/')
print(filename)
업데이트: 가장 쉬운 방법은 awswrangler를 사용하는 것입니다.
import awswrangler as wr
wr.s3.list_objects('s3://bucket_name')
import boto3
s3 = boto3.resource('s3')
## Bucket to use
my_bucket = s3.Bucket('city-bucket')
## List objects within a given prefix
for obj in my_bucket.objects.filter(Delimiter='/', Prefix='city/'):
print obj.key
출력:
city/pune.csv
city/goa.csv
보다 절약적인 방법으로 for 루프를 통해 반복하는 것이 아니라 S3 버킷 내의 모든 파일을 포함하는 원본 개체를 인쇄할 수도 있습니다.
session = Session(aws_access_key_id=aws_access_key_id,aws_secret_access_key=aws_secret_access_key)
s3 = session.resource('s3')
bucket = s3.Bucket('bucket_name')
files_in_s3 = bucket.objects.all()
#you can print this iterable with print(list(files_in_s3))
오브젝트 요약:
ObjectSummary에는 다음 2개의 식별자가 부가되어 있습니다.
- bucket_name
- 열쇠
개체 키에 대한 자세한 내용은 AWS S3 설명서를 참조하십시오.
오브젝트 키:
개체를 생성할 때 버킷 내의 개체를 고유하게 식별하는 키 이름을 지정합니다.예를 들어 Amazon S3 콘솔(AWS 관리 콘솔 참조)에서 버킷을 강조 표시하면 버킷의 개체 목록이 나타납니다.이러한 이름이 개체 키입니다.키의 이름은 UTF-8 인코딩 길이가 최대 1024바이트인 Unicode 문자 시퀀스입니다.
Amazon S3 데이터 모델은 평평한 구조입니다. 즉, 버킷을 생성하고 버킷에 개체가 저장됩니다.하위 버킷이나 하위 폴더의 계층은 없지만 Amazon S3 콘솔처럼 키 이름 접두사와 구분 기호를 사용하여 논리적 계층을 추론할 수 있습니다.Amazon S3 콘솔은 폴더 개념을 지원합니다.버킷(admin-created)에 다음 개체 키를 가진 개체가 4개 있다고 가정합니다.
개발/프로젝트1xls
재무 / 스테이트먼트 1.pdf
개인/세금 문서.pdf
s3-dg.pdf
레퍼런스:
다음은 버킷 이름과 개체 키를 가져오는 방법을 보여 주는 코드 예입니다.
예를 들어:
import boto3
from pprint import pprint
def main():
def enumerate_s3():
s3 = boto3.resource('s3')
for bucket in s3.buckets.all():
print("Name: {}".format(bucket.name))
print("Creation Date: {}".format(bucket.creation_date))
for object in bucket.objects.all():
print("Object: {}".format(object))
print("Object bucket_name: {}".format(object.bucket_name))
print("Object key: {}".format(object.key))
enumerate_s3()
if __name__ == '__main__':
main()
'아예'에 하는 것을 하시는 건가요?aws s3 ls
boto3에서.그러면 모든 최상위 폴더와 파일이 나열됩니다.이 폴더는 최상위 폴더만 나열되므로 가장 가까운 폴더입니다.그렇게 간단한 수술이 얼마나 어려운지 놀랍다.
import boto3
def s3_ls():
s3 = boto3.resource('s3')
bucket = s3.Bucket('example-bucket')
result = bucket.meta.client.list_objects(Bucket=bucket.name,
Delimiter='/')
for o in result.get('CommonPrefixes'):
print(o.get('Prefix'))
내가 이걸 하는데 사용한 방법 중 하나는:
import boto3
s3 = boto3.resource('s3')
bucket=s3.Bucket("bucket_name")
contents = [_.key for _ in bucket.objects.all() if "subfolders/ifany/" in _.key]
모든 파일의 파일 이름 또는 'json', 'jpg'와 같은 특정 유형의 파일 이름을 반환하는 간단한 함수가 있습니다.
def get_file_list_s3(bucket, prefix="", file_extension=None):
"""Return the list of all file paths (prefix + file name) with certain type or all
Parameters
----------
bucket: str
The name of the bucket. For example, if your bucket is "s3://my_bucket" then it should be "my_bucket"
prefix: str
The full path to the the 'folder' of the files (objects). For example, if your files are in
s3://my_bucket/recipes/deserts then it should be "recipes/deserts". Default : ""
file_extension: str
The type of the files. If you want all, just leave it None. If you only want "json" files then it
should be "json". Default: None
Return
------
file_names: list
The list of file names including the prefix
"""
import boto3
s3 = boto3.resource('s3')
my_bucket = s3.Bucket(bucket)
file_objs = my_bucket.objects.filter(Prefix=prefix).all()
file_names = [file_obj.key for file_obj in file_objs if file_extension is not None and file_obj.key.split(".")[-1] == file_extension]
return file_names
인증 방식을 포함하여 다음과 같이 했습니다.
s3_client = boto3.client(
's3',
aws_access_key_id='access_key',
aws_secret_access_key='access_key_secret',
config=boto3.session.Config(signature_version='s3v4'),
region_name='region'
)
response = s3_client.list_objects(Bucket='bucket_name', Prefix=key)
if ('Contents' in response):
# Object / key exists!
return True
else:
# Object / key DOES NOT exist!
return False
여기 해결책이 있습니다.
import boto3
s3=boto3.resource('s3')
BUCKET_NAME = 'Your S3 Bucket Name'
allFiles = s3.Bucket(BUCKET_NAME).objects.all()
for file in allFiles:
print(file.key)
위의 코멘트 중 하나에서 @Hephaeastus의 코드를 거의 수정하지 않고 다음 메서드를 작성하여 폴더와 오브젝트(파일)를 지정된 경로에 나열했습니다.s3 ls 명령어와 같이 동작합니다.
from boto3 import session
def s3_ls(profile=None, bucket_name=None, folder_path=None):
folders=[]
files=[]
result=dict()
bucket_name = bucket_name
prefix= folder_path
session = boto3.Session(profile_name=profile)
s3_conn = session.client('s3')
s3_result = s3_conn.list_objects_v2(Bucket=bucket_name, Delimiter = "/", Prefix=prefix)
if 'Contents' not in s3_result and 'CommonPrefixes' not in s3_result:
return []
if s3_result.get('CommonPrefixes'):
for folder in s3_result['CommonPrefixes']:
folders.append(folder.get('Prefix'))
if s3_result.get('Contents'):
for key in s3_result['Contents']:
files.append(key['Key'])
while s3_result['IsTruncated']:
continuation_key = s3_result['NextContinuationToken']
s3_result = s3_conn.list_objects_v2(Bucket=bucket_name, Delimiter="/", ContinuationToken=continuation_key, Prefix=prefix)
if s3_result.get('CommonPrefixes'):
for folder in s3_result['CommonPrefixes']:
folders.append(folder.get('Prefix'))
if s3_result.get('Contents'):
for key in s3_result['Contents']:
files.append(key['Key'])
if folders:
result['folders']=sorted(folders)
if files:
result['files']=sorted(files)
return result
그러면 지정된 경로의 모든 개체/폴더가 나열됩니다.Folder_path는 기본적으로 None으로 둘 수 있으며 메서드는 버킷 루트의 즉시 내용을 나열합니다.
다음과 같이 할 수도 있습니다.
csv_files = s3.list_objects_v2(s3_bucket_path)
for obj in csv_files['Contents']:
key = obj['Key']
사용법
cloudpathlib
, 즉 간이 를 사용할 수 .pathlib
AWS S3('Azure blob', GCS) API.수 있습니다.pip install "cloudpathlib[s3]"
.
또는 를 사용하여 디렉토리의 내용을 나열할 수 있습니다.
다음은 복사하여 실행할 수 있는 공용 AWS S3 버킷의 예입니다.
from cloudpathlib import CloudPath
s3_path = CloudPath("s3://ladi/Images/FEMA_CAP/2020/70349")
# list items with glob
list(
s3_path.glob("*")
)[:3]
#> [ S3Path('s3://ladi/Images/FEMA_CAP/2020/70349/DSC_0001_5a63d42e-27c6-448a-84f1-bfc632125b8e.jpg'),
#> S3Path('s3://ladi/Images/FEMA_CAP/2020/70349/DSC_0002_a89f1b79-786f-4dac-9dcc-609fb1a977b1.jpg'),
#> S3Path('s3://ladi/Images/FEMA_CAP/2020/70349/DSC_0003_02c30af6-911e-4e01-8c24-7644da2b8672.jpg')]
# list items with iterdir
list(
s3_path.iterdir()
)[:3]
#> [ S3Path('s3://ladi/Images/FEMA_CAP/2020/70349/DSC_0001_5a63d42e-27c6-448a-84f1-bfc632125b8e.jpg'),
#> S3Path('s3://ladi/Images/FEMA_CAP/2020/70349/DSC_0002_a89f1b79-786f-4dac-9dcc-609fb1a977b1.jpg'),
#> S3Path('s3://ladi/Images/FEMA_CAP/2020/70349/DSC_0003_02c30af6-911e-4e01-8c24-7644da2b8672.jpg')]
2021-05-21 20:38:47 PDT에 reprexlite v0.4.2에 의해 작성
lambda 함수에서 aws cli 명령을 실행하는 것도 좋은 옵션일 수 있습니다.
import subprocess
import logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def run_command(command):
command_list = command.split(' ')
try:
logger.info("Running shell command: \"{}\"".format(command))
result = subprocess.run(command_list, stdout=subprocess.PIPE);
logger.info("Command output:\n---\n{}\n---".format(result.stdout.decode('UTF-8')))
except Exception as e:
logger.error("Exception: {}".format(e))
return False
return True
def lambda_handler(event, context):
run_command('/opt/aws s3 ls s3://bucket-name')
하위 폴더 아래에 있는 파일 수를 알고 싶었을 뿐이지만 하위 폴더 자체인 컨텐츠에 있는 파일 하나를 추가로 반환했기 때문에 밤새도록 이 작업에 매달렸습니다.
조사 결과, s3는 이렇게 동작하는 것을 알 수 있었습니다만, 다음의 디렉토리에서 redshift로부터 데이터를 언로드하는 시나리오가 있었습니다.
s3://bucket_name/subfolder/<10 number of files>
사용했을 때
paginator.paginate(Bucket=price_signal_bucket_name,Prefix=new_files_folder_path+"/")
10개의 파일만 반환되지만 s3 버킷 자체에 폴더를 생성하면 하위 폴더도 반환됩니다.
결론
- 폴더 전체가 s3에 업로드된 경우 를 나열하면 접두사 아래에 있는 파일만 반환됩니다.
- 그러나 fodler가 s3 버킷 자체에 생성된 경우 boto3 클라이언트를 사용하여 나열하면 하위 폴더와 파일도 반환됩니다.
언급URL : https://stackoverflow.com/questions/30249069/listing-contents-of-a-bucket-with-boto3
'source' 카테고리의 다른 글
파일에 여러 행을 쓰려면 문자열에 새 행을 지정하려면 어떻게 해야 합니까? (0) | 2023.01.22 |
---|---|
mysql은 내부 또는 외부 명령어, 프로그램 또는 배치로 인식되지 않습니다. (0) | 2023.01.22 |
SimpleXML 개체를 배열로 변환하는 방법 (0) | 2023.01.22 |
Python: '사전'이 비어 있는지 확인하는 것이 작동하지 않는 것 같습니다. (0) | 2023.01.22 |
버튼을 클릭해서 PHP 함수를 호출하는 방법 (0) | 2023.01.22 |