This is an automated email from the ASF dual-hosted git repository.

jinglun pushed a commit to branch HADOOP-19236
in repository https://gitbox.apache.org/repos/asf/hadoop.git
commit f6eab32fbfa50a7c555bda4ef1354ea1ab4e6022
Author: lijinglun <lijing...@bytedance.com>
AuthorDate: Wed Aug 21 19:54:50 2024 +0800

    Integration of TOS: Add TOS.
---
 .../org/apache/hadoop/fs/tosfs/common/Bytes.java | 62 +
 .../hadoop/fs/tosfs/object/PrefixStorage.java | 2 +-
 .../fs/tosfs/object/tos/ChainTOSInputStream.java | 137 +++
 .../fs/tosfs/object/tos/DelegationClient.java | 1231 ++++++++++++++++++++
 .../tosfs/object/tos/DelegationClientBuilder.java | 184 +++
 .../fs/tosfs/object/tos/GetObjectOutput.java | 61 +
 .../org/apache/hadoop/fs/tosfs/object/tos/TOS.java | 1014 +++++++++++++++-
 .../hadoop/fs/tosfs/object/tos/TOSErrorCodes.java | 57 +
 .../hadoop/fs/tosfs/object/tos/TOSInputStream.java | 119 ++
 .../hadoop/fs/tosfs/object/tos/TOSUtils.java | 117 ++
 .../hadoop/fs/tosfs/object/tos/TosObjectInfo.java | 79 ++
 11 files changed, 3061 insertions(+), 2 deletions(-)

diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/common/Bytes.java b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/common/Bytes.java
new file mode 100644
index 00000000000..5b8d8108640
--- /dev/null
+++ b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/common/Bytes.java
@@ -0,0 +1,62 @@
+/*
+ * ByteDance Volcengine EMR, Copyright 2022.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.common;
+
+// TODO: Remove this class?
+public class Bytes {
+  private Bytes() {
+  }
+
+  public static final byte[] EMPTY_BYTES = new byte[0];
+
+  // Encode basic Java types into big-endian binaries.
+
+  public static byte[] toBytes(boolean b) {
+    return new byte[]{b ? (byte) -1 : (byte) 0};
+  }
+
+  public static byte[] toBytes(byte b) {
+    return new byte[]{b};
+  }
+
+  public static byte[] toBytes(short val) {
+    byte[] b = new byte[2];
+    for (int i = 1; i >= 0; i--) {
+      b[i] = (byte) val;
+      val >>>= 8;
+    }
+    return b;
+  }
+
+  public static byte[] toBytes(int val) {
+    byte[] b = new byte[4];
+    for (int i = 3; i >= 0; i--) {
+      b[i] = (byte) val;
+      val >>>= 8;
+    }
+    return b;
+  }
+
+  public static byte[] toBytes(long val) {
+    byte[] b = new byte[8];
+    for (int i = 7; i >= 0; i--) {
+      b[i] = (byte) val;
+      val >>>= 8;
+    }
+    return b;
+  }
+}
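Each encoder above writes the most significant byte first, i.e. big-endian order. The commit adds no decoders; as a hedged illustration only, an inverse for the int case would shift each byte back in:

    // Hypothetical decoder (not part of this commit): inverse of Bytes.toBytes(int).
    public static int toInt(byte[] b) {
      int val = 0;
      for (int i = 0; i < 4; i++) {
        val = (val << 8) | (b[i] & 0xFF); // mask to avoid sign extension
      }
      return val;
    }
    // Round trip: toInt(Bytes.toBytes(42)) == 42.

diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/PrefixStorage.java b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/PrefixStorage.java
index e0cb1db3d75..ad020b7ea42 100644
--- a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/PrefixStorage.java
+++ b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/PrefixStorage.java
@@ -18,12 +18,12 @@
 package org.apache.hadoop.fs.tosfs.object;
 
-import com.google.common.collect.Iterables;
 import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.tosfs.object.request.ListObjectsRequest;
 import org.apache.hadoop.fs.tosfs.object.response.ListObjectsResponse;
 import org.apache.hadoop.thirdparty.com.google.common.base.Strings;
+import org.apache.hadoop.thirdparty.com.google.common.collect.Iterables;
 import org.apache.hadoop.util.Preconditions;
 
 import java.io.IOException;

diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/tos/ChainTOSInputStream.java b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/tos/ChainTOSInputStream.java
new file mode 100644
index 00000000000..be40fc6f25f
--- /dev/null
+++ b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/tos/ChainTOSInputStream.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.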
+ */
+
+package org.apache.hadoop.fs.tosfs.object.tos;
+
+import org.apache.hadoop.fs.tosfs.common.Chain;
+import org.apache.hadoop.util.Preconditions;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class ChainTOSInputStream extends InputStream {
+  private final Chain<TOSInputStream> chain;
+  private final TOS.GetObjectFactory factory;
+  private final String key;
+  private long curOff;
+  private final long endOff; // range end offset (inclusive)
+  private final long maxDrainByteSize;
+  private final int maxInputStreamRetries;
+
+  private int readBytes;
+  private long skipped;
+  private byte[] objChecksum = null;
+  private final AtomicBoolean closed = new AtomicBoolean(false);
+
+  public ChainTOSInputStream(
+      TOS.GetObjectFactory factory,
+      String key,
+      long startOff,
+      long endOff,
+      long maxDrainByteSize,
+      int maxInputStreamRetries) {
+    this.factory = factory;
+    this.key = key;
+    this.curOff = startOff;
+    this.endOff = endOff;
+    this.maxDrainByteSize = maxDrainByteSize;
+    this.maxInputStreamRetries = maxInputStreamRetries;
+    this.chain = createChain();
+    Preconditions.checkNotNull(objChecksum, "Checksum should not be null.");
+  }
+
+  private Chain<TOSInputStream> createChain() {
+    Chain.Builder<TOSInputStream> builder = Chain.<TOSInputStream>builder()
+        .shouldContinue(e -> !(e instanceof EOFException));
+
+    for (int i = 0; i <= maxInputStreamRetries; i++) {
+      builder.addLast(() -> {
+        GetObjectOutput output = factory.create(key, curOff, endOff);
+
+        // Note: if an IO error occurs, the ChainTOSInputStream creates a new stream in the
+        // chain to continue reading object data. We record the checksum when the object
+        // stream is first opened, and require that it stays the same every time the object
+        // is re-opened within the lifecycle of the chained stream, in case the underlying
+        // object has been changed.
+        if (objChecksum == null) {
+          // Init the stream checksum.
+          objChecksum = output.checksum();
+        }
+        return new TOSInputStream(output, curOff, endOff, maxDrainByteSize, objChecksum);
+      });
+    }
+
+    try {
+      return builder.build();
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  @Override
+  public long skip(long n) throws IOException {
+    skipped = 0;
+    return chain.run(stream -> {
+      long skip = stream.skip(n - skipped);
+
+      curOff += skip;
+      skipped += skip;
+      return skipped;
+    });
+  }
+
+  @Override
+  public int read() throws IOException {
+    return chain.run(stream -> {
+      int ret = stream.read();
+      if (ret != -1) {
+        // Only advance the offset for a byte actually read; advancing on EOF (-1) would
+        // corrupt the range used when the chain re-opens the object.
+        curOff++;
+      }
+      return ret;
+    });
+  }
+
+  @Override
+  public int available() throws IOException {
+    return chain.run(InputStream::available);
+  }
+
+  @Override
+  public int read(byte[] b, int off, int len) throws IOException {
+    readBytes = 0;
+    return chain.run(in -> {
+      int read = in.read(b, off + readBytes, len - readBytes);
+      if (read == -1) {
+        // EOF: report the bytes accumulated so far, or -1 if none were read. Folding the
+        // -1 into readBytes/curOff would under-count and corrupt the re-open offset.
+        return readBytes == 0 ? -1 : readBytes;
+      }
+      readBytes += read;
+      curOff += read;
+      return readBytes;
+    });
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (closed.compareAndSet(false, true)) {
+      chain.close();
+    }
+  }
+
+  public byte[] checksum() {
+    return objChecksum;
+  }
+}
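Taken together, the chain gives the caller up to maxInputStreamRetries + 1 attempts at reading a range, re-opening the object at the current offset after a retryable failure and carrying the first-open checksum so re-opens can detect a changed object. A hedged usage sketch (key and sizes are illustrative, not from this commit):

    // Illustrative only: stream a 1 KiB range with transparent re-open on IO errors.
    // Assumes a TOS.GetObjectFactory 'factory' obtained from the TOS storage layer.
    try (InputStream in = new ChainTOSInputStream(
        factory,
        "warehouse/part-00000",  // hypothetical object key
        0L,                      // startOff
        1023L,                   // endOff, inclusive
        1024 * 1024L,            // maxDrainByteSize
        3)) {                    // maxInputStreamRetries
      byte[] buf = new byte[512];
      int n;
      while ((n = in.read(buf, 0, buf.length)) != -1) {
        // consume buf[0..n)
      }
    }

diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/tos/DelegationClient.java b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/tos/DelegationClient.java
new file mode 100644
index 00000000000..884e98535da
--- /dev/null
+++ b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/tos/DelegationClient.java
@@ -0,0 +1,1231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.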
+ */ + +package org.apache.hadoop.fs.tosfs.object.tos; + +import com.volcengine.tos.TOSClientConfiguration; +import com.volcengine.tos.TOSV2ClientBuilder; +import com.volcengine.tos.TosClientException; +import com.volcengine.tos.TosException; +import com.volcengine.tos.TosServerException; +import com.volcengine.tos.auth.Credential; +import com.volcengine.tos.auth.Credentials; +import com.volcengine.tos.TOSV2; +import com.volcengine.tos.comm.HttpStatus; +import com.volcengine.tos.comm.common.ACLType; +import com.volcengine.tos.internal.RequestOptionsBuilder; +import com.volcengine.tos.model.acl.GetObjectAclOutput; +import com.volcengine.tos.model.acl.PutObjectAclInput; +import com.volcengine.tos.model.acl.PutObjectAclOutput; +import com.volcengine.tos.model.bucket.CreateBucketInput; +import com.volcengine.tos.model.bucket.CreateBucketOutput; +import com.volcengine.tos.model.bucket.CreateBucketV2Input; +import com.volcengine.tos.model.bucket.CreateBucketV2Output; +import com.volcengine.tos.model.bucket.DeleteBucketCORSInput; +import com.volcengine.tos.model.bucket.DeleteBucketCORSOutput; +import com.volcengine.tos.model.bucket.DeleteBucketCustomDomainInput; +import com.volcengine.tos.model.bucket.DeleteBucketCustomDomainOutput; +import com.volcengine.tos.model.bucket.DeleteBucketEncryptionInput; +import com.volcengine.tos.model.bucket.DeleteBucketEncryptionOutput; +import com.volcengine.tos.model.bucket.DeleteBucketInput; +import com.volcengine.tos.model.bucket.DeleteBucketInventoryInput; +import com.volcengine.tos.model.bucket.DeleteBucketInventoryOutput; +import com.volcengine.tos.model.bucket.DeleteBucketLifecycleInput; +import com.volcengine.tos.model.bucket.DeleteBucketLifecycleOutput; +import com.volcengine.tos.model.bucket.DeleteBucketMirrorBackInput; +import com.volcengine.tos.model.bucket.DeleteBucketMirrorBackOutput; +import com.volcengine.tos.model.bucket.DeleteBucketOutput; +import com.volcengine.tos.model.bucket.DeleteBucketPolicyInput; +import com.volcengine.tos.model.bucket.DeleteBucketPolicyOutput; +import com.volcengine.tos.model.bucket.DeleteBucketRealTimeLogInput; +import com.volcengine.tos.model.bucket.DeleteBucketRealTimeLogOutput; +import com.volcengine.tos.model.bucket.DeleteBucketRenameInput; +import com.volcengine.tos.model.bucket.DeleteBucketRenameOutput; +import com.volcengine.tos.model.bucket.DeleteBucketReplicationInput; +import com.volcengine.tos.model.bucket.DeleteBucketReplicationOutput; +import com.volcengine.tos.model.bucket.DeleteBucketTaggingInput; +import com.volcengine.tos.model.bucket.DeleteBucketTaggingOutput; +import com.volcengine.tos.model.bucket.DeleteBucketWebsiteInput; +import com.volcengine.tos.model.bucket.DeleteBucketWebsiteOutput; +import com.volcengine.tos.model.bucket.GetBucketACLInput; +import com.volcengine.tos.model.bucket.GetBucketACLOutput; +import com.volcengine.tos.model.bucket.GetBucketCORSInput; +import com.volcengine.tos.model.bucket.GetBucketCORSOutput; +import com.volcengine.tos.model.bucket.GetBucketEncryptionInput; +import com.volcengine.tos.model.bucket.GetBucketEncryptionOutput; +import com.volcengine.tos.model.bucket.GetBucketInventoryInput; +import com.volcengine.tos.model.bucket.GetBucketInventoryOutput; +import com.volcengine.tos.model.bucket.GetBucketLifecycleInput; +import com.volcengine.tos.model.bucket.GetBucketLifecycleOutput; +import com.volcengine.tos.model.bucket.GetBucketLocationInput; +import com.volcengine.tos.model.bucket.GetBucketLocationOutput; +import 
com.volcengine.tos.model.bucket.GetBucketMirrorBackInput; +import com.volcengine.tos.model.bucket.GetBucketMirrorBackOutput; +import com.volcengine.tos.model.bucket.GetBucketNotificationInput; +import com.volcengine.tos.model.bucket.GetBucketNotificationOutput; +import com.volcengine.tos.model.bucket.GetBucketNotificationType2Input; +import com.volcengine.tos.model.bucket.GetBucketNotificationType2Output; +import com.volcengine.tos.model.bucket.GetBucketPolicyInput; +import com.volcengine.tos.model.bucket.GetBucketPolicyOutput; +import com.volcengine.tos.model.bucket.GetBucketRealTimeLogInput; +import com.volcengine.tos.model.bucket.GetBucketRealTimeLogOutput; +import com.volcengine.tos.model.bucket.GetBucketRenameInput; +import com.volcengine.tos.model.bucket.GetBucketRenameOutput; +import com.volcengine.tos.model.bucket.GetBucketReplicationInput; +import com.volcengine.tos.model.bucket.GetBucketReplicationOutput; +import com.volcengine.tos.model.bucket.GetBucketTaggingInput; +import com.volcengine.tos.model.bucket.GetBucketTaggingOutput; +import com.volcengine.tos.model.bucket.GetBucketVersioningInput; +import com.volcengine.tos.model.bucket.GetBucketVersioningOutput; +import com.volcengine.tos.model.bucket.GetBucketWebsiteInput; +import com.volcengine.tos.model.bucket.GetBucketWebsiteOutput; +import com.volcengine.tos.model.bucket.HeadBucketOutput; +import com.volcengine.tos.model.bucket.HeadBucketV2Input; +import com.volcengine.tos.model.bucket.HeadBucketV2Output; +import com.volcengine.tos.model.bucket.ListBucketCustomDomainInput; +import com.volcengine.tos.model.bucket.ListBucketCustomDomainOutput; +import com.volcengine.tos.model.bucket.ListBucketInventoryInput; +import com.volcengine.tos.model.bucket.ListBucketInventoryOutput; +import com.volcengine.tos.model.bucket.ListBucketsInput; +import com.volcengine.tos.model.bucket.ListBucketsOutput; +import com.volcengine.tos.model.bucket.ListBucketsV2Input; +import com.volcengine.tos.model.bucket.ListBucketsV2Output; +import com.volcengine.tos.model.bucket.PutBucketACLInput; +import com.volcengine.tos.model.bucket.PutBucketACLOutput; +import com.volcengine.tos.model.bucket.PutBucketCORSInput; +import com.volcengine.tos.model.bucket.PutBucketCORSOutput; +import com.volcengine.tos.model.bucket.PutBucketCustomDomainInput; +import com.volcengine.tos.model.bucket.PutBucketCustomDomainOutput; +import com.volcengine.tos.model.bucket.PutBucketEncryptionInput; +import com.volcengine.tos.model.bucket.PutBucketEncryptionOutput; +import com.volcengine.tos.model.bucket.PutBucketInventoryInput; +import com.volcengine.tos.model.bucket.PutBucketInventoryOutput; +import com.volcengine.tos.model.bucket.PutBucketLifecycleInput; +import com.volcengine.tos.model.bucket.PutBucketLifecycleOutput; +import com.volcengine.tos.model.bucket.PutBucketMirrorBackInput; +import com.volcengine.tos.model.bucket.PutBucketMirrorBackOutput; +import com.volcengine.tos.model.bucket.PutBucketNotificationInput; +import com.volcengine.tos.model.bucket.PutBucketNotificationOutput; +import com.volcengine.tos.model.bucket.PutBucketNotificationType2Input; +import com.volcengine.tos.model.bucket.PutBucketNotificationType2Output; +import com.volcengine.tos.model.bucket.PutBucketPolicyInput; +import com.volcengine.tos.model.bucket.PutBucketPolicyOutput; +import com.volcengine.tos.model.bucket.PutBucketRealTimeLogInput; +import com.volcengine.tos.model.bucket.PutBucketRealTimeLogOutput; +import com.volcengine.tos.model.bucket.PutBucketRenameInput; +import 
com.volcengine.tos.model.bucket.PutBucketRenameOutput; +import com.volcengine.tos.model.bucket.PutBucketReplicationInput; +import com.volcengine.tos.model.bucket.PutBucketReplicationOutput; +import com.volcengine.tos.model.bucket.PutBucketStorageClassInput; +import com.volcengine.tos.model.bucket.PutBucketStorageClassOutput; +import com.volcengine.tos.model.bucket.PutBucketTaggingInput; +import com.volcengine.tos.model.bucket.PutBucketTaggingOutput; +import com.volcengine.tos.model.bucket.PutBucketVersioningInput; +import com.volcengine.tos.model.bucket.PutBucketVersioningOutput; +import com.volcengine.tos.model.bucket.PutBucketWebsiteInput; +import com.volcengine.tos.model.bucket.PutBucketWebsiteOutput; +import com.volcengine.tos.model.object.AbortMultipartUploadInput; +import com.volcengine.tos.model.object.AbortMultipartUploadOutput; +import com.volcengine.tos.model.object.AppendObjectInput; +import com.volcengine.tos.model.object.AppendObjectOutput; +import com.volcengine.tos.model.object.CompleteMultipartUploadInput; +import com.volcengine.tos.model.object.CompleteMultipartUploadOutput; +import com.volcengine.tos.model.object.CompleteMultipartUploadV2Input; +import com.volcengine.tos.model.object.CompleteMultipartUploadV2Output; +import com.volcengine.tos.model.object.CopyObjectOutput; +import com.volcengine.tos.model.object.CopyObjectV2Input; +import com.volcengine.tos.model.object.CopyObjectV2Output; +import com.volcengine.tos.model.object.CreateMultipartUploadInput; +import com.volcengine.tos.model.object.CreateMultipartUploadOutput; +import com.volcengine.tos.model.object.DeleteMultiObjectsInput; +import com.volcengine.tos.model.object.DeleteMultiObjectsOutput; +import com.volcengine.tos.model.object.DeleteMultiObjectsV2Input; +import com.volcengine.tos.model.object.DeleteMultiObjectsV2Output; +import com.volcengine.tos.model.object.DeleteObjectInput; +import com.volcengine.tos.model.object.DeleteObjectOutput; +import com.volcengine.tos.model.object.DeleteObjectTaggingInput; +import com.volcengine.tos.model.object.DeleteObjectTaggingOutput; +import com.volcengine.tos.model.object.DownloadFileInput; +import com.volcengine.tos.model.object.DownloadFileOutput; +import com.volcengine.tos.model.object.FetchObjectInput; +import com.volcengine.tos.model.object.FetchObjectOutput; +import com.volcengine.tos.model.object.GetFetchTaskInput; +import com.volcengine.tos.model.object.GetFetchTaskOutput; +import com.volcengine.tos.model.object.GetFileStatusInput; +import com.volcengine.tos.model.object.GetFileStatusOutput; +import com.volcengine.tos.model.object.GetObjectACLV2Input; +import com.volcengine.tos.model.object.GetObjectACLV2Output; +import com.volcengine.tos.model.object.GetObjectOutput; +import com.volcengine.tos.model.object.GetObjectTaggingInput; +import com.volcengine.tos.model.object.GetObjectTaggingOutput; +import com.volcengine.tos.model.object.GetObjectToFileInput; +import com.volcengine.tos.model.object.GetObjectToFileOutput; +import com.volcengine.tos.model.object.GetObjectV2Input; +import com.volcengine.tos.model.object.GetObjectV2Output; +import com.volcengine.tos.model.object.GetSymlinkInput; +import com.volcengine.tos.model.object.GetSymlinkOutput; +import com.volcengine.tos.model.object.HeadObjectOutput; +import com.volcengine.tos.model.object.HeadObjectV2Input; +import com.volcengine.tos.model.object.HeadObjectV2Output; +import com.volcengine.tos.model.object.ListMultipartUploadsInput; +import com.volcengine.tos.model.object.ListMultipartUploadsOutput; +import 
com.volcengine.tos.model.object.ListMultipartUploadsV2Input; +import com.volcengine.tos.model.object.ListMultipartUploadsV2Output; +import com.volcengine.tos.model.object.ListObjectVersionsInput; +import com.volcengine.tos.model.object.ListObjectVersionsOutput; +import com.volcengine.tos.model.object.ListObjectVersionsV2Input; +import com.volcengine.tos.model.object.ListObjectVersionsV2Output; +import com.volcengine.tos.model.object.ListObjectsInput; +import com.volcengine.tos.model.object.ListObjectsOutput; +import com.volcengine.tos.model.object.ListObjectsType2Input; +import com.volcengine.tos.model.object.ListObjectsType2Output; +import com.volcengine.tos.model.object.ListObjectsV2Input; +import com.volcengine.tos.model.object.ListObjectsV2Output; +import com.volcengine.tos.model.object.ListPartsInput; +import com.volcengine.tos.model.object.ListPartsOutput; +import com.volcengine.tos.model.object.ListUploadedPartsInput; +import com.volcengine.tos.model.object.ListUploadedPartsOutput; +import com.volcengine.tos.model.object.ObjectMetaRequestOptions; +import com.volcengine.tos.model.object.PreSignedPolicyURLInput; +import com.volcengine.tos.model.object.PreSignedPolicyURLOutput; +import com.volcengine.tos.model.object.PreSignedPostSignatureInput; +import com.volcengine.tos.model.object.PreSignedPostSignatureOutput; +import com.volcengine.tos.model.object.PreSignedURLInput; +import com.volcengine.tos.model.object.PreSignedURLOutput; +import com.volcengine.tos.model.object.PreSingedPolicyURLInput; +import com.volcengine.tos.model.object.PreSingedPolicyURLOutput; +import com.volcengine.tos.model.object.PutFetchTaskInput; +import com.volcengine.tos.model.object.PutFetchTaskOutput; +import com.volcengine.tos.model.object.PutObjectACLInput; +import com.volcengine.tos.model.object.PutObjectACLOutput; +import com.volcengine.tos.model.object.PutObjectFromFileInput; +import com.volcengine.tos.model.object.PutObjectFromFileOutput; +import com.volcengine.tos.model.object.PutObjectInput; +import com.volcengine.tos.model.object.PutObjectOutput; +import com.volcengine.tos.model.object.PutObjectTaggingInput; +import com.volcengine.tos.model.object.PutObjectTaggingOutput; +import com.volcengine.tos.model.object.PutSymlinkInput; +import com.volcengine.tos.model.object.PutSymlinkOutput; +import com.volcengine.tos.model.object.RenameObjectInput; +import com.volcengine.tos.model.object.RenameObjectOutput; +import com.volcengine.tos.model.object.RestoreObjectInput; +import com.volcengine.tos.model.object.RestoreObjectOutput; +import com.volcengine.tos.model.object.ResumableCopyObjectInput; +import com.volcengine.tos.model.object.ResumableCopyObjectOutput; +import com.volcengine.tos.model.object.SetObjectMetaInput; +import com.volcengine.tos.model.object.SetObjectMetaOutput; +import com.volcengine.tos.model.object.UploadFileInput; +import com.volcengine.tos.model.object.UploadFileOutput; +import com.volcengine.tos.model.object.UploadFileV2Input; +import com.volcengine.tos.model.object.UploadFileV2Output; +import com.volcengine.tos.model.object.UploadPartCopyInput; +import com.volcengine.tos.model.object.UploadPartCopyOutput; +import com.volcengine.tos.model.object.UploadPartCopyV2Input; +import com.volcengine.tos.model.object.UploadPartCopyV2Output; +import com.volcengine.tos.model.object.UploadPartFromFileInput; +import com.volcengine.tos.model.object.UploadPartFromFileOutput; +import com.volcengine.tos.model.object.UploadPartInput; +import com.volcengine.tos.model.object.UploadPartOutput; +import 
com.volcengine.tos.model.object.UploadPartV2Input; +import com.volcengine.tos.model.object.UploadPartV2Output; +import com.volcengine.tos.transport.TransportConfig; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.fs.tosfs.object.InputStreamProvider; +import org.apache.hadoop.fs.tosfs.object.Part; +import org.apache.hadoop.fs.tosfs.util.RetryableUtils; +import org.apache.hadoop.thirdparty.com.google.common.base.Throwables; +import org.apache.hadoop.thirdparty.com.google.common.io.CountingInputStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.net.SocketException; +import java.net.SocketTimeoutException; +import java.net.UnknownHostException; +import java.time.Duration; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.Callable; +import javax.net.ssl.SSLException; + +public class DelegationClient implements TOSV2 { + + private static final Logger LOG = LoggerFactory.getLogger(DelegationClient.class); + + private final Credentials provider; + private final TOSClientConfiguration config; + private int maxRetryTimes; + private TOSV2 client; + private volatile Credential usedCredential; + private final List<String> nonRetryable409ErrorCodes; + + protected DelegationClient( + TOSClientConfiguration configuration, int maxRetryTimes, List<String> nonRetryable409ErrorCodes) { + this.config = configuration; + this.maxRetryTimes = maxRetryTimes; + this.provider = configuration.getCredentials(); + this.usedCredential = provider.credential(); + this.client = new TOSV2ClientBuilder().build(configuration); + this.nonRetryable409ErrorCodes = nonRetryable409ErrorCodes; + } + + @VisibleForTesting + void setClient(TOSV2 client) { + this.client = client; + } + + public TOSV2 client() { + return client; + } + + @VisibleForTesting + void setMaxRetryTimes(int maxRetryTimes) { + this.maxRetryTimes = maxRetryTimes; + } + + public int maxRetryTimes() { + return maxRetryTimes; + } + + public TOSClientConfiguration config() { + return config; + } + + public Credential usedCredential() { + return usedCredential; + } + + @Override + public CreateBucketV2Output createBucket(String bucket) throws TosException { + return retry(() -> client.createBucket(bucket)); + } + + @Override + public CreateBucketV2Output createBucket(CreateBucketV2Input input) throws TosException { + return retry(() -> client.createBucket(input)); + } + + @Override + public HeadBucketV2Output headBucket(HeadBucketV2Input input) throws TosException { + return retry(() -> client.headBucket(input)); + } + + @Override + public DeleteBucketOutput deleteBucket(DeleteBucketInput input) throws TosException { + return retry(() -> client.deleteBucket(input)); + } + + @Override + public ListBucketsV2Output listBuckets(ListBucketsV2Input input) throws TosException { + return retry(() -> client.listBuckets(input)); + } + + @Override + public CreateBucketOutput createBucket(CreateBucketInput input) throws TosException { + return retry(() -> client.createBucket(input)); + } + + @Override + public HeadBucketOutput headBucket(String bucket) throws TosException { + return retry(() -> client.headBucket(bucket)); + } + + @Override + public DeleteBucketOutput deleteBucket(String bucket) throws TosException { + return retry(() -> client.deleteBucket(bucket)); + } + + @Override + public ListBucketsOutput listBuckets(ListBucketsInput input) 
throws TosException { + return retry(() -> client.listBuckets(input)); + } + + @Override + public PutBucketPolicyOutput putBucketPolicy(String bucket, String policy) throws TosException { + return retry(() -> client.putBucketPolicy(bucket, policy)); + } + + @Override + public PutBucketPolicyOutput putBucketPolicy(PutBucketPolicyInput input) throws TosException { + return retry(() -> client.putBucketPolicy(input)); + } + + @Override + public GetBucketPolicyOutput getBucketPolicy(String bucket) throws TosException { + return retry(() -> client.getBucketPolicy(bucket)); + } + + @Override + public GetBucketPolicyOutput getBucketPolicy(GetBucketPolicyInput input) throws TosException { + return retry(() -> client.getBucketPolicy(input)); + } + + @Override + public DeleteBucketPolicyOutput deleteBucketPolicy(String bucket) throws TosException { + return retry(() -> client.deleteBucketPolicy(bucket)); + } + + @Override + public GetObjectOutput getObject(String bucket, String objectKey, + RequestOptionsBuilder... builders) throws TosException { + return retry(() -> client.getObject(bucket, objectKey, builders)); + } + + @Override + public HeadObjectOutput headObject(String bucket, String objectKey, + RequestOptionsBuilder... builders) throws TosException { + return retry(() -> client.headObject(bucket, objectKey, builders)); + } + + @Override + public DeleteObjectOutput deleteObject(String bucket, String objectKey, + RequestOptionsBuilder... builders) throws TosException { + return retry(() -> client.deleteObject(bucket, objectKey, builders)); + } + + @Override + public DeleteMultiObjectsOutput deleteMultiObjects( + String bucket, + DeleteMultiObjectsInput input, + RequestOptionsBuilder... builders) + throws TosException { + return retry(() -> client.deleteMultiObjects(bucket, input, builders)); + } + + @Override + public PutObjectOutput putObject( + String bucket, String objectKey, InputStream inputStream, + RequestOptionsBuilder... builders) + throws TosException { + throw new UnsupportedOperationException("Not supported"); + } + + @Override + public UploadFileOutput uploadFile( + String bucket, UploadFileInput input, + RequestOptionsBuilder... builders) throws TosException { + return retry(() -> client.uploadFile(bucket, input, builders)); + } + + @Override + public AppendObjectOutput appendObject( + String bucket, String objectKey, InputStream content, long offset, + RequestOptionsBuilder... builders) + throws TosException { + throw new UnsupportedOperationException("Not supported"); + } + + @Override + public SetObjectMetaOutput setObjectMeta(String bucket, String objectKey, RequestOptionsBuilder... builders) + throws TosException { + return retry(() -> client.setObjectMeta(bucket, objectKey, builders)); + } + + @Override + public ListObjectsOutput listObjects(String bucket, ListObjectsInput input) throws TosException { + return retry(() -> client.listObjects(bucket, input)); + } + + @Override + public ListObjectVersionsOutput listObjectVersions(String bucket, ListObjectVersionsInput input) throws TosException { + return retry(() -> client.listObjectVersions(bucket, input)); + } + + @Override + public CopyObjectOutput copyObject( + String bucket, String srcObjectKey, String dstObjectKey, + RequestOptionsBuilder... builders) + throws TosException { + return retry(() -> client.copyObject(bucket, srcObjectKey, dstObjectKey, builders)); + } + + @Override + public CopyObjectOutput copyObjectTo( + String bucket, String dstBucket, String dstObjectKey, + String srcObjectKey, + RequestOptionsBuilder... 
builders) + throws TosException { + return retry(() -> client.copyObjectTo(bucket, dstBucket, dstObjectKey, srcObjectKey, builders)); + } + + @Override + public CopyObjectOutput copyObjectFrom( + String bucket, String srcBucket, String srcObjectKey, String dstObjectKey, + RequestOptionsBuilder... builders) + throws TosException { + return retry(() -> client.copyObjectFrom(bucket, srcBucket, srcObjectKey, dstObjectKey, builders)); + } + + @Override + public UploadPartCopyOutput uploadPartCopy( + String bucket, UploadPartCopyInput input, + RequestOptionsBuilder... builders) throws TosException { + return retry(() -> client.uploadPartCopy(bucket, input, builders)); + } + + @Override + public PutObjectAclOutput putObjectAcl(String bucket, PutObjectAclInput input) throws TosException { + return retry(() -> client.putObjectAcl(bucket, input)); + } + + @Override + public GetObjectAclOutput getObjectAcl( + String bucket, String objectKey, + RequestOptionsBuilder... builders) + throws TosException { + return retry(() -> client.getObjectAcl(bucket, objectKey, builders)); + } + + @Override + public CreateMultipartUploadOutput createMultipartUpload( + String bucket, String objectKey, + RequestOptionsBuilder... builders) + throws TosException { + return retry(() -> client.createMultipartUpload(bucket, objectKey, builders)); + } + + @Override + public UploadPartOutput uploadPart( + String bucket, UploadPartInput input, + RequestOptionsBuilder... builders) + throws TosException { + throw new UnsupportedOperationException("Not supported"); + } + + @Override + public CompleteMultipartUploadOutput completeMultipartUpload( + String bucket, + CompleteMultipartUploadInput input) + throws TosException { + return retry(() -> client.completeMultipartUpload(bucket, input)); + } + + @Override + public AbortMultipartUploadOutput abortMultipartUpload( + String bucket, + AbortMultipartUploadInput input) + throws TosException { + return retry(() -> client.abortMultipartUpload(bucket, input)); + } + + @Override + public ListUploadedPartsOutput listUploadedParts( + String bucket, + ListUploadedPartsInput input, + RequestOptionsBuilder... builders) + throws TosException { + return retry(() -> client.listUploadedParts(bucket, input, builders)); + } + + @Override + public ListMultipartUploadsOutput listMultipartUploads( + String bucket, + ListMultipartUploadsInput input) + throws TosException { + return retry(() -> client.listMultipartUploads(bucket, input)); + } + + @Override + public String preSignedURL( + String httpMethod, String bucket, String objectKey, Duration ttl, + RequestOptionsBuilder... 
builders) + throws TosException { + return retry(() -> client.preSignedURL(httpMethod, bucket, objectKey, ttl, builders)); + } + + @Override + public DeleteBucketPolicyOutput deleteBucketPolicy(DeleteBucketPolicyInput input) + throws TosException { + return retry(() -> client.deleteBucketPolicy(input)); + } + + @Override + public PutBucketCORSOutput putBucketCORS(PutBucketCORSInput input) + throws TosException { + return retry(() -> client.putBucketCORS(input)); + } + + @Override + public GetBucketCORSOutput getBucketCORS(GetBucketCORSInput input) + throws TosException { + return retry(() -> client.getBucketCORS(input)); + } + + @Override + public DeleteBucketCORSOutput deleteBucketCORS(DeleteBucketCORSInput input) + throws TosException { + return retry(() -> client.deleteBucketCORS(input)); + } + + @Override + public PutBucketStorageClassOutput putBucketStorageClass(PutBucketStorageClassInput input) + throws TosException { + return retry(() -> client.putBucketStorageClass(input)); + } + + @Override + public GetBucketLocationOutput getBucketLocation(GetBucketLocationInput input) + throws TosException { + return retry(() -> client.getBucketLocation(input)); + } + + @Override + public PutBucketLifecycleOutput putBucketLifecycle(PutBucketLifecycleInput input) + throws TosException { + return retry(() -> client.putBucketLifecycle(input)); + } + + @Override + public GetBucketLifecycleOutput getBucketLifecycle(GetBucketLifecycleInput input) + throws TosException { + return retry(() -> client.getBucketLifecycle(input)); + } + + @Override + public DeleteBucketLifecycleOutput deleteBucketLifecycle(DeleteBucketLifecycleInput input) + throws TosException { + return retry(() -> client.deleteBucketLifecycle(input)); + } + + @Override + public PutBucketMirrorBackOutput putBucketMirrorBack(PutBucketMirrorBackInput input) + throws TosException { + return retry(() -> client.putBucketMirrorBack(input)); + } + + @Override + public GetBucketMirrorBackOutput getBucketMirrorBack(GetBucketMirrorBackInput input) + throws TosException { + return retry(() -> client.getBucketMirrorBack(input)); + } + + @Override + public DeleteBucketMirrorBackOutput deleteBucketMirrorBack(DeleteBucketMirrorBackInput input) + throws TosException { + return retry(() -> client.deleteBucketMirrorBack(input)); + } + + @Override + public PutBucketReplicationOutput putBucketReplication(PutBucketReplicationInput input) + throws TosException { + return retry(() -> client.putBucketReplication(input)); + } + + @Override + public GetBucketReplicationOutput getBucketReplication(GetBucketReplicationInput input) + throws TosException { + return retry(() -> client.getBucketReplication(input)); + } + + @Override + public DeleteBucketReplicationOutput deleteBucketReplication(DeleteBucketReplicationInput input) + throws TosException { + return retry(() -> client.deleteBucketReplication(input)); + } + + @Override + public PutBucketVersioningOutput putBucketVersioning(PutBucketVersioningInput input) + throws TosException { + return retry(() -> client.putBucketVersioning(input)); + } + + @Override + public GetBucketVersioningOutput getBucketVersioning(GetBucketVersioningInput input) + throws TosException { + return retry(() -> client.getBucketVersioning(input)); + } + + @Override + public PutBucketWebsiteOutput putBucketWebsite(PutBucketWebsiteInput input) + throws TosException { + return retry(() -> client.putBucketWebsite(input)); + } + + @Override + public GetBucketWebsiteOutput getBucketWebsite(GetBucketWebsiteInput input) + throws TosException { + 
return retry(() -> client.getBucketWebsite(input));
+  }
+
+  @Override
+  public DeleteBucketWebsiteOutput deleteBucketWebsite(DeleteBucketWebsiteInput input)
+      throws TosException {
+    return retry(() -> client.deleteBucketWebsite(input));
+  }
+
+  @Override
+  public PutBucketNotificationOutput putBucketNotification(PutBucketNotificationInput input)
+      throws TosException {
+    return retry(() -> client.putBucketNotification(input));
+  }
+
+  @Override
+  public GetBucketNotificationOutput getBucketNotification(GetBucketNotificationInput input)
+      throws TosException {
+    return retry(() -> client.getBucketNotification(input));
+  }
+
+  @Override
+  public PutBucketNotificationType2Output putBucketNotificationType2(PutBucketNotificationType2Input input)
+      throws TosException {
+    return retry(() -> client.putBucketNotificationType2(input));
+  }
+
+  @Override
+  public GetBucketNotificationType2Output getBucketNotificationType2(GetBucketNotificationType2Input input)
+      throws TosException {
+    return retry(() -> client.getBucketNotificationType2(input));
+  }
+
+  @Override
+  public PutBucketCustomDomainOutput putBucketCustomDomain(PutBucketCustomDomainInput input)
+      throws TosException {
+    return retry(() -> client.putBucketCustomDomain(input));
+  }
+
+  @Override
+  public ListBucketCustomDomainOutput listBucketCustomDomain(ListBucketCustomDomainInput input)
+      throws TosException {
+    return retry(() -> client.listBucketCustomDomain(input));
+  }
+
+  @Override
+  public DeleteBucketCustomDomainOutput deleteBucketCustomDomain(DeleteBucketCustomDomainInput input)
+      throws TosException {
+    return retry(() -> client.deleteBucketCustomDomain(input));
+  }
+
+  @Override
+  public PutBucketRealTimeLogOutput putBucketRealTimeLog(PutBucketRealTimeLogInput input)
+      throws TosException {
+    return retry(() -> client.putBucketRealTimeLog(input));
+  }
+
+  @Override
+  public GetBucketRealTimeLogOutput getBucketRealTimeLog(GetBucketRealTimeLogInput input)
+      throws TosException {
+    return retry(() -> client.getBucketRealTimeLog(input));
+  }
+
+  @Override
+  public DeleteBucketRealTimeLogOutput deleteBucketRealTimeLog(DeleteBucketRealTimeLogInput input)
+      throws TosException {
+    return retry(() -> client.deleteBucketRealTimeLog(input));
+  }
+
+  @Override
+  public PutBucketACLOutput putBucketACL(PutBucketACLInput input) throws TosException {
+    return retry(() -> client.putBucketACL(input));
+  }
+
+  @Override
+  public GetBucketACLOutput getBucketACL(GetBucketACLInput input) throws TosException {
+    return retry(() -> client.getBucketACL(input));
+  }
+
+  @Override
+  public PutBucketRenameOutput putBucketRename(PutBucketRenameInput input) throws TosException {
+    return retry(() -> client.putBucketRename(input));
+  }
+
+  @Override
+  public GetBucketRenameOutput getBucketRename(GetBucketRenameInput input) throws TosException {
+    return retry(() -> client.getBucketRename(input));
+  }
+
+  @Override
+  public DeleteBucketRenameOutput deleteBucketRename(DeleteBucketRenameInput input) throws TosException {
+    return retry(() -> client.deleteBucketRename(input));
+  }
+
+  @Override
+  public PutBucketEncryptionOutput putBucketEncryption(PutBucketEncryptionInput input) throws TosException {
+    return retry(() -> client.putBucketEncryption(input));
+  }
+
+  @Override
+  public GetBucketEncryptionOutput getBucketEncryption(GetBucketEncryptionInput input) throws TosException {
+    return retry(() -> client.getBucketEncryption(input));
+  }
+
+  @Override
+  public DeleteBucketEncryptionOutput deleteBucketEncryption(DeleteBucketEncryptionInput input) throws
TosException { + return retry(() -> client.deleteBucketEncryption(input)); + } + + @Override + public PutBucketTaggingOutput putBucketTagging(PutBucketTaggingInput input) throws TosException { + return retry(() -> client.putBucketTagging(input)); + } + + @Override + public GetBucketTaggingOutput getBucketTagging(GetBucketTaggingInput input) throws TosException { + return retry(() -> client.getBucketTagging(input)); + } + + @Override + public DeleteBucketTaggingOutput deleteBucketTagging(DeleteBucketTaggingInput input) throws TosException { + return retry(() -> client.deleteBucketTagging(input)); + } + + @Override + public PutBucketInventoryOutput putBucketInventory(PutBucketInventoryInput input) + throws TosException { + return retry(() -> client.putBucketInventory(input)); + } + + @Override + public GetBucketInventoryOutput getBucketInventory(GetBucketInventoryInput input) throws TosException { + return retry(() -> client.getBucketInventory(input)); + } + + @Override + public ListBucketInventoryOutput listBucketInventory(ListBucketInventoryInput input) throws TosException { + return retry(() -> client.listBucketInventory(input)); + } + + @Override + public DeleteBucketInventoryOutput deleteBucketInventory(DeleteBucketInventoryInput input) throws TosException { + return retry(() -> client.deleteBucketInventory(input)); + } + + @Override + public GetObjectV2Output getObject(GetObjectV2Input input) throws TosException { + return retry(() -> client.getObject(input)); + } + + @Override + public GetObjectToFileOutput getObjectToFile(GetObjectToFileInput input) throws TosException { + return retry(() -> client.getObjectToFile(input)); + } + + @Override + public GetFileStatusOutput getFileStatus(GetFileStatusInput input) throws TosException { + return retry(() -> client.getFileStatus(input)); + } + + @Override + public UploadFileV2Output uploadFile(UploadFileV2Input input) throws TosException { + return retry(() -> client.uploadFile(input)); + } + + @Override + public DownloadFileOutput downloadFile(DownloadFileInput input) throws TosException { + return retry(() -> client.downloadFile(input)); + } + + @Override + public ResumableCopyObjectOutput resumableCopyObject(ResumableCopyObjectInput input) + throws TosException { + return retry(() -> client.resumableCopyObject(input)); + } + + @Override + public HeadObjectV2Output headObject(HeadObjectV2Input input) throws TosException { + return retry(() -> client.headObject(input)); + } + + @Override + public DeleteObjectOutput deleteObject(DeleteObjectInput input) throws TosException { + return retry(() -> client.deleteObject(input)); + } + + @Override + public DeleteMultiObjectsV2Output deleteMultiObjects(DeleteMultiObjectsV2Input input) + throws TosException { + return retry(() -> client.deleteMultiObjects(input)); + } + + public PutObjectOutput put( + String bucket, String key, InputStreamProvider streamProvider, + long contentLength, ACLType aclType) { + return retry(() -> client.putObject(newPutObjectRequest(bucket, key, streamProvider, contentLength, aclType))); + } + + private PutObjectInput newPutObjectRequest( + String bucket, + String key, + InputStreamProvider streamProvider, + long contentLength, + ACLType aclType) { + + return PutObjectInput.builder() + .bucket(bucket) + .key(key) + .content(streamProvider.newStream()) + .contentLength(contentLength) + .options(new ObjectMetaRequestOptions() + .setAclType(aclType)) + .build(); + } + + public AppendObjectOutput appendObject(String bucket, String key, InputStreamProvider streamProvider, + 
long offset, long contentLength, String originalCrc64, ACLType aclType) { + // originalCrc64 is needed when appending data to object. It should be the object's crc64 checksum if the object + // exists, and null if the object doesn't exist. + return retry(() -> client.appendObject( + newAppendObjectRequest(bucket, key, streamProvider, offset, contentLength, originalCrc64, aclType))); + } + + private AppendObjectInput newAppendObjectRequest( + String bucket, + String key, + InputStreamProvider streamProvider, + long offset, + long contentLength, + String preCrc64ecma, + ACLType aclType) { + return AppendObjectInput.builder() + .bucket(bucket) + .key(key) + .content(streamProvider.newStream()) + .offset(offset) + .contentLength(contentLength) + .preHashCrc64ecma(preCrc64ecma) + .options(new ObjectMetaRequestOptions() + .setAclType(aclType)) + .build(); + } + + @Override + public PutObjectOutput putObject(PutObjectInput input) throws TosException { + throw new UnsupportedOperationException("Not supported"); + } + + @Override + public PutObjectFromFileOutput putObjectFromFile(PutObjectFromFileInput input) + throws TosException { + return retry(() -> client.putObjectFromFile(input)); + } + + @Override + public AppendObjectOutput appendObject(AppendObjectInput input) + throws TosException { + throw new UnsupportedOperationException("Not supported"); + } + + @Override + public SetObjectMetaOutput setObjectMeta(SetObjectMetaInput input) + throws TosException { + return retry(() -> client.setObjectMeta(input)); + } + + @Override + public ListObjectsV2Output listObjects(ListObjectsV2Input input) + throws TosException { + return retry(() -> client.listObjects(input)); + } + + @Override + public ListObjectsType2Output listObjectsType2(ListObjectsType2Input input) + throws TosException { + return retry(() -> client.listObjectsType2(input)); + } + + @Override + public ListObjectVersionsV2Output listObjectVersions(ListObjectVersionsV2Input input) + throws TosException { + return retry(() -> client.listObjectVersions(input)); + } + + @Override + public CopyObjectV2Output copyObject(CopyObjectV2Input input) + throws TosException { + return retry(() -> client.copyObject(input)); + } + + @Override + public UploadPartCopyV2Output uploadPartCopy(UploadPartCopyV2Input input) + throws TosException { + return retry(() -> client.uploadPartCopy(input)); + } + + @Override + public PutObjectACLOutput putObjectAcl(PutObjectACLInput input) + throws TosException { + return retry(() -> client.putObjectAcl(input)); + } + + @Override + public GetObjectACLV2Output getObjectAcl(GetObjectACLV2Input input) + throws TosException { + return retry(() -> client.getObjectAcl(input)); + } + + @Override + public PutObjectTaggingOutput putObjectTagging(PutObjectTaggingInput input) + throws TosException { + return retry(() -> client.putObjectTagging(input)); + } + + @Override + public GetObjectTaggingOutput getObjectTagging(GetObjectTaggingInput input) + throws TosException { + return retry(() -> client.getObjectTagging(input)); + } + + @Override + public DeleteObjectTaggingOutput deleteObjectTagging(DeleteObjectTaggingInput input) + throws TosException { + return retry(() -> client.deleteObjectTagging(input)); + } + + @Override + public FetchObjectOutput fetchObject(FetchObjectInput input) throws TosException { + return retry(() -> client.fetchObject(input)); + } + + @Override + public PutFetchTaskOutput putFetchTask(PutFetchTaskInput input) throws TosException { + return retry(() -> client.putFetchTask(input)); + } + + @Override + 
public GetFetchTaskOutput getFetchTask(GetFetchTaskInput input) throws TosException { + return retry(() -> client.getFetchTask(input)); + } + + @Override + public CreateMultipartUploadOutput createMultipartUpload(CreateMultipartUploadInput input) + throws TosException { + return retry(() -> client.createMultipartUpload(input)); + } + + public Part uploadPart( + String bucket, + String key, + String uploadId, + int partNum, + InputStreamProvider streamProvider, + long contentLength, + ACLType aclType) { + return retry(() -> { + InputStream in = streamProvider.newStream(); + CountingInputStream countedIn = new CountingInputStream(in); + UploadPartV2Input request = UploadPartV2Input.builder() + .bucket(bucket) + .key(key) + .partNumber(partNum) + .uploadID(uploadId) + .content(countedIn) + .contentLength(contentLength) + .options(new ObjectMetaRequestOptions() + .setAclType(aclType)) + .build(); + UploadPartV2Output output = client.uploadPart(request); + return new Part(output.getPartNumber(), countedIn.getCount(), output.getEtag()); + }); + } + + @Override + public UploadPartV2Output uploadPart(UploadPartV2Input input) throws TosException { + throw new UnsupportedOperationException("Not supported"); + } + + @Override + public UploadPartFromFileOutput uploadPartFromFile(UploadPartFromFileInput input) + throws TosException { + return retry(() -> client.uploadPartFromFile(input)); + } + + @Override + public CompleteMultipartUploadV2Output completeMultipartUpload(CompleteMultipartUploadV2Input input) + throws TosException { + return retry(() -> client.completeMultipartUpload(input)); + } + + @Override + public AbortMultipartUploadOutput abortMultipartUpload(AbortMultipartUploadInput input) + throws TosException { + return retry(() -> client.abortMultipartUpload(input)); + } + + @Override + public ListPartsOutput listParts(ListPartsInput input) throws TosException { + return retry(() -> client.listParts(input)); + } + + @Override + public ListMultipartUploadsV2Output listMultipartUploads(ListMultipartUploadsV2Input input) + throws TosException { + return retry(() -> client.listMultipartUploads(input)); + } + + @Override + public RenameObjectOutput renameObject(RenameObjectInput input) throws TosException { + return retry(() -> client.renameObject(input)); + } + + @Override + public RestoreObjectOutput restoreObject(RestoreObjectInput input) throws TosException { + return retry(() -> client.restoreObject(input)); + } + + @Override + public PutSymlinkOutput putSymlink(PutSymlinkInput input) throws TosException { + return retry(() -> client.putSymlink(input)); + } + + @Override + public GetSymlinkOutput getSymlink(GetSymlinkInput input) throws TosException { + return retry(() -> client.getSymlink(input)); + } + + @Override + public PreSignedURLOutput preSignedURL(PreSignedURLInput input) throws TosException { + return retry(() -> client.preSignedURL(input)); + } + + @Override + public PreSignedPostSignatureOutput preSignedPostSignature(PreSignedPostSignatureInput input) + throws TosException { + return retry(() -> client.preSignedPostSignature(input)); + } + + @Override + public PreSingedPolicyURLOutput preSingedPolicyURL(PreSingedPolicyURLInput input) + throws TosException { + return retry(() -> client.preSingedPolicyURL(input)); + } + + @Override + public PreSignedPolicyURLOutput preSignedPolicyURL(PreSignedPolicyURLInput input) throws TosException { + return retry(() -> client.preSignedPolicyURL(input)); + } + + @Override + public void changeCredentials(Credentials credentials) { + retry(() -> { 
+      client.changeCredentials(credentials);
+      return null;
+    });
+  }
+
+  @Override
+  public void changeRegionAndEndpoint(String region, String endpoint) {
+    retry(() -> {
+      client.changeRegionAndEndpoint(region, endpoint);
+      return null;
+    });
+  }
+
+  @Override
+  public void changeTransportConfig(TransportConfig config) {
+    retry(() -> {
+      client.changeTransportConfig(config);
+      return null;
+    });
+  }
+
+  @Override
+  public boolean refreshEndpointRegion(String s, String s1) {
+    return retry(() -> client.refreshEndpointRegion(s, s1));
+  }
+
+  @Override
+  public boolean refreshCredentials(String s, String s1, String s2) {
+    return retry(() -> client.refreshCredentials(s, s1, s2));
+  }
+
+  @Override
+  public void close() throws IOException {
+    client.close();
+  }
+
+  private void refresh() throws TosException {
+    Credential credential = provider.credential();
+    if (credentialIsChanged(credential)) {
+      synchronized (this) {
+        if (credentialIsChanged(credential)) {
+          client.changeCredentials(provider);
+          usedCredential = credential;
+        }
+      }
+    }
+  }
+
+  private boolean credentialIsChanged(Credential credential) {
+    return !Objects.equals(credential.getAccessKeyId(), usedCredential.getAccessKeyId())
+        || !Objects.equals(credential.getAccessKeySecret(), usedCredential.getAccessKeySecret())
+        || !Objects.equals(credential.getSecurityToken(), usedCredential.getSecurityToken());
+  }
+
+  private <T> T retry(Callable<T> callable) {
+    int attempt = 0;
+    while (true) {
+      attempt++;
+      try {
+        refresh();
+        return callable.call();
+      } catch (TosException e) {
+        if (attempt >= maxRetryTimes) {
+          LOG.error("Retry exhausted after {} times.", maxRetryTimes);
+          throw e;
+        }
+        if (isRetryableException(e, nonRetryable409ErrorCodes)) {
+          LOG.warn("Retrying TOS request, attempt {} of {}, error: {}", attempt, maxRetryTimes,
+              Throwables.getRootCause(e).getMessage());
+          try {
+            // The final attempt throws above, so a backoff sleep only happens while another
+            // attempt remains.
+            Thread.sleep(RetryableUtils.backoff(attempt));
+          } catch (InterruptedException ex) {
+            throw new TosClientException("tos: request interrupted.", ex);
+          }
+        } else {
+          throw e;
+        }
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  @VisibleForTesting
+  static boolean isRetryableException(TosException e, List<String> nonRetryable409ErrorCodes) {
+    return e.getStatusCode() >= HttpStatus.INTERNAL_SERVER_ERROR
+        || e.getStatusCode() == HttpStatus.TOO_MANY_REQUESTS
+        || e.getCause() instanceof SocketException
+        || e.getCause() instanceof UnknownHostException
+        || e.getCause() instanceof SSLException
+        || e.getCause() instanceof SocketTimeoutException
+        || e.getCause() instanceof InterruptedException
+        || isRetryableTosClientException(e)
+        || isRetryableTosServerException(e, nonRetryable409ErrorCodes);
+  }
+
+  private static boolean isRetryableTosClientException(TosException e) {
+    return e instanceof TosClientException
+        && e.getCause() instanceof IOException
+        && !(e.getCause() instanceof EOFException);
+  }
+
+  private static boolean isRetryableTosServerException(TosException e, List<String> nonRetryable409ErrorCodes) {
+    return e instanceof TosServerException
+        && e.getStatusCode() == HttpStatus.CONFLICT
+        && isRetryableTosConflictException((TosServerException) e, nonRetryable409ErrorCodes);
+  }
+
+  private static boolean isRetryableTosConflictException(TosServerException e, List<String> nonRetryableCodes) {
+    String errorCode = e.getEc();
+    return StringUtils.isEmpty(errorCode) || !nonRetryableCodes.contains(errorCode);
+  }
+}
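Every wrapped call above funnels through retry(): credentials are refreshed before each attempt, retryable failures back off and re-run, and non-retryable ones propagate immediately. A condensed sketch of that control flow, assuming a simple capped exponential backoff in place of RetryableUtils.backoff (whose implementation is not shown in this commit):

    // Editor's sketch of the retry() control flow; the backoff curve and the retryable
    // test are simplified stand-ins for RetryableUtils.backoff and isRetryableException.
    static <T> T retrySketch(java.util.concurrent.Callable<T> call, int maxRetryTimes)
        throws Exception {
      for (int attempt = 1; ; attempt++) {
        try {
          return call.call();             // the real client calls refresh() first
        } catch (java.io.IOException e) { // stand-in for a retryable TosException
          if (attempt >= maxRetryTimes) {
            throw e;                      // retries exhausted: propagate
          }
          Thread.sleep(Math.min(100L << attempt, 10_000L)); // assumed capped backoff
        }
      }
    }

diff --git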
a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/tos/DelegationClientBuilder.java b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/tos/DelegationClientBuilder.java new file mode 100644 index 00000000000..ebf7f738579 --- /dev/null +++ b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/tos/DelegationClientBuilder.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.tosfs.object.tos; + +import com.volcengine.tos.TOSClientConfiguration; +import com.volcengine.tos.TosException; +import com.volcengine.tos.transport.TransportConfig; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.tosfs.conf.ConfKeys; +import org.apache.hadoop.fs.tosfs.conf.TosKeys; +import org.apache.hadoop.fs.tosfs.object.Constants; +import org.apache.hadoop.fs.tosfs.object.tos.auth.CredentialsProvider; +import org.apache.hadoop.fs.tosfs.util.ParseUtils; +import org.apache.hadoop.fs.tosfs.util.TOSClientContextUtils; +import org.apache.hadoop.util.Preconditions; +import org.apache.hadoop.util.VersionInfo; + +import java.lang.reflect.InvocationTargetException; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import static org.apache.hadoop.fs.tosfs.object.tos.TOS.TOS_SCHEME; + +public class DelegationClientBuilder { + + public static final int DISABLE_TOS_RETRY_VALUE = -1; + private static final String TOS_ENDPOINT_KEY = ConfKeys.FS_TOS_ENDPOINT.key(TOS_SCHEME); + private static final String TOS_REGION_KEY = ConfKeys.FS_TOS_REGION.key(TOS_SCHEME); + + @VisibleForTesting + static final Map<String, DelegationClient> CACHE = new ConcurrentHashMap<>(); + + private String bucket; + private Configuration conf; + + public DelegationClientBuilder bucket(String bucket) { + this.bucket = bucket; + return this; + } + + public DelegationClientBuilder conf(Configuration conf) { + this.conf = conf; + return this; + } + + public DelegationClient build() throws TosException { + Preconditions.checkNotNull(bucket, "Bucket cannot be null"); + Preconditions.checkNotNull(conf, "Conf cannot be null"); + String endpoint = getAndCheckEndpoint(conf); + String region = getAndCheckRegion(conf, endpoint); + + if (conf.getBoolean(TosKeys.FS_TOS_DISABLE_CLIENT_CACHE, + TosKeys.FS_TOS_DISABLE_CLIENT_CACHE_DEFAULT)) { + return createNewClient(conf, endpoint, region, bucket, false); + } + return CACHE.computeIfAbsent(bucket, client -> createNewClient(conf, endpoint, region, bucket, true)); + } + + private DelegationClient 
createNewClient(Configuration conf, String endpoint, String region,
+      String bucket, boolean cached) {
+    CredentialsProvider provider = createProvider(conf, bucket);
+    TOSClientConfiguration clientConfiguration = TOSClientConfiguration.builder()
+        .region(region)
+        .endpoint(endpoint)
+        .credentials(provider)
+        .enableCrc(conf.getBoolean(
+            TosKeys.FS_TOS_CRC_CHECK_ENABLED, TosKeys.FS_TOS_CRC_CHECK_ENABLED_DEFAULT))
+        .transportConfig(createTransportConfig(conf))
+        .userAgentProductName(conf.get(
+            TosKeys.FS_TOS_USER_AGENT_PREFIX, TosKeys.FS_TOS_USER_AGENT_PREFIX_DEFAULT))
+        .userAgentSoftName(Constants.PROTON)
+        .userAgentSoftVersion(VersionInfo.getVersion())
+        .build();
+
+    int maxRetryTimes = conf.getInt(TosKeys.FS_TOS_REQUEST_MAX_RETRY_TIMES,
+        TosKeys.FS_TOS_REQUEST_MAX_RETRY_TIMES_DEFAULT);
+    List<String> nonRetryable409ErrorCodes = Arrays.asList(
+        conf.getTrimmedStrings(TosKeys.FS_TOS_FAST_FAILURE_409_ERROR_CODES,
+            TosKeys.FS_TOS_FAST_FAILURE_409_ERROR_CODES_DEFAULT));
+
+    if (cached) {
+      return new CachedClient(clientConfiguration, maxRetryTimes, nonRetryable409ErrorCodes);
+    } else {
+      return new DelegationClient(clientConfiguration, maxRetryTimes, nonRetryable409ErrorCodes);
+    }
+  }
+
+  private CredentialsProvider createProvider(Configuration conf, String bucket) {
+    try {
+      CredentialsProvider provider = (CredentialsProvider) Class.forName(
+          conf.get(TosKeys.FS_TOS_CREDENTIALS_PROVIDER,
+              TosKeys.FS_TOS_CREDENTIALS_PROVIDER_DEFAULT))
+          .getDeclaredConstructor()
+          .newInstance();
+      provider.initialize(conf, bucket);
+      return provider;
+    } catch (ClassNotFoundException |
+        InstantiationException |
+        IllegalAccessException |
+        InvocationTargetException |
+        NoSuchMethodException e) {
+      throw new TosException(e);
+    }
+  }
+
+  private String getAndCheckEndpoint(Configuration conf) {
+    String endpoint = conf.get(TOS_ENDPOINT_KEY);
+    if (StringUtils.isBlank(endpoint)) {
+      endpoint = ParseUtils.envAsString(TOS.ENV_TOS_ENDPOINT);
+    }
+    Preconditions.checkNotNull(endpoint, "%s cannot be null", TOS_ENDPOINT_KEY);
+    return endpoint.trim();
+  }
+
+  private String getAndCheckRegion(Configuration conf, String endpoint) {
+    String region = conf.get(TOS_REGION_KEY);
+    if (StringUtils.isNotBlank(region)) {
+      return region.trim();
+    }
+    region = TOSClientContextUtils.parseRegion(endpoint);
+    Preconditions.checkNotNull(region, "%s cannot be null", TOS_REGION_KEY);
+    return region.trim();
+  }
+
+  private TransportConfig createTransportConfig(Configuration conf) {
+    TransportConfig.TransportConfigBuilder builder = TransportConfig.builder();
+    // Disable the TOS SDK's built-in retry with a negative value, since we implement our own
+    // retry strategy on top of the SDK, which cannot retry all input streams via the mark &
+    // reset API. Note that the SDK resets the max retry count to 3 when the configured count
+    // equals 0, so a negative value is required to actually disable it.
+ builder.maxRetryCount(DISABLE_TOS_RETRY_VALUE); + + builder.maxConnections(conf.getInt(TosKeys.FS_TOS_HTTP_MAX_CONNECTIONS, + TosKeys.FS_TOS_HTTP_MAX_CONNECTIONS_DEFAULT)); + builder.idleConnectionTimeMills(conf.getInt(TosKeys.FS_TOS_HTTP_IDLE_CONNECTION_TIME_MILLS, + TosKeys.FS_TOS_HTTP_IDLE_CONNECTION_TIME_MILLS_DEFAULT)); + builder.connectTimeoutMills(conf.getInt(TosKeys.FS_TOS_HTTP_CONNECT_TIMEOUT_MILLS, + TosKeys.FS_TOS_HTTP_CONNECT_TIMEOUT_MILLS_DEFAULT)); + builder.readTimeoutMills(conf.getInt(TosKeys.FS_TOS_HTTP_READ_TIMEOUT_MILLS, + TosKeys.FS_TOS_HTTP_READ_TIMEOUT_MILLS_DEFAULT)); + builder.writeTimeoutMills(conf.getInt(TosKeys.FS_TOS_HTTP_WRITE_TIMEOUT_MILLS, + TosKeys.FS_TOS_HTTP_WRITE_TIMEOUT_MILLS_DEFAULT)); + builder.enableVerifySSL(conf.getBoolean(TosKeys.FS_TOS_HTTP_ENABLE_VERIFY_SSL, + TosKeys.FS_TOS_HTTP_ENABLE_VERIFY_SSL_DEFAULT)); + builder.dnsCacheTimeMinutes(conf.getInt(TosKeys.FS_TOS_HTTP_DNS_CACHE_TIME_MINUTES, + TosKeys.FS_TOS_HTTP_DNS_CACHE_TIME_MINUTES_DEFAULT)); + + return builder.build(); + } + + static class CachedClient extends DelegationClient { + + protected CachedClient(TOSClientConfiguration configuration, int maxRetryTimes, + List<String> nonRetryable409ErrorCodes) { + super(configuration, maxRetryTimes, nonRetryable409ErrorCodes); + } + + @Override + public void close() { + // do nothing as this client may be shared by multiple upper-layer instances + } + } +} + diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/tos/GetObjectOutput.java b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/tos/GetObjectOutput.java new file mode 100644 index 00000000000..6a525d7beff --- /dev/null +++ b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/tos/GetObjectOutput.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.tosfs.object.tos; + +import com.volcengine.tos.model.object.GetObjectV2Output; +import org.apache.hadoop.fs.tosfs.object.exceptions.ChecksumMismatchException; +import org.apache.hadoop.fs.tosfs.util.CommonUtils; +import org.apache.hadoop.util.Preconditions; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Arrays; + +public class GetObjectOutput { + private final GetObjectV2Output output; + private final byte[] checksum; + + public GetObjectOutput(GetObjectV2Output output, byte[] checksum) { + Preconditions.checkNotNull(checksum, "Checksum should not be null."); + this.output = output; + this.checksum = checksum; + } + + public GetObjectV2Output output() { + return output; + } + + public byte[] checksum() { + return checksum; + } + + public InputStream verifiedContent(byte[] expectedChecksum) throws IOException { + if (!Arrays.equals(expectedChecksum, checksum)) { + CommonUtils.runQuietly(this::forceClose); + throw new ChecksumMismatchException(expectedChecksum, checksum); + } + + return output.getContent(); + } + + public void forceClose() throws IOException { + output.forceClose(); + } +} + diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/tos/TOS.java b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/tos/TOS.java index 01cec37f225..55490a24bba 100644 --- a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/tos/TOS.java +++ b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/tos/TOS.java @@ -16,10 +16,1022 @@ package org.apache.hadoop.fs.tosfs.object.tos; -public class TOS { +import com.volcengine.tos.TOSV2; +import com.volcengine.tos.TosException; +import com.volcengine.tos.TosServerException; +import com.volcengine.tos.comm.common.ACLType; +import com.volcengine.tos.comm.common.BucketType; +import com.volcengine.tos.internal.util.TypeConverter; +import com.volcengine.tos.model.bucket.HeadBucketV2Input; +import com.volcengine.tos.model.bucket.HeadBucketV2Output; +import com.volcengine.tos.model.bucket.Tag; +import com.volcengine.tos.model.object.AbortMultipartUploadInput; +import com.volcengine.tos.model.object.AppendObjectOutput; +import com.volcengine.tos.model.object.CompleteMultipartUploadV2Input; +import com.volcengine.tos.model.object.CopyObjectV2Input; +import com.volcengine.tos.model.object.CreateMultipartUploadInput; +import com.volcengine.tos.model.object.CreateMultipartUploadOutput; +import com.volcengine.tos.model.object.DeleteError; +import com.volcengine.tos.model.object.DeleteMultiObjectsV2Input; +import com.volcengine.tos.model.object.DeleteMultiObjectsV2Output; +import com.volcengine.tos.model.object.DeleteObjectInput; +import com.volcengine.tos.model.object.DeleteObjectTaggingInput; +import com.volcengine.tos.model.object.GetFileStatusInput; +import com.volcengine.tos.model.object.GetFileStatusOutput; +import com.volcengine.tos.model.object.GetObjectBasicOutput; +import com.volcengine.tos.model.object.GetObjectTaggingInput; +import com.volcengine.tos.model.object.GetObjectTaggingOutput; +import com.volcengine.tos.model.object.GetObjectV2Input; +import com.volcengine.tos.model.object.GetObjectV2Output; +import com.volcengine.tos.model.object.HeadObjectV2Input; +import com.volcengine.tos.model.object.HeadObjectV2Output; +import com.volcengine.tos.model.object.ListMultipartUploadsV2Input; +import 
com.volcengine.tos.model.object.ListMultipartUploadsV2Output; +import com.volcengine.tos.model.object.ListObjectsType2Input; +import com.volcengine.tos.model.object.ListObjectsType2Output; +import com.volcengine.tos.model.object.ListedCommonPrefix; +import com.volcengine.tos.model.object.ListedObjectV2; +import com.volcengine.tos.model.object.ListedUpload; +import com.volcengine.tos.model.object.ObjectMetaRequestOptions; +import com.volcengine.tos.model.object.ObjectTobeDeleted; +import com.volcengine.tos.model.object.PutObjectOutput; +import com.volcengine.tos.model.object.PutObjectTaggingInput; +import com.volcengine.tos.model.object.RenameObjectInput; +import com.volcengine.tos.model.object.TagSet; +import com.volcengine.tos.model.object.UploadPartCopyV2Input; +import com.volcengine.tos.model.object.UploadPartCopyV2Output; +import com.volcengine.tos.model.object.UploadedPartV2; +import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.tosfs.conf.ConfKeys; +import org.apache.hadoop.fs.tosfs.conf.TosKeys; +import org.apache.hadoop.fs.tosfs.object.BucketInfo; +import org.apache.hadoop.fs.tosfs.object.ChecksumInfo; +import org.apache.hadoop.fs.tosfs.object.ChecksumType; +import org.apache.hadoop.fs.tosfs.object.Constants; +import org.apache.hadoop.fs.tosfs.object.DirectoryStorage; +import org.apache.hadoop.fs.tosfs.object.InputStreamProvider; +import org.apache.hadoop.fs.tosfs.object.MultipartUpload; +import org.apache.hadoop.fs.tosfs.object.ObjectConstants; +import org.apache.hadoop.fs.tosfs.object.ObjectContent; +import org.apache.hadoop.fs.tosfs.object.ObjectInfo; +import org.apache.hadoop.fs.tosfs.object.ObjectStorage; +import org.apache.hadoop.fs.tosfs.object.ObjectUtils; +import org.apache.hadoop.fs.tosfs.object.Part; +import org.apache.hadoop.fs.tosfs.object.exceptions.InvalidObjectKeyException; +import org.apache.hadoop.fs.tosfs.object.exceptions.NotAppendableException; +import org.apache.hadoop.fs.tosfs.object.request.ListObjectsRequest; +import org.apache.hadoop.fs.tosfs.object.response.ListObjectsResponse; +import org.apache.hadoop.fs.tosfs.util.LazyReload; +import org.apache.hadoop.thirdparty.com.google.common.base.Joiner; +import org.apache.hadoop.thirdparty.com.google.common.base.Strings; +import org.apache.hadoop.util.Lists; +import org.apache.hadoop.util.Preconditions; +import org.apache.log4j.Level; +import org.apache.log4j.LogManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Date; +import java.util.Deque; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; + +import static org.apache.hadoop.fs.tosfs.object.tos.TOSErrorCodes.APPEND_NOT_APPENDABLE; +import static org.apache.hadoop.fs.tosfs.object.tos.TOSUtils.CHECKSUM_HEADER; +import static org.apache.hadoop.fs.tosfs.object.tos.TOSUtils.appendable; +import static org.apache.hadoop.fs.tosfs.object.tos.TOSUtils.crc64ecma; +import static org.apache.hadoop.fs.tosfs.object.tos.TOSUtils.parseChecksum; + +/** + * {@link TOS} will be initialized by the {@link 
ObjectStorage#initialize(Configuration, String)}. + */ +public class TOS implements DirectoryStorage { + + private static final Logger LOG = LoggerFactory.getLogger(TOS.class); + public static final String TOS_SCHEME = "tos"; public static final String ENV_TOS_ACCESS_KEY_ID = "TOS_ACCESS_KEY_ID"; public static final String ENV_TOS_SECRET_ACCESS_KEY = "TOS_SECRET_ACCESS_KEY"; public static final String ENV_TOS_SESSION_TOKEN = "TOS_SESSION_TOKEN"; public static final String ENV_TOS_ENDPOINT = "TOS_ENDPOINT"; + + private static final int NOT_FOUND_CODE = 404; + private static final int PATH_CONFLICT_CODE = 409; + private static final int INVALID_RANGE_CODE = 416; + + private static final int MIN_PART_SIZE = 5 * 1024 * 1024; + private static final int MAX_PART_COUNT = 10000; + + private static final InputStream EMPTY_STREAM = new ByteArrayInputStream(new byte[0]); + + private Configuration conf; + private String bucket; + private DelegationClient client; + private long maxDrainBytes; + private int batchDeleteMaxRetries; + private List<String> batchDeleteRetryCodes; + private long batchDeleteRetryInterval; + private int maxDeleteObjectsCount; + private int listObjectsCount; + // the max retry times during reading object content + private int maxInputStreamRetries; + private ACLType defaultAcl; + private ChecksumInfo checksumInfo; + private BucketInfo bucketInfo; + + static { + org.apache.log4j.Logger logger = LogManager.getLogger("io.proton.shaded.com.volcengine.tos"); + String logLevel = System.getProperty("tos.log.level", "WARN"); + + LOG.debug("Reset the log level of io.proton.shaded.com.volcengine.tos with {} ", logLevel); + logger.setLevel(Level.toLevel(logLevel.toUpperCase(), Level.WARN)); + } + + @Override + public void initialize(Configuration conf, String bucket) { + this.conf = conf; + this.bucket = bucket; + client = new DelegationClientBuilder().conf(conf).bucket(bucket).build(); + maxDrainBytes = + conf.getLong(TosKeys.FS_TOS_MAX_DRAIN_BYTES, TosKeys.FS_TOS_MAX_DRAIN_BYTES_DEFAULT); + batchDeleteMaxRetries = conf.getInt(TosKeys.FS_TOS_BATCH_DELETE_MAX_RETRIES, + TosKeys.FS_TOS_BATCH_DELETE_MAX_RETRIES_DEFAULT); + batchDeleteRetryCodes = Arrays.asList( + conf.getTrimmedStrings(TosKeys.FS_TOS_BATCH_DELETE_RETRY_CODES, + TosKeys.FS_TOS_BATCH_DELETE_RETRY_CODES_DEFAULT)); + batchDeleteRetryInterval = conf.getLong(TosKeys.FS_TOS_BATCH_DELETE_RETRY_INTERVAL, + TosKeys.FS_TOS_BATCH_DELETE_RETRY_INTERVAL_DEFAULT); + maxDeleteObjectsCount = conf.getInt(TosKeys.FS_TOS_DELETE_OBJECTS_COUNT, + TosKeys.FS_TOS_DELETE_OBJECTS_COUNT_DEFAULT); + listObjectsCount = + conf.getInt(TosKeys.FS_TOS_LIST_OBJECTS_COUNT, TosKeys.FS_TOS_LIST_OBJECTS_COUNT_DEFAULT); + maxInputStreamRetries = conf.getInt(TosKeys.FS_TOS_MAX_READ_OBJECT_RETRIES, + TosKeys.FS_TOS_MAX_READ_OBJECT_RETRIES_DEFAULT); + defaultAcl = TypeConverter.convertACLType(TosKeys.FS_TOS_ACL_DEFAULT); + + String algorithm = conf.get(TosKeys.FS_TOS_CHECKSUM_ALGORITHM); + ChecksumType checksumType = ChecksumType.valueOf( + conf.get(TosKeys.FS_TOS_CHECKSUM_TYPE, TosKeys.FS_TOS_CHECKSUM_TYPE_DEFAULT).toUpperCase()); + Preconditions.checkArgument(CHECKSUM_HEADER.containsKey(checksumType), + "Checksum type %s is not supported by TOS.", checksumType.name()); + checksumInfo = new ChecksumInfo(algorithm, checksumType); + + bucketInfo = getBucketInfo(bucket); + } + + @Override + public String scheme() { + return TOS_SCHEME; + } + + @Override + public Configuration conf() { + return conf; + } + + @Override + public BucketInfo bucket() { + return bucketInfo; 
+  }
+
+  private BucketInfo getBucketInfo(String bucket) {
+    try {
+      HeadBucketV2Output res =
+          client.headBucket(HeadBucketV2Input.builder().bucket(bucket).build());
+
+      // BUCKET_TYPE_FNS is the general purpose bucket, BUCKET_TYPE_HNS is the directory bucket.
+      boolean directoryBucket = BucketType.BUCKET_TYPE_HNS.equals(res.getBucketType());
+
+      return new BucketInfo(bucket, directoryBucket);
+    } catch (TosException e) {
+      if (e.getStatusCode() == NOT_FOUND_CODE) {
+        return null;
+      }
+      throw new RuntimeException(e);
+    }
+  }
+
+  @VisibleForTesting
+  void setClient(DelegationClient client) {
+    this.client = client;
+  }
+
+  private void checkAvailableClient() {
+    Preconditions.checkState(client != null,
+        "ObjectStorage is uninitialized, please call initialize(..) first.");
+  }
+
+  @Override
+  public ObjectContent get(String key, long offset, long limit) {
+    checkAvailableClient();
+    Preconditions.checkArgument(offset >= 0, "offset is a negative number: %s", offset);
+
+    if (limit == 0) {
+      // Cannot return an empty stream directly when limit = 0, because the requested object
+      // might not exist.
+      if (head(key) != null) {
+        return new ObjectContent(Constants.MAGIC_CHECKSUM, EMPTY_STREAM);
+      } else {
+        throw new RuntimeException(String.format("Object %s doesn't exist", key));
+      }
+    }
+
+    long end = limit < 0 ? -1 : offset + limit - 1;
+    GetObjectFactory factory = (k, startOff, endOff) -> getObject(key, startOff, endOff);
+    ChainTOSInputStream chainStream =
+        new ChainTOSInputStream(factory, key, offset, end, maxDrainBytes, maxInputStreamRetries);
+    return new ObjectContent(chainStream.checksum(), chainStream);
+  }
+
+  @Override
+  public Iterable<ObjectInfo> listDir(String key, boolean recursive) {
+    if (recursive) {
+      if (bucket().isDirectory()) {
+        // The directory bucket only supports listing objects with delimiter = '/', so to list a
+        // directory recursively, we have to descend into each sub-directory step by step.
+        return bfsListDir(key);
+      } else {
+        return listAll(key, key);
+      }
+    } else {
+      return innerListDir(key, key, -1);
+    }
+  }
+
+  private Iterable<ObjectInfo> bfsListDir(String key) {
+    return new LazyReload<>(() -> {
+      final Deque<String> dirQueue = new LinkedList<>();
+      AtomicReference<String> continueToken = new AtomicReference<>("");
+      AtomicReference<String> curDir = new AtomicReference<>(key);
+
+      return buf -> {
+        // No more objects once the current directory is null and the queue is drained.
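+        // bfsListDir walks the directory tree breadth-first: curDir is the directory currently
+        // being listed, continueToken is the pagination cursor within it, and dirQueue holds
+        // discovered sub-directories that still need to be listed.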
+        if (curDir.get() == null) {
+          return true;
+        }
+
+        ListObjectsType2Input request =
+            createListObjectsType2Input(curDir.get(), curDir.get(), listObjectsCount, "/",
+                continueToken.get());
+        ListObjectsType2Output response = client.listObjectsType2(request);
+
+        if (response.getContents() != null) {
+          for (ListedObjectV2 obj : response.getContents()) {
+            buf.add(new ObjectInfo(obj.getKey(), obj.getSize(), obj.getLastModified(),
+                parseChecksum(obj, checksumInfo)));
+          }
+        }
+
+        if (response.getCommonPrefixes() != null) {
+          for (ListedCommonPrefix prefix : response.getCommonPrefixes()) {
+            buf.add(new ObjectInfo(prefix.getPrefix(), 0, new Date(), Constants.MAGIC_CHECKSUM));
+            dirQueue.add(prefix.getPrefix());
+          }
+        }
+
+        if (response.isTruncated()) {
+          continueToken.set(response.getNextContinuationToken());
+        } else {
+          curDir.set(dirQueue.poll());
+          continueToken.set("");
+        }
+
+        return curDir.get() == null;
+      };
+    });
+  }
+
+  private Iterable<ObjectInfo> innerListDir(String key, String startAfter, int limit) {
+    return new LazyReload<>(() -> {
+      AtomicReference<String> continueToken = new AtomicReference<>("");
+      AtomicBoolean isTruncated = new AtomicBoolean(true);
+      AtomicInteger remaining = new AtomicInteger(limit < 0 ? Integer.MAX_VALUE : limit);
+
+      return buf -> {
+        // No more objects when isTruncated is false.
+        if (!isTruncated.get()) {
+          return true;
+        }
+
+        int remainingKeys = remaining.get();
+        int maxKeys = Math.min(listObjectsCount, remainingKeys);
+        ListObjectsType2Input request =
+            createListObjectsType2Input(key, startAfter, maxKeys, "/", continueToken.get());
+        ListObjectsType2Output response = client.listObjectsType2(request);
+
+        if (response.getContents() != null) {
+          for (ListedObjectV2 obj : response.getContents()) {
+            buf.add(new ObjectInfo(obj.getKey(), obj.getSize(), obj.getLastModified(),
+                parseChecksum(obj, checksumInfo)));
+          }
+        }
+
+        if (response.getCommonPrefixes() != null) {
+          for (ListedCommonPrefix prefix : response.getCommonPrefixes()) {
+            buf.add(new ObjectInfo(prefix.getPrefix(), 0, new Date(), Constants.MAGIC_CHECKSUM));
+          }
+        }
+
+        isTruncated.set(response.isTruncated());
+        remaining.compareAndSet(remainingKeys, remainingKeys - response.getKeyCount());
+        continueToken.set(response.getNextContinuationToken());
+
+        return !isTruncated.get();
+      };
+    });
+  }
+
+  @Override
+  public void deleteDir(String key, boolean recursive) {
+    checkAvailableClient();
+    if (recursive) {
+      if (conf.getBoolean(TosKeys.FS_TOS_RMR_SERVER_ENABLED,
+          TosKeys.FS_FS_TOS_RMR_SERVER_ENABLED_DEFAULT)) {
+        DeleteObjectInput request =
+            DeleteObjectInput.builder().bucket(bucket).recursive(true).key(key).build();
+        try {
+          // It's a test feature; the TOS SDK doesn't expose the atomic delete-dir capability yet.
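+          // Flip the private recursiveByServer flag via reflection so the server performs the
+          // recursive delete atomically instead of the client iterating over the keys.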
+          Field f = DeleteObjectInput.class.getDeclaredField("recursiveByServer");
+          f.setAccessible(true);
+          f.setBoolean(request, true);
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+        client.deleteObject(request);
+      } else {
+        if (conf.getBoolean(TosKeys.FS_TOS_RMR_CLIENT_ENABLE,
+            TosKeys.FS_TOS_RMR_CLIENT_ENABLE_DEFAULT)) {
+          client.deleteObject(
+              DeleteObjectInput.builder().bucket(bucket).recursive(true).key(key).build());
+        } else {
+          recursiveDeleteDir(key);
+        }
+      }
+    } else {
+      delete(key);
+    }
+  }
+
+  @Override
+  public boolean isEmptyDir(String key) {
+    checkAvailableClient();
+    return !innerListDir(key, key, 1).iterator().hasNext();
+  }
+
+  public void recursiveDeleteDir(String key) {
+    for (ObjectInfo obj : innerListDir(key, key, -1)) {
+      if (obj.isDir()) {
+        recursiveDeleteDir(obj.key());
+      } else {
+        delete(obj.key());
+      }
+    }
+    delete(key);
+  }
+
+  interface GetObjectFactory {
+    /**
+     * Get object content for the given object key and range.
+     *
+     * @param key The object key
+     * @param offset The start offset of object content
+     * @param end The end offset of object content
+     * @return {@link GetObjectOutput}
+     */
+    GetObjectOutput create(String key, long offset, long end);
+  }
+
+  public GetObjectOutput getObject(String key, long offset, long end) {
+    checkAvailableClient();
+    Preconditions.checkArgument(offset >= 0, "offset is a negative number: %s", offset);
+
+    try {
+      GetObjectV2Input request = GetObjectV2Input.builder().bucket(bucket).key(key)
+          .options(ObjectMetaRequestOptions.builder().range(offset, end).build()).build();
+      GetObjectV2Output output = client.getObject(request);
+
+      byte[] checksum = parseChecksum(output.getRequestInfo().getHeader(), checksumInfo);
+      return new GetObjectOutput(output, checksum);
+    } catch (TosException e) {
+      if (e instanceof TosServerException) {
+        TosServerException tosException = (TosServerException) e;
+        if (tosException.getStatusCode() == INVALID_RANGE_CODE) {
+          ObjectInfo info = head(key);
+          // If the object is empty or the requested offset equals the object size, return an
+          // empty stream directly; otherwise, rethrow the exception.
+          if (info.size() == 0 || offset == info.size()) {
+            return new GetObjectOutput(
+                new GetObjectV2Output(new GetObjectBasicOutput(), EMPTY_STREAM), info.checksum());
+          } else {
+            throw new RuntimeException(e);
+          }
+        }
+      }
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public byte[] put(String key, InputStreamProvider streamProvider, long contentLength) {
+    checkAvailableClient();
+    PutObjectOutput res = client.put(bucket, key, streamProvider, contentLength, defaultAcl);
+    return ObjectInfo.isDir(key) ?
+        Constants.MAGIC_CHECKSUM :
+        parseChecksum(res.getRequestInfo().getHeader(), checksumInfo);
+  }
+
+  @Override
+  public byte[] append(String key, InputStreamProvider streamProvider, long contentLength) {
+    if (bucketInfo.isDirectory()) {
+      return hnsAppend(key, streamProvider, contentLength);
+    } else {
+      return fnsAppend(key, streamProvider, contentLength);
+    }
+  }
+
+  private byte[] hnsAppend(String key, InputStreamProvider streamProvider, long contentLength) {
+    checkAvailableClient();
+
+    long offset = 0;
+    String preCrc64;
+
+    TosObjectInfo obj = innerHead(key);
+    if (obj == null) {
+      if (contentLength == 0) {
+        throw new NotAppendableException(String.format(
+            "%s is not appendable because appending zero bytes to a "
+                + "non-existent object is not supported.", key));
+      }
+
+      // In HNS, appending to a non-existent object is not allowed. Pre-create an empty object
+      // before performing appendObject.
+      PutObjectOutput res = client.put(bucket, key, () -> EMPTY_STREAM, 0, defaultAcl);
+      preCrc64 = res.getHashCrc64ecma();
+    } else {
+      if (contentLength == 0) {
+        return obj.checksum();
+      }
+      offset = obj.size();
+      preCrc64 = obj.crc64ecma();
+    }
+
+    AppendObjectOutput res =
+        client.appendObject(bucket, key, streamProvider, offset, contentLength, preCrc64,
+            defaultAcl);
+    return ObjectInfo.isDir(key) ? Constants.MAGIC_CHECKSUM :
+        parseChecksum(res.getRequestInfo().getHeader(), checksumInfo);
+  }
+
+  private byte[] fnsAppend(String key, InputStreamProvider streamProvider, long contentLength) {
+    checkAvailableClient();
+
+    TosObjectInfo obj = innerHead(key);
+    if (obj != null) {
+      if (!obj.appendable()) {
+        throw new NotAppendableException(String.format("%s is not appendable.", key));
+      }
+      if (contentLength == 0) {
+        return obj.checksum();
+      }
+    } else if (contentLength == 0) {
+      throw new NotAppendableException(String.format("%s is not appendable because appending"
+          + " zero bytes to a non-existent object is not supported.", key));
+    }
+
+    long offset = obj == null ? 0 : obj.size();
+    String preCrc64 = obj == null ? null : obj.crc64ecma();
+    AppendObjectOutput res;
+    try {
+      res = client.appendObject(bucket, key, streamProvider, offset, contentLength, preCrc64,
+          defaultAcl);
+    } catch (TosServerException e) {
+      if (e.getStatusCode() == 409 && APPEND_NOT_APPENDABLE.equals(e.getEc())) {
+        throw new NotAppendableException(String.format("%s is not appendable.", key));
+      }
+      throw e;
+    }
+
+    return ObjectInfo.isDir(key) ?
+        Constants.MAGIC_CHECKSUM :
+        parseChecksum(res.getRequestInfo().getHeader(), checksumInfo);
+  }
+
+  @Override
+  public void delete(String key) {
+    checkAvailableClient();
+    client.deleteObject(DeleteObjectInput.builder().bucket(bucket).key(key).build());
+  }
+
+  @Override
+  public List<String> batchDelete(List<String> keys) {
+    checkAvailableClient();
+    int totalKeyCnt = keys.size();
+
+    Preconditions.checkArgument(totalKeyCnt <= maxDeleteObjectsCount,
+        "The number of objects in one batch delete must be <= %s", maxDeleteObjectsCount);
+
+    List<DeleteError> failedKeys = innerBatchDelete(keys);
+    for (int retry = 1; retry < batchDeleteMaxRetries && !failedKeys.isEmpty(); retry++) {
+      if (isBatchDeleteRetryable(failedKeys)) {
+        try {
+          Thread.sleep(batchDeleteRetryInterval);
+        } catch (InterruptedException e) {
+          throw new RuntimeException(e);
+        }
+
+        failedKeys = innerBatchDelete(deleteErrorKeys(failedKeys));
+      } else {
+        LOG.warn("Failed to delete {} of {} objects; the failures cannot be retried, detail: {}",
+            failedKeys.size(),
+            totalKeyCnt,
+            Joiner.on(",\n").join(failedKeys));
+        break;
+      }
+    }
+
+    if (!failedKeys.isEmpty()) {
+      LOG.warn("Failed to delete {} of {} objects after retrying {} times.",
+          failedKeys.size(), totalKeyCnt, batchDeleteMaxRetries);
+    }
+
+    return deleteErrorKeys(failedKeys);
+  }
+
+  @Override
+  public void deleteAll(String prefix) {
+    if (bucket().isDirectory()) {
+      deleteDir(prefix, true);
+    } else {
+      Iterable<ObjectInfo> objects = listAll(prefix, "");
+      ObjectUtils.deleteAllObjects(this, objects,
+          conf.getInt(ConfKeys.FS_BATCH_DELETE_SIZE.key(scheme()),
+              ConfKeys.FS_BATCH_DELETE_SIZE_DEFAULT));
+    }
+  }
+
+  private List<DeleteError> innerBatchDelete(List<String> keys) {
+    List<ObjectTobeDeleted> toBeDeleted = Lists.newArrayList();
+    for (String key : keys) {
+      toBeDeleted.add(ObjectTobeDeleted.builder().key(key).build());
+    }
+
+    DeleteMultiObjectsV2Output deletedRes =
+        client.deleteMultiObjects(DeleteMultiObjectsV2Input
+            .builder()
+            .bucket(bucket)
+            .objects(toBeDeleted)
+            .build());
+
+    return deletedRes.getErrors() == null ? Lists.newArrayList() : deletedRes.getErrors();
+  }
+
+  private boolean isBatchDeleteRetryable(List<DeleteError> failedKeys) {
+    for (DeleteError errorKey : failedKeys) {
+      if (batchDeleteRetryCodes.contains(errorKey.getCode())) {
+        LOG.warn("Failed to delete object; the delete might succeed after a retry, detail: {}",
+            errorKey);
+      } else {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  private static List<String> deleteErrorKeys(List<DeleteError> errorKeys) {
+    List<String> keys = Lists.newArrayList();
+    for (DeleteError error : errorKeys) {
+      keys.add(error.getKey());
+    }
+    return keys;
+  }
+
+  @Override
+  public ObjectInfo head(String key) {
+    return innerHead(key);
+  }
+
+  private TosObjectInfo innerHead(String key) {
+    checkAvailableClient();
+    try {
+      HeadObjectV2Input request = HeadObjectV2Input.builder().bucket(bucket).key(key).build();
+      HeadObjectV2Output response = client.headObject(request);
+
+      // Use crc64ecma/crc32c as the checksum to compare object contents rather than the eTag,
+      // since PUT & MPU operations produce different eTags for the same content.
+      Map<String, String> headers = response.getRequestInfo().getHeader();
+      byte[] checksum = parseChecksum(headers, checksumInfo);
+      boolean isDir = bucket().isDirectory() ? response.isDirectory() : ObjectInfo.isDir(key);
+
+      return new TosObjectInfo(key, response.getContentLength(), response.getLastModifiedInDate(),
+          checksum, isDir, appendable(headers), crc64ecma(headers));
+    } catch (TosException e) {
+      if (e.getStatusCode() == NOT_FOUND_CODE) {
+        return null;
+      }
+
+      if (e.getStatusCode() == PATH_CONFLICT_CODE) {
+        // If a directory 'a/b/' exists in a directory bucket, both headObject('a/b') and
+        // headObject('a/b/') return the directory info, and the response key should be 'a/b/'.
+        // But if a file 'a/b' exists in a directory bucket, only headObject('a/b') returns the
+        // file info; headObject('a/b/') gets a 409 error.
+        throw new InvalidObjectKeyException(e);
+      }
+
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public Iterable<ListObjectsResponse> list(ListObjectsRequest req) {
+    return new LazyReload<>(() -> {
+      AtomicReference<String> continueToken = new AtomicReference<>("");
+      AtomicBoolean isTruncated = new AtomicBoolean(true);
+      AtomicInteger remaining =
+          new AtomicInteger(req.maxKeys() < 0 ? Integer.MAX_VALUE : req.maxKeys());
+
+      return buf -> {
+        // No more objects when isTruncated is false.
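+        // Each reload issues one ListObjectsType2 call: maxKeys is capped by both the configured
+        // page size (listObjectsCount) and the caller's remaining quota, while continueToken
+        // carries the pagination cursor between calls.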
+ if (!isTruncated.get()) { + return true; + } + + int remainingKeys = remaining.get(); + int maxKeys = Math.min(listObjectsCount, remainingKeys); + ListObjectsType2Input request = + createListObjectsType2Input(req.prefix(), req.startAfter(), maxKeys, req.delimiter(), + continueToken.get()); + ListObjectsType2Output response = client.listObjectsType2(request); + List<ObjectInfo> objects = listObjectsOutputToObjectInfos(response); + List<String> commonPrefixes = listObjectsOutputToCommonPrefixes(response); + buf.add(new ListObjectsResponse(objects, commonPrefixes)); + + if (maxKeys < listObjectsCount) { + isTruncated.set(false); + } else { + continueToken.set(response.getNextContinuationToken()); + remaining.compareAndSet(remainingKeys, remainingKeys - response.getKeyCount()); + if (remaining.get() == 0) { + isTruncated.set(false); + } else { + isTruncated.set(response.isTruncated()); + } + } + return !isTruncated.get(); + }; + }); + } + + private List<String> listObjectsOutputToCommonPrefixes(ListObjectsType2Output listObjectsOutput) { + if (listObjectsOutput.getCommonPrefixes() == null) { + return Lists.newArrayList(); + } + + return listObjectsOutput.getCommonPrefixes() + .stream() + .map(ListedCommonPrefix::getPrefix) + .collect(Collectors.toList()); + } + + private List<ObjectInfo> listObjectsOutputToObjectInfos( + ListObjectsType2Output listObjectsOutput) { + if (listObjectsOutput.getContents() == null) { + return Lists.newArrayList(); + } + return listObjectsOutput.getContents().stream() + .map(obj -> new ObjectInfo( + obj.getKey(), + obj.getSize(), + obj.getLastModified(), + parseChecksum(obj, checksumInfo))) + .collect(Collectors.toList()); + } + + private ListObjectsType2Input createListObjectsType2Input( + String prefix, String startAfter, int maxKeys, String delimiter, String continueToken) { + ListObjectsType2Input.ListObjectsType2InputBuilder builder = ListObjectsType2Input.builder() + .bucket(bucket) + .prefix(prefix) + .startAfter(startAfter) + .delimiter(delimiter) + .maxKeys(maxKeys); + + if (!Strings.isNullOrEmpty(continueToken)) { + builder.continuationToken(continueToken); + } + return builder.build(); + } + + @Override + public MultipartUpload createMultipartUpload(String key) { + checkAvailableClient(); + CreateMultipartUploadInput input = CreateMultipartUploadInput.builder() + .bucket(bucket) + .key(key) + .options(createMetaOptions()) + .build(); + CreateMultipartUploadOutput output = client.createMultipartUpload(input); + return new MultipartUpload(output.getKey(), output.getUploadID(), MIN_PART_SIZE, + MAX_PART_COUNT); + } + + @Override + public Part uploadPart( + String key, String uploadId, int partNum, + InputStreamProvider streamProvider, long contentLength) { + checkAvailableClient(); + return client.uploadPart(bucket, key, uploadId, partNum, streamProvider, contentLength, + defaultAcl); + } + + @Override + public byte[] completeUpload(String key, String uploadId, List<Part> uploadParts) { + checkAvailableClient(); + List<UploadedPartV2> uploadedPartsV2 = uploadParts.stream().map( + part -> UploadedPartV2.builder() + .etag(part.eTag()) + .partNumber(part.num()) + .size(part.size()) + .build() + ).collect(Collectors.toList()); + CompleteMultipartUploadV2Input input = CompleteMultipartUploadV2Input.builder() + .bucket(bucket) + .key(key) + .uploadID(uploadId) + .uploadedParts(uploadedPartsV2) + .build(); + return parseChecksum(client.completeMultipartUpload(input).getRequestInfo().getHeader(), + checksumInfo); + } + + @Override + public void 
abortMultipartUpload(String key, String uploadId) {
+    checkAvailableClient();
+    AbortMultipartUploadInput input = AbortMultipartUploadInput.builder()
+        .bucket(bucket)
+        .key(key)
+        .uploadID(uploadId)
+        .build();
+    client.abortMultipartUpload(input);
+  }
+
+  @Override
+  public Iterable<MultipartUpload> listUploads(String prefix) {
+    checkAvailableClient();
+    return new LazyReload<>(() -> {
+      AtomicReference<String> nextKeyMarker = new AtomicReference<>("");
+      AtomicReference<String> nextUploadIdMarker = new AtomicReference<>("");
+      AtomicBoolean isTruncated = new AtomicBoolean(true);
+      return buf -> {
+        // No more uploads when isTruncated is false.
+        if (!isTruncated.get()) {
+          return true;
+        }
+        ListMultipartUploadsV2Input input = ListMultipartUploadsV2Input.builder()
+            .bucket(bucket)
+            .prefix(prefix)
+            .keyMarker(nextKeyMarker.get())
+            .uploadIDMarker(nextUploadIdMarker.get())
+            .build();
+        ListMultipartUploadsV2Output output = client.listMultipartUploads(input);
+        isTruncated.set(output.isTruncated());
+        if (output.getUploads() != null) {
+          // Fill the reloaded uploads into the buffer.
+          for (ListedUpload upload : output.getUploads()) {
+            buf.add(new MultipartUpload(upload.getKey(), upload.getUploadID(),
+                ObjectConstants.MIN_PART_SIZE, ObjectConstants.MAX_PART_COUNT));
+          }
+          LOG.info("Retrieved {} uploads with prefix: {}, key marker: {}, upload ID marker: {}",
+              output.getUploads().size(), prefix, nextKeyMarker.get(), nextUploadIdMarker.get());
+        }
+        // Refresh the nextKeyMarker and nextUploadIdMarker for the next reload.
+        nextKeyMarker.set(output.getNextKeyMarker());
+        nextUploadIdMarker.set(output.getNextUploadIdMarker());
+
+        return !isTruncated.get();
+      };
+    });
+  }
+
+  @Override
+  public Part uploadPartCopy(
+      String srcKey, String dstKey, String uploadId, int partNum, long copySourceRangeStart,
+      long copySourceRangeEnd) {
+    Preconditions.checkArgument(!Strings.isNullOrEmpty(srcKey), "Source key should not be empty.");
+    Preconditions.checkArgument(!Strings.isNullOrEmpty(dstKey),
+        "Destination key should not be empty.");
+    Preconditions.checkArgument(!Strings.isNullOrEmpty(uploadId), "Upload ID should not be empty.");
+    Preconditions.checkArgument(copySourceRangeStart >= 0, "CopySourceRangeStart must be >= 0.");
+    Preconditions.checkArgument(copySourceRangeEnd >= 0, "CopySourceRangeEnd must be >= 0.");
+    Preconditions.checkArgument(copySourceRangeEnd >= copySourceRangeStart,
+        "CopySourceRangeEnd must be >= copySourceRangeStart.");
+    checkAvailableClient();
+    UploadPartCopyV2Input input = UploadPartCopyV2Input.builder()
+        .bucket(bucket)
+        .key(dstKey)
+        .uploadID(uploadId)
+        .sourceBucket(bucket)
+        .sourceKey(srcKey)
+        .partNumber(partNum)
+        .copySourceRange(copySourceRangeStart, copySourceRangeEnd)
+        .options(createMetaOptions())
+        .build();
+    UploadPartCopyV2Output output = client.uploadPartCopy(input);
+    return new Part(output.getPartNumber(), copySourceRangeEnd - copySourceRangeStart + 1,
+        output.getEtag());
+  }
+
+  @Override
+  public void copy(String srcKey, String dstKey) {
+    checkAvailableClient();
+    CopyObjectV2Input input = CopyObjectV2Input.builder()
+        .bucket(bucket)
+        .key(dstKey)
+        .srcBucket(bucket)
+        .srcKey(srcKey)
+        .options(createMetaOptions())
+        .build();
+    client.copyObject(input);
+  }
+
+  private ObjectMetaRequestOptions createMetaOptions() {
+    return new ObjectMetaRequestOptions().setAclType(defaultAcl);
+  }
+
+  @Override
+  public void rename(String srcKey, String dstKey) {
+    checkAvailableClient();
+    Preconditions.checkArgument(!Objects.equals(srcKey, dstKey),
+        "Cannot rename to the same object");
+
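+    // Rename is delegated to the server-side RenameObject API rather than being emulated with
+    // copy + delete on the client.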
+    RenameObjectInput request = RenameObjectInput.builder()
+        .bucket(bucket)
+        .key(srcKey)
+        .newKey(dstKey)
+        .build();
+    client.renameObject(request);
+  }
+
+  // TOS allows up to 10 tags. AWS S3 allows up to 10 tags too.
+  @Override
+  public void putTags(String key, Map<String, String> newTags) {
+    checkAvailableClient();
+    List<Tag> tags = newTags.entrySet().stream()
+        .map(e -> new Tag().setKey(e.getKey()).setValue(e.getValue()))
+        .collect(Collectors.toList());
+
+    if (!tags.isEmpty()) {
+      client.putObjectTagging(createPutTagInput(bucket, key, tags));
+    } else {
+      client.deleteObjectTagging(createDeleteTagInput(bucket, key));
+    }
+  }
+
+  @Override
+  public Map<String, String> getTags(String key) {
+    Map<String, String> result = new HashMap<>();
+    for (Tag tag : getObjectTaggingList(key)) {
+      result.put(tag.getKey(), tag.getValue());
+    }
+    return result;
+  }
+
+  private List<Tag> getObjectTaggingList(String key) {
+    checkAvailableClient();
+
+    GetObjectTaggingInput input = GetObjectTaggingInput.builder()
+        .bucket(bucket)
+        .key(key)
+        .build();
+    GetObjectTaggingOutput output = client.getObjectTagging(input);
+
+    TagSet tagSet = output.getTagSet();
+    if (tagSet == null || tagSet.getTags() == null) {
+      return new ArrayList<>();
+    }
+    return tagSet.getTags();
+  }
+
+  private static PutObjectTaggingInput createPutTagInput(String bucket, String key,
+      List<Tag> tags) {
+    return PutObjectTaggingInput.builder()
+        .bucket(bucket)
+        .key(key)
+        .tagSet(TagSet.builder().tags(tags).build())
+        .build();
+  }
+
+  private static DeleteObjectTaggingInput createDeleteTagInput(String bucket, String key) {
+    return DeleteObjectTaggingInput.builder()
+        .bucket(bucket)
+        .key(key)
+        .build();
+  }
+
+  /**
+   * Implement Hadoop FileSystem.getFileStatus semantics through
+   * {@link TOSV2#getFileStatus(GetFileStatusInput)}. <br>
+   *
+   * The detailed behavior is as follows:
+   * <ul>
+   *   <li>Assume object 'a/b' exists in TOS: getFileStatus("a/b") succeeds and returns
+   *   object('a/b'), while getFileStatus("a/b/") gets a 404.</li>
+   *   <li>Assume object 'a/b/' exists in TOS: both getFileStatus("a/b") & getFileStatus("a/b/")
+   *   succeed and return object('a/b/').</li>
+   *   <li>Assume object 'a/b/c' exists in TOS: both getFileStatus("a/b") & getFileStatus("a/b/")
+   *   succeed and return object('a/b/').</li>
+   * </ul>
+   * <p>
+   * And the following is the logic of {@link TOSV2#getFileStatus(GetFileStatusInput)}: <br>
+   * Step 1: Head the specified key; if the head operation succeeds, the response is filled with
+   * the actual object. <br>
+   * Step 2: Append the suffix '/' to the key and perform a list operation; if the list operation
+   * succeeds, the response is filled with the <strong>first object from the listing results
+   * </strong>; if there are no objects, return 404. <br>
+   *
+   * @param key for the object.
+   * @return the object info, or null if the object doesn't exist.
+   */
+  private ObjectInfo getFileStatus(String key) {
+    checkAvailableClient();
+    Preconditions.checkArgument(!Strings.isNullOrEmpty(key), "key should not be empty.");
+
+    GetFileStatusInput input = GetFileStatusInput.builder()
+        .bucket(bucket)
+        .key(key)
+        .build();
+    try {
+      GetFileStatusOutput output = client.getFileStatus(input);
+      if (key.equals(output.getKey()) && !ObjectInfo.isDir(output.getKey())) {
+        return new ObjectInfo(key, output.getSize(), output.getLastModifiedInDate(),
+            parseChecksum(output, checksumInfo));
+      } else {
+        String dirKey = ObjectInfo.isDir(key) ? key : key + '/';
+
+        // If only the prefix exists but the dir object key doesn't, use the current date as the
+        // modified date.
+        Date lastModifiedInDate =
+            dirKey.equals(output.getKey()) ? output.getLastModifiedInDate() : new Date();
+        return new ObjectInfo(dirKey, 0, lastModifiedInDate, Constants.MAGIC_CHECKSUM, true);
+      }
+    } catch (TosException e) {
+      // The specified object does not exist.
+      if (e.getStatusCode() == NOT_FOUND_CODE) {
+        return null;
+      }
+
+      if (e.getStatusCode() == PATH_CONFLICT_CODE) {
+        throw new InvalidObjectKeyException(e);
+      }
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public ObjectInfo objectStatus(String key) {
+    if (bucket().isDirectory()) {
+      return head(key);
+    } else if (conf.getBoolean(TosKeys.FS_TOS_GET_FILE_STATUS_ENABLED,
+        TosKeys.FS_TOS_GET_FILE_STATUS_ENABLED_DEFAULT)) {
+      return getFileStatus(key);
+    } else {
+      ObjectInfo obj = head(key);
+      if (obj == null && !ObjectInfo.isDir(key)) {
+        key = key + '/';
+        obj = head(key);
+      }
+
+      if (obj == null) {
+        Iterable<ObjectInfo> objs = list(key, null, 1);
+        if (objs.iterator().hasNext()) {
+          obj = new ObjectInfo(key, 0, new Date(0), Constants.MAGIC_CHECKSUM, true);
+        }
+      }
+
+      return obj;
+    }
+  }
+
+  @Override
+  public ChecksumInfo checksumInfo() {
+    return checksumInfo;
+  }
+
+  @Override
+  public void close() throws IOException {
+    client.close();
+  }
 }
diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/tos/TOSErrorCodes.java b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/tos/TOSErrorCodes.java
new file mode 100644
index 00000000000..e478a09fc0b
--- /dev/null
+++ b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/tos/TOSErrorCodes.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.object.tos;
+
+import org.apache.hadoop.util.Lists;
+
+import java.util.List;
+import java.util.StringJoiner;
+
+public class TOSErrorCodes {
+  private TOSErrorCodes() {
+  }
+
+  // The 409 error codes of HNS
+  public static final String DELETE_NON_EMPTY_DIR = "0026-00000013";
+  public static final String LOCATED_UNDER_A_FILE = "0026-00000020";
+  public static final String COPY_BETWEEN_DIR_AND_FILE = "0026-00000021";
+  public static final String PATH_LOCK_CONFLICT = "0026-00000022";
+  public static final String RENAME_TO_AN_EXISTED_DIR = "0026-00000025";
+  public static final String RENAME_TO_SUB_DIR = "0026-00000026";
+  public static final String RENAME_BETWEEN_DIR_AND_FILE = "0026-00000027";
+
+  // The 409 error codes shared by HNS and FNS.
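+  // APPEND_NOT_APPENDABLE is surfaced to callers as a NotAppendableException, see TOS#fnsAppend.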
+  public static final String APPEND_OFFSET_NOT_MATCHED = "0017-00000208";
+  public static final String APPEND_NOT_APPENDABLE = "0017-00000209";
+
+  // The errors below cannot be resolved by retrying the request, so they need to fail fast;
+  // PATH_LOCK_CONFLICT is the exception and is deliberately excluded from this list.
+  public static final String FAST_FAILURE_CONFLICT_ERROR_CODES = new StringJoiner(",")
+      .add(DELETE_NON_EMPTY_DIR)
+      .add(LOCATED_UNDER_A_FILE)
+      .add(COPY_BETWEEN_DIR_AND_FILE)
+      .add(RENAME_TO_AN_EXISTED_DIR)
+      .add(RENAME_TO_SUB_DIR)
+      .add(RENAME_BETWEEN_DIR_AND_FILE)
+      .add(APPEND_OFFSET_NOT_MATCHED)
+      .add(APPEND_NOT_APPENDABLE)
+      .toString();
+}
+
diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/tos/TOSInputStream.java b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/tos/TOSInputStream.java
new file mode 100644
index 00000000000..6e05a0ca64f
--- /dev/null
+++ b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/tos/TOSInputStream.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.object.tos;
+
+import org.apache.hadoop.fs.tosfs.util.CommonUtils;
+import org.apache.hadoop.thirdparty.com.google.common.io.ByteStreams;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+class TOSInputStream extends InputStream {
+  private static final Logger LOG = LoggerFactory.getLogger(TOSInputStream.class);
+
+  private final GetObjectOutput output;
+  private final InputStream stream;
+  private final AtomicBoolean closed = new AtomicBoolean(false);
+
+  private long curOff;
+  private final long endOff; // range end offset (inclusive)
+  private final long maxDrainByteSize;
+
+  TOSInputStream(GetObjectOutput output, long startOff, long endOff, long maxDrainByteSize,
+      byte[] expectedChecksum) throws IOException {
+    this.output = output;
+    this.stream = output.verifiedContent(expectedChecksum);
+    this.curOff = startOff;
+    this.endOff = endOff;
+    this.maxDrainByteSize = maxDrainByteSize;
+  }
+
+  @Override
+  public int read() throws IOException {
+    int b = stream.read();
+    if (b != -1) {
+      // Only advance the cursor for a byte actually read; EOF must not move it.
+      curOff += 1;
+    }
+    return b;
+  }
+
+  @Override
+  public int read(byte[] b, int off, int len) throws IOException {
+    int total = 0;
+    int n;
+    do {
+      n = stream.read(b, off + total, len - total);
+      if (n > 0) {
+        total += n;
+      }
+    } while (n > 0);
+
+    if (total == 0) {
+      // Nothing was read: propagate the stream's last result (-1 on EOF, 0 when len == 0).
+      return n;
+    } else {
+      curOff += total;
+      return total;
+    }
+  }
+
+  // Only visible for testing.
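+  // Exposes the underlying GetObjectOutput so tests can assert on the raw TOS response.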
+  GetObjectOutput getObjectOutput() {
+    return output;
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (closed.compareAndSet(false, true)) {
+      if (endOff >= 0) {
+        // The number of unread bytes is known. Just skip them if the gap is <= the expected
+        // drain size (to reuse the socket connection); otherwise force-close the connection
+        // without reading any further bytes.
+        long gap = endOff - curOff + 1;
+        if (gap <= maxDrainByteSize) {
+          // The close will try to drain the remaining bytes internally.
+          stream.close();
+        } else {
+          CommonUtils.runQuietly(output::forceClose, false);
+        }
+      } else {
+        // The number of unread bytes is unknown. Try to skip the expected drain size to see
+        // whether the stream hits EOF. If so, just close the stream to reuse the socket
+        // connection; otherwise force-close the connection to save draining time.
+        try {
+          ByteStreams.skipFully(stream, maxDrainByteSize);
+        } catch (Exception e) {
+          if (e instanceof EOFException) {
+            LOG.debug("Stream is EOF now, just close the stream to reuse the socket connection.");
+            stream.close();
+          } else {
+            LOG.debug("Stream skipFully encountered an exception, force-close the socket"
+                + " connection.", e);
+            // Force close the socket connection.
+            CommonUtils.runQuietly(output::forceClose, false);
+          }
+          return;
+        }
+
+        // Force close the socket connection.
+        CommonUtils.runQuietly(output::forceClose, false);
+      }
+    }
+  }
+}
+
diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/tos/TOSUtils.java b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/tos/TOSUtils.java
new file mode 100644
index 00000000000..93deef9e941
--- /dev/null
+++ b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/tos/TOSUtils.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.object.tos;
+
+import com.volcengine.tos.model.object.GetFileStatusOutput;
+import com.volcengine.tos.model.object.ListedObjectV2;
+import org.apache.hadoop.fs.tosfs.common.Bytes;
+import org.apache.hadoop.fs.tosfs.object.ChecksumInfo;
+import org.apache.hadoop.fs.tosfs.object.ChecksumType;
+import org.apache.hadoop.fs.tosfs.object.Constants;
+import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
+
+import java.util.Map;
+
+public class TOSUtils {
+  private TOSUtils() {}
+
+  // Checksum header.
+  public static final Map<ChecksumType, String> CHECKSUM_HEADER = ImmutableMap.of(
+      ChecksumType.CRC32C, "x-tos-hash-crc32c",
+      ChecksumType.CRC64ECMA, "x-tos-hash-crc64ecma"
+  );
+
+  // Object type header. Object is either 'Appendable' or 'Normal'.
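+  // TOS#fnsAppend relies on this flag to reject appends to objects that were not created as
+  // appendable.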
+  public static final String OBJECT_TYPE_KEY = "x-tos-object-type";
+  public static final String APPENDABLE_TYPE_VALUE = "Appendable";
+
+  // Returns the magic checksum if the object doesn't carry the configured checksum type.
+  public static byte[] parseChecksum(Map<String, String> headers, ChecksumInfo checksumInfo) {
+    ChecksumType type = checksumInfo.checksumType();
+    String header = CHECKSUM_HEADER.get(type);
+    if (header == null) {
+      return Constants.MAGIC_CHECKSUM;
+    }
+
+    String checksumStr = headers.get(header);
+    if (checksumStr == null) {
+      return Constants.MAGIC_CHECKSUM;
+    }
+
+    return parseChecksumStringToBytes(checksumStr, type);
+  }
+
+  // Returns the magic checksum if the object doesn't carry the configured checksum type.
+  public static byte[] parseChecksum(ListedObjectV2 obj, ChecksumInfo checksumInfo) {
+    ChecksumType type = checksumInfo.checksumType();
+
+    String checksumStr;
+    if (type == ChecksumType.CRC32C) {
+      checksumStr = obj.getHashCrc32c();
+    } else if (type == ChecksumType.CRC64ECMA) {
+      checksumStr = obj.getHashCrc64ecma();
+    } else {
+      throw new IllegalArgumentException(
+          String.format("Checksum type %s is not supported by TOS.", type.name()));
+    }
+
+    if (checksumStr == null) {
+      return Constants.MAGIC_CHECKSUM;
+    }
+
+    return parseChecksumStringToBytes(checksumStr, type);
+  }
+
+  // Returns the magic checksum if the object doesn't carry the configured checksum type.
+  public static byte[] parseChecksum(GetFileStatusOutput obj, ChecksumInfo checksumInfo) {
+    ChecksumType type = checksumInfo.checksumType();
+
+    if (type == ChecksumType.CRC32C) {
+      return parseChecksumStringToBytes(obj.getCrc32(), type);
+    } else if (type == ChecksumType.CRC64ECMA) {
+      return parseChecksumStringToBytes(obj.getCrc64(), type);
+    } else {
+      throw new IllegalArgumentException(
+          String.format("Checksum type %s is not supported by TOS.", type.name()));
+    }
+  }
+
+  public static byte[] parseChecksumStringToBytes(String checksum, ChecksumType type) {
+    if (checksum == null) {
+      return Constants.MAGIC_CHECKSUM;
+    }
+
+    switch (type) {
+      case CRC32C:
+      case CRC64ECMA:
+        return Bytes.toBytes(Long.parseUnsignedLong(checksum));
+      default:
+        throw new IllegalArgumentException(
+            String.format("Checksum type %s is not supported by TOS.", type.name()));
+    }
+  }
+
+  public static String crc64ecma(Map<String, String> headers) {
+    String header = CHECKSUM_HEADER.get(ChecksumType.CRC64ECMA);
+    return headers.get(header);
+  }
+
+  public static boolean appendable(Map<String, String> headers) {
+    String value = headers.get(OBJECT_TYPE_KEY);
+    return APPENDABLE_TYPE_VALUE.equals(value);
+  }
+}
+
diff --git a/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/tos/TosObjectInfo.java b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/tos/TosObjectInfo.java
new file mode 100644
index 00000000000..28fdb7feae7
--- /dev/null
+++ b/hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/tos/TosObjectInfo.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.tosfs.object.tos; + +import org.apache.commons.codec.binary.Hex; +import org.apache.hadoop.fs.tosfs.object.ObjectInfo; +import org.apache.hadoop.thirdparty.com.google.common.base.MoreObjects; + +import java.util.Date; +import java.util.Objects; + +public class TosObjectInfo extends ObjectInfo { + private final String crc64ecma; + private final boolean appendable; + + public TosObjectInfo(String key, long size, Date mtime, byte[] checksum, boolean isDir, + boolean appendable, String crc64ecma) { + super(key, size, mtime, checksum, isDir); + this.crc64ecma = crc64ecma; + this.appendable = appendable; + } + + public String crc64ecma() { + return crc64ecma; + } + + public boolean appendable() { + return appendable; + } + + @Override + public boolean equals(Object o) { + if (!super.equals(o)) { + return false; + } + + if (!(o instanceof TosObjectInfo)) { + return false; + } + + TosObjectInfo that = (TosObjectInfo) o; + return Objects.equals(appendable, that.appendable) && Objects.equals(crc64ecma, that.crc64ecma); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), appendable, crc64ecma); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("key", key()) + .add("size", size()) + .add("mtime", mtime()) + .add("checksum", Hex.encodeHexString(checksum())) + .add("isDir", isDir()) + .add("appendable", appendable) + .add("crc64ecma", crc64ecma) + .toString(); + } +} + --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org