1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94
|
import os import random import string import oss2
# Connection settings are read from the environment. Each default is a
# '<...>' placeholder, so an unset variable is caught by the check below.
access_key_id = os.getenv('OSS_TEST_ACCESS_KEY_ID', '<你的AccessKeyId>')
access_key_secret = os.getenv('OSS_TEST_ACCESS_KEY_SECRET', '<你的AccessKeySecret>')
bucket_name = os.getenv('OSS_TEST_BUCKET', '<你的Bucket>')
endpoint = os.getenv('OSS_TEST_ENDPOINT', '<你的访问域名>')

# Fail fast if any setting still contains the placeholder marker '<'.
# The exception is raised explicitly instead of via an `assert` statement
# because `python -O` strips asserts, which would let placeholder values
# reach the SDK. The exception type and message are the same as before.
for param in (access_key_id, access_key_secret, bucket_name, endpoint):
    if '<' in param:
        raise AssertionError('请设置参数:' + param)

# Bucket handle used by every upload/download call in the rest of the script.
bucket = oss2.Bucket(oss2.Auth(access_key_id, access_key_secret), endpoint, bucket_name)
def random_string(n):
    """Return a string of ``n`` randomly chosen lowercase ASCII letters.

    One ``random.choice`` draw is made per character, in order. A
    non-positive ``n`` yields the empty string.
    """
    alphabet = string.ascii_lowercase
    letters = []
    for _ in range(n):
        letters.append(random.choice(alphabet))
    return ''.join(letters)
# Build a scratch file in the current directory to use as the upload source:
# a random 32-letter name with a '.txt' suffix, holding 1024 * 1024 random
# lowercase letters converted to bytes by oss2.to_bytes.
filename = '{}.txt'.format(random_string(32))
content = oss2.to_bytes(random_string(1024 ** 2))

with open(filename, 'wb') as local_file:
    local_file.write(content)
""" 断点续传上传 """
# Upload the same local file twice through oss2.resumable_upload, once per
# target object key.
for object_key, extra_options in (
    # No overrides: whatever multipart threshold the SDK defaults to applies.
    ('remote-normal.txt', dict()),
    # Threshold lowered to 100 KiB, well below the size of the local file;
    # presumably this forces the multipart code path -- confirm against the
    # oss2 resumable_upload documentation.
    ('remote-multipart.txt', dict(multipart_threshold=100 * 1024)),
):
    oss2.resumable_upload(bucket, object_key, filename, **extra_options)
""" 分片上传 """
# Manual multipart upload using the low-level part APIs.

# Ask the SDK helper for a part size, stating 128 KiB as the preferred size.
total_size = os.path.getsize(filename)
part_size = oss2.determine_part_size(total_size, preferred_size=128 * 1024)

# Open an upload session; every later call is tied to this upload ID.
key = 'remote-multipart2.txt'
upload_id = bucket.init_multipart_upload(key).upload_id

try:
    with open(filename, 'rb') as fileobj:
        parts = []
        part_number = 1  # part numbers start at 1 and increase by one per part
        offset = 0
        while offset < total_size:
            # The final part may be shorter than part_size.
            size_to_upload = min(part_size, total_size - offset)
            # SizedFileAdapter wraps the shared file object together with the
            # byte count for this part, so consecutive parts continue from
            # where the previous read stopped.
            result = bucket.upload_part(
                key, upload_id, part_number,
                oss2.SizedFileAdapter(fileobj, size_to_upload))
            # Record what the server returned for this part; the full list is
            # handed to complete_multipart_upload below.
            parts.append(oss2.models.PartInfo(
                part_number, result.etag,
                size=size_to_upload, part_crc=result.crc))
            offset += size_to_upload
            part_number += 1

        bucket.complete_multipart_upload(key, upload_id, parts)
except Exception:
    # Do not leave a half-finished upload session behind: abort it so the
    # parts already sent are discarded, then surface the original error.
    bucket.abort_multipart_upload(key, upload_id)
    raise
# Download the object stored under `key` and compare it byte-for-byte with
# the local file. The local file is deleted afterwards whether or not the
# comparison succeeds.
try:
    with open(filename, 'rb') as fileobj:
        # Raised explicitly instead of via an `assert` statement: `python -O`
        # strips asserts, which would skip the download and the check.
        if bucket.get_object(key).read() != fileobj.read():
            raise AssertionError('downloaded object differs from local file: ' + key)
finally:
    os.remove(filename)
|