This is an automated email from the ASF dual-hosted git repository.

sammichen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new e14fb8e092 HDDS-9526. Two S3G instances writing the same key may cause data loss in case of an exception. (#5524)
e14fb8e092 is described below

commit e14fb8e092cbbcd562877864aff1bc66af9fe59b
Author: XiChen <[email protected]>
AuthorDate: Thu Nov 9 22:09:48 2023 +0800

    HDDS-9526. Two S3G instances writing the same key may cause data loss in case of an exception. (#5524)
---
 .../client/io/BlockDataStreamOutputEntryPool.java  |   4 +
 .../client/io/BlockOutputStreamEntryPool.java      |   3 +
 .../hadoop/ozone/client/io/ECKeyOutputStream.java  |  15 ++
 .../ozone/client/io/KeyDataStreamOutput.java       |  27 +++-
 .../hadoop/ozone/client/io/KeyOutputStream.java    |  31 +++-
 .../ozone/client/io/OzoneDataStreamOutput.java     |  21 ++-
 .../hadoop/ozone/client/io/OzoneOutputStream.java  |  23 ++-
 .../ozone/client/protocol/ClientProtocol.java      |   3 +
 .../apache/hadoop/ozone/client/rpc/RpcClient.java  |  27 +++-
 .../hadoop/ozone/client/TestOzoneClient.java       |  34 +++++
 .../hadoop/ozone/s3/endpoint/EndpointBase.java     |   7 +-
 .../hadoop/ozone/s3/endpoint/ObjectEndpoint.java   | 166 ++++++++++++---------
 .../ozone/s3/endpoint/ObjectEndpointStreaming.java |  30 ++--
 .../hadoop/ozone/client/ClientProtocolStub.java    |   5 +
 .../hadoop/ozone/client/OzoneBucketStub.java       |   2 +-
 .../hadoop/ozone/client/OzoneOutputStreamStub.java |  15 ++
 .../s3/endpoint/TestMultipartUploadWithCopy.java   |  23 +++
 .../hadoop/ozone/s3/endpoint/TestObjectPut.java    |  44 ++++++
 .../hadoop/ozone/s3/endpoint/TestPartUpload.java   |  73 ++++++++-
 19 files changed, 442 insertions(+), 111 deletions(-)

diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java
index 10b16f800d..d4bccd5535 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockDataStreamOutputEntryPool.java
@@ -289,6 +289,10 @@ public class BlockDataStreamOutputEntryPool implements 
KeyMetadataAware {
     return totalDataLen;
   }
 
+  public long getDataSize() {
+    return keyArgs.getDataSize();
+  }
+
   @Override
   public Map<String, String> getMetadata() {
     return this.keyArgs.getMetadata();
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
index 7b0259e379..65c1cd4caa 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
@@ -438,4 +438,7 @@ public class BlockOutputStreamEntryPool implements 
KeyMetadataAware {
     return null;
   }
 
+  long getDataSize() {
+    return keyArgs.getDataSize();
+  }
 }
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
index 242b2606f8..15ebccda28 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECKeyOutputStream.java
@@ -78,6 +78,14 @@ public final class ECKeyOutputStream extends KeyOutputStream
   private final Future<Boolean> flushFuture;
   private final AtomicLong flushCheckpoint;
 
+  /**
+   * Indicates if an atomic write is required. When set to true,
+   * the amount of data written must match the declared size during the commit.
+   * A mismatch will prevent the commit from succeeding.
+   * This is essential for operations like S3 put to ensure atomicity.
+   */
+  private boolean atomicKeyCreation;
+
   private enum StripeWriteStatus {
     SUCCESS,
     FAILED
@@ -155,6 +163,7 @@ public final class ECKeyOutputStream extends KeyOutputStream
     flushExecutor.submit(() -> s3CredentialsProvider.set(s3Auth));
     this.flushFuture = this.flushExecutor.submit(this::flushStripeFromQueue);
     this.flushCheckpoint = new AtomicLong(0);
+    this.atomicKeyCreation = builder.getAtomicKeyCreation();
   }
 
   /**
@@ -512,6 +521,12 @@ public final class ECKeyOutputStream extends 
KeyOutputStream
         Preconditions.checkArgument(writeOffset == offset,
             "Expected writeOffset= " + writeOffset
                 + " Expected offset=" + offset);
+        if (atomicKeyCreation) {
+          long expectedSize = blockOutputStreamEntryPool.getDataSize();
+          Preconditions.checkState(expectedSize == offset, String.format(
+              "Expected: %d and actual %d write sizes do not match",
+                  expectedSize, offset));
+        }
         blockOutputStreamEntryPool.commitKey(offset);
       }
     } catch (ExecutionException e) {
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
index a6331151e3..2368cd78e9 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
@@ -81,6 +81,14 @@ public class KeyDataStreamOutput extends 
AbstractDataStreamOutput
 
   private long clientID;
 
+  /**
+   * Indicates if an atomic write is required. When set to true,
+   * the amount of data written must match the declared size during the commit.
+   * A mismatch will prevent the commit from succeeding.
+   * This is essential for operations like S3 put to ensure atomicity.
+   */
+  private boolean atomicKeyCreation;
+
   @VisibleForTesting
   public List<BlockDataStreamOutputEntry> getStreamEntries() {
     return blockDataStreamOutputEntryPool.getStreamEntries();
@@ -109,7 +117,8 @@ public class KeyDataStreamOutput extends 
AbstractDataStreamOutput
       OzoneManagerProtocol omClient, int chunkSize,
       String requestId, ReplicationConfig replicationConfig,
       String uploadID, int partNumber, boolean isMultipart,
-      boolean unsafeByteBufferConversion
+      boolean unsafeByteBufferConversion,
+      boolean atomicKeyCreation
   ) {
     super(HddsClientUtils.getRetryPolicyByException(
         config.getMaxRetryCount(), config.getRetryInterval()));
@@ -130,6 +139,7 @@ public class KeyDataStreamOutput extends 
AbstractDataStreamOutput
     // encrypted bucket.
     this.writeOffset = 0;
     this.clientID = handler.getId();
+    this.atomicKeyCreation = atomicKeyCreation;
   }
 
   /**
@@ -387,6 +397,12 @@ public class KeyDataStreamOutput extends 
AbstractDataStreamOutput
       if (!isException()) {
         Preconditions.checkArgument(writeOffset == offset);
       }
+      if (atomicKeyCreation) {
+        long expectedSize = blockDataStreamOutputEntryPool.getDataSize();
+        Preconditions.checkArgument(expectedSize == offset,
+            String.format("Expected: %d and actual %d write sizes do not match",
+                expectedSize, offset));
+      }
       blockDataStreamOutputEntryPool.commitKey(offset);
     } finally {
       blockDataStreamOutputEntryPool.cleanup();
@@ -422,6 +438,7 @@ public class KeyDataStreamOutput extends 
AbstractDataStreamOutput
     private boolean unsafeByteBufferConversion;
     private OzoneClientConfig clientConfig;
     private ReplicationConfig replicationConfig;
+    private boolean atomicKeyCreation = false;
 
     public Builder setMultipartUploadID(String uploadID) {
       this.multipartUploadID = uploadID;
@@ -474,6 +491,11 @@ public class KeyDataStreamOutput extends 
AbstractDataStreamOutput
       return this;
     }
 
+    public Builder setAtomicKeyCreation(boolean atomicKey) {
+      this.atomicKeyCreation = atomicKey;
+      return this;
+    }
+
     public KeyDataStreamOutput build() {
       return new KeyDataStreamOutput(
           clientConfig,
@@ -486,7 +508,8 @@ public class KeyDataStreamOutput extends 
AbstractDataStreamOutput
           multipartUploadID,
           multipartNumber,
           isMultipartKey,
-          unsafeByteBufferConversion);
+          unsafeByteBufferConversion,
+          atomicKeyCreation);
     }
 
   }
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
index fa23f88544..4e0c4c91fa 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
@@ -95,6 +95,14 @@ public class KeyOutputStream extends OutputStream
 
   private long clientID;
 
+  /**
+   * Indicates if an atomic write is required. When set to true,
+   * the amount of data written must match the declared size during the commit.
+   * A mismatch will prevent the commit from succeeding.
+   * This is essential for operations like S3 put to ensure atomicity.
+   */
+  private boolean atomicKeyCreation;
+
   public KeyOutputStream(ReplicationConfig replicationConfig,
       ContainerClientMetrics clientMetrics) {
     this.replication = replicationConfig;
@@ -142,7 +150,8 @@ public class KeyOutputStream extends OutputStream
       String requestId, ReplicationConfig replicationConfig,
       String uploadID, int partNumber, boolean isMultipart,
       boolean unsafeByteBufferConversion,
-      ContainerClientMetrics clientMetrics
+      ContainerClientMetrics clientMetrics,
+      boolean atomicKeyCreation
   ) {
     this.config = config;
     this.replication = replicationConfig;
@@ -163,6 +172,7 @@ public class KeyOutputStream extends OutputStream
     this.isException = false;
     this.writeOffset = 0;
     this.clientID = handler.getId();
+    this.atomicKeyCreation = atomicKeyCreation;
   }
 
   /**
@@ -555,6 +565,12 @@ public class KeyOutputStream extends OutputStream
       if (!isException) {
         Preconditions.checkArgument(writeOffset == offset);
       }
+      if (atomicKeyCreation) {
+        long expectedSize = blockOutputStreamEntryPool.getDataSize();
+        Preconditions.checkState(expectedSize == offset,
+            String.format("Expected: %d and actual %d write sizes do not match",
+                expectedSize, offset));
+      }
       blockOutputStreamEntryPool.commitKey(offset);
     } finally {
       blockOutputStreamEntryPool.cleanup();
@@ -591,6 +607,7 @@ public class KeyOutputStream extends OutputStream
     private OzoneClientConfig clientConfig;
     private ReplicationConfig replicationConfig;
     private ContainerClientMetrics clientMetrics;
+    private boolean atomicKeyCreation = false;
 
     public String getMultipartUploadID() {
       return multipartUploadID;
@@ -677,6 +694,11 @@ public class KeyOutputStream extends OutputStream
       return this;
     }
 
+    public Builder setAtomicKeyCreation(boolean atomicKey) {
+      this.atomicKeyCreation = atomicKey;
+      return this;
+    }
+
     public Builder setClientMetrics(ContainerClientMetrics clientMetrics) {
       this.clientMetrics = clientMetrics;
       return this;
@@ -686,6 +708,10 @@ public class KeyOutputStream extends OutputStream
       return clientMetrics;
     }
 
+    public boolean getAtomicKeyCreation() {
+      return atomicKeyCreation;
+    }
+
     public KeyOutputStream build() {
       return new KeyOutputStream(
           clientConfig,
@@ -698,7 +724,8 @@ public class KeyOutputStream extends OutputStream
           multipartNumber,
           isMultipartKey,
           unsafeByteBufferConversion,
-          clientMetrics);
+          clientMetrics,
+          atomicKeyCreation);
     }
 
   }
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneDataStreamOutput.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneDataStreamOutput.java
index d8cb06eccc..c0af1c5301 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneDataStreamOutput.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneDataStreamOutput.java
@@ -59,30 +59,35 @@ public class OzoneDataStreamOutput extends 
ByteBufferOutputStream
   }
 
   public OmMultipartCommitUploadPartInfo getCommitUploadPartInfo() {
+    KeyDataStreamOutput keyDataStreamOutput = getKeyDataStreamOutput();
+    if (keyDataStreamOutput != null) {
+      return keyDataStreamOutput.getCommitUploadPartInfo();
+    }
+    // Otherwise return null.
+    return null;
+  }
+
+  public KeyDataStreamOutput getKeyDataStreamOutput() {
     if (byteBufferStreamOutput instanceof OzoneOutputStream) {
       OutputStream outputStream =
           ((OzoneOutputStream) byteBufferStreamOutput).getOutputStream();
       if (outputStream instanceof KeyDataStreamOutput) {
-        return ((KeyDataStreamOutput)
-            outputStream).getCommitUploadPartInfo();
+        return ((KeyDataStreamOutput) outputStream);
       } else if (outputStream instanceof CryptoOutputStream) {
         OutputStream wrappedStream =
             ((CryptoOutputStream) outputStream).getWrappedStream();
         if (wrappedStream instanceof KeyDataStreamOutput) {
-          return ((KeyDataStreamOutput) wrappedStream)
-              .getCommitUploadPartInfo();
+          return ((KeyDataStreamOutput) wrappedStream);
         }
       } else if (outputStream instanceof CipherOutputStreamOzone) {
         OutputStream wrappedStream =
             ((CipherOutputStreamOzone) outputStream).getWrappedStream();
         if (wrappedStream instanceof KeyDataStreamOutput) {
-          return ((KeyDataStreamOutput) wrappedStream)
-              .getCommitUploadPartInfo();
+          return ((KeyDataStreamOutput) wrappedStream);
         }
       }
     } else if (byteBufferStreamOutput instanceof KeyDataStreamOutput) {
-      return ((KeyDataStreamOutput)
-          byteBufferStreamOutput).getCommitUploadPartInfo();
+      return ((KeyDataStreamOutput) byteBufferStreamOutput);
     }
     // Otherwise return null.
     return null;
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneOutputStream.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneOutputStream.java
index 093b31b7a5..bd056185e7 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneOutputStream.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/OzoneOutputStream.java
@@ -123,29 +123,38 @@ public class OzoneOutputStream extends 
ByteArrayStreamOutput
   }
 
   public OmMultipartCommitUploadPartInfo getCommitUploadPartInfo() {
+    KeyOutputStream keyOutputStream = getKeyOutputStream();
+    if (keyOutputStream != null) {
+      return keyOutputStream.getCommitUploadPartInfo();
+    }
+    // Otherwise return null.
+    return null;
+  }
+
+  public OutputStream getOutputStream() {
+    return outputStream;
+  }
+
+  public KeyOutputStream getKeyOutputStream() {
     if (outputStream instanceof KeyOutputStream) {
-      return ((KeyOutputStream) outputStream).getCommitUploadPartInfo();
+      return ((KeyOutputStream) outputStream);
     } else  if (outputStream instanceof CryptoOutputStream) {
       OutputStream wrappedStream =
           ((CryptoOutputStream) outputStream).getWrappedStream();
       if (wrappedStream instanceof KeyOutputStream) {
-        return ((KeyOutputStream) wrappedStream).getCommitUploadPartInfo();
+        return ((KeyOutputStream) wrappedStream);
       }
     } else if (outputStream instanceof CipherOutputStreamOzone) {
       OutputStream wrappedStream =
           ((CipherOutputStreamOzone) outputStream).getWrappedStream();
       if (wrappedStream instanceof KeyOutputStream) {
-        return ((KeyOutputStream)wrappedStream).getCommitUploadPartInfo();
+        return ((KeyOutputStream)wrappedStream);
       }
     }
     // Otherwise return null.
     return null;
   }
 
-  public OutputStream getOutputStream() {
-    return outputStream;
-  }
-
   @Override
   public Map<String, String> getMetadata() {
     if (outputStream instanceof CryptoOutputStream) {
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
index b45a3209f4..bc01d6653b 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
@@ -1008,6 +1008,9 @@ public interface ClientProtocol {
    */
   void setThreadLocalS3Auth(S3Auth s3Auth);
 
+
+  void setIsS3Request(boolean isS3Request);
+
   /**
    * Gets the S3 Authentication information that is attached to the thread.
    * @return S3 Authentication information.
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index ad8ced95d1..5d70bdfb86 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -160,6 +160,7 @@ import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.SynchronousQueue;
@@ -212,6 +213,7 @@ public class RpcClient implements ClientProtocol {
   private final OzoneManagerVersion omVersion;
   private volatile ExecutorService ecReconstructExecutor;
   private final ContainerClientMetrics clientMetrics;
+  private final AtomicBoolean isS3GRequest = new AtomicBoolean(false);
 
   /**
    * Creates RpcClient instance with the given configuration.
@@ -687,7 +689,7 @@ public class RpcClient implements ClientProtocol {
         : "with server-side default bucket layout";
     LOG.info("Creating Bucket: {}/{}, {}, {} as owner, Versioning {}, " +
             "Storage Type set to {} and Encryption set to {}, " +
-            "Replication Type set to {}, Namespace Quota set to {}, " + 
+            "Replication Type set to {}, Namespace Quota set to {}, " +
             "Space Quota set to {} ",
         volumeName, bucketName, layoutMsg, owner, isVersionEnabled,
         storageType, bek != null, replicationType,
@@ -1346,6 +1348,13 @@ public class RpcClient implements ClientProtocol {
         .setLatestVersionLocation(getLatestVersionLocation);
 
     OpenKeySession openKey = ozoneManagerClient.openKey(builder.build());
+    // For a bucket with layout OBJECT_STORE, when creating an empty file
+    // (size=0), OM sets DataSize to OzoneConfigKeys#OZONE_SCM_BLOCK_SIZE,
+    // which would cause S3G's atomic write length check to fail,
+    // so reset the size to 0 here.
+    if (isS3GRequest.get() && size == 0) {
+      openKey.getKeyInfo().setDataSize(size);
+    }
     return createOutputStream(openKey);
   }
 
@@ -1787,6 +1796,7 @@ public class RpcClient implements ClientProtocol {
         .setMultipartNumber(partNumber)
         .setMultipartUploadID(uploadID)
         .setIsMultipartKey(true)
+        .setAtomicKeyCreation(isS3GRequest.get())
         .build();
     return createOutputStream(openKey, keyOutputStream);
   }
@@ -1802,7 +1812,9 @@ public class RpcClient implements ClientProtocol {
       throws IOException {
     final OpenKeySession openKey = newMultipartOpenKey(
         volumeName, bucketName, keyName, size, partNumber, uploadID);
-
+    // Amazon S3 never adds partial objects, so for S3 requests we need to
+    // set atomicKeyCreation to true.
+    // Refer: https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html
     KeyDataStreamOutput keyOutputStream =
         new KeyDataStreamOutput.Builder()
             .setHandler(openKey)
@@ -1814,6 +1826,7 @@ public class RpcClient implements ClientProtocol {
             .setIsMultipartKey(true)
             .enableUnsafeByteBufferConversion(unsafeByteBufferConversion)
             .setConfig(conf.getObject(OzoneClientConfig.class))
+            .setAtomicKeyCreation(isS3GRequest.get())
             .build();
     keyOutputStream
         .addPreallocateBlocks(
@@ -2216,6 +2229,9 @@ public class RpcClient implements ClientProtocol {
       throws IOException {
     final ReplicationConfig replicationConfig
         = openKey.getKeyInfo().getReplicationConfig();
+    // Amazon S3 never adds partial objects, so for S3 requests we need to
+    // set atomicKeyCreation to true.
+    // Refer: https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html
     KeyDataStreamOutput keyOutputStream =
         new KeyDataStreamOutput.Builder()
             .setHandler(openKey)
@@ -2224,6 +2240,7 @@ public class RpcClient implements ClientProtocol {
             .setReplicationConfig(replicationConfig)
             .enableUnsafeByteBufferConversion(unsafeByteBufferConversion)
             .setConfig(conf.getObject(OzoneClientConfig.class))
+            .setAtomicKeyCreation(isS3GRequest.get())
             .build();
     keyOutputStream
         .addPreallocateBlocks(openKey.getKeyInfo().getLatestVersionLocations(),
@@ -2305,6 +2322,7 @@ public class RpcClient implements ClientProtocol {
         .setOmClient(ozoneManagerClient)
         .enableUnsafeByteBufferConversion(unsafeByteBufferConversion)
         .setConfig(conf.getObject(OzoneClientConfig.class))
+        .setAtomicKeyCreation(isS3GRequest.get())
         .setClientMetrics(clientMetrics);
   }
 
@@ -2383,6 +2401,11 @@ public class RpcClient implements ClientProtocol {
     ozoneManagerClient.setThreadLocalS3Auth(ozoneSharedSecretAuth);
   }
 
+  @Override
+  public void setIsS3Request(boolean s3Request) {
+    this.isS3GRequest.set(s3Request);
+  }
+
   @Override
   public S3Auth getThreadLocalS3Auth() {
     return ozoneManagerClient.getThreadLocalS3Auth();
diff --git 
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneClient.java
 
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneClient.java
index 542b8a8a9e..c7a09cd2ce 100644
--- 
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneClient.java
+++ 
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/TestOzoneClient.java
@@ -224,6 +224,40 @@ public class TestOzoneClient {
     }
   }
 
+  /**
+   * This test validates that for S3G,
+   * the key upload process needs to be atomic.
+   * It simulates two mismatch scenarios where the actual write data size does
+   * not match the expected size.
+   */
+  @Test
+  public void testPutKeySizeMismatch() throws IOException {
+    String value = new String(new byte[1024], UTF_8);
+    OzoneBucket bucket = getOzoneBucket();
+    String keyName = UUID.randomUUID().toString();
+    try {
+      // Simulating first mismatch: Write less data than expected
+      client.getProxy().setIsS3Request(true);
+      OzoneOutputStream out1 = bucket.createKey(keyName,
+          value.getBytes(UTF_8).length, ReplicationType.RATIS, ONE,
+          new HashMap<>());
+      out1.write(value.substring(0, value.length() - 1).getBytes(UTF_8));
+      Assertions.assertThrows(IllegalStateException.class, out1::close,
+          "Expected IllegalStateException due to size mismatch.");
+
+      // Simulating second mismatch: Write more data than expected
+      OzoneOutputStream out2 = bucket.createKey(keyName,
+          value.getBytes(UTF_8).length, ReplicationType.RATIS, ONE,
+          new HashMap<>());
+      value += "1";
+      out2.write(value.getBytes(UTF_8));
+      Assertions.assertThrows(IllegalStateException.class, out2::close,
+          "Expected IllegalStateException due to size mismatch.");
+    } finally {
+      client.getProxy().setIsS3Request(false);
+    }
+  }
+
   private OzoneBucket getOzoneBucket() throws IOException {
     String volumeName = UUID.randomUUID().toString();
     String bucketName = UUID.randomUUID().toString();
diff --git 
a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/EndpointBase.java
 
b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/EndpointBase.java
index 9473467b8b..05b7a62c06 100644
--- 
a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/EndpointBase.java
+++ 
b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/EndpointBase.java
@@ -127,9 +127,10 @@ public abstract class EndpointBase implements Auditor {
         signatureInfo.getSignature(),
         signatureInfo.getAwsAccessId(), signatureInfo.getAwsAccessId());
     LOG.debug("S3 access id: {}", s3Auth.getAccessID());
-    getClient().getObjectStore()
-        .getClientProxy()
-        .setThreadLocalS3Auth(s3Auth);
+    ClientProtocol clientProtocol =
+        getClient().getObjectStore().getClientProxy();
+    clientProtocol.setThreadLocalS3Auth(s3Auth);
+    clientProtocol.setIsS3Request(true);
     init();
   }
 
diff --git 
a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
 
b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
index 9503c53cfd..d85a628ea3 100644
--- 
a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
+++ 
b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
@@ -59,6 +59,7 @@ import org.apache.hadoop.ozone.client.OzoneKeyDetails;
 import org.apache.hadoop.ozone.client.OzoneMultipartUploadPartListParts;
 import org.apache.hadoop.ozone.client.OzoneVolume;
 import org.apache.hadoop.ozone.client.io.KeyMetadataAware;
+import org.apache.hadoop.ozone.client.io.KeyOutputStream;
 import org.apache.hadoop.ozone.client.io.OzoneInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
@@ -123,6 +124,7 @@ import static 
org.apache.hadoop.ozone.s3.exception.S3ErrorTable.NO_SUCH_UPLOAD;
 import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.PRECOND_FAILED;
 import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.newError;
 import static org.apache.hadoop.ozone.s3.util.S3Consts.ACCEPT_RANGE_HEADER;
+import static org.apache.hadoop.ozone.s3.util.S3Consts.DECODED_CONTENT_LENGTH_HEADER;
 import static org.apache.hadoop.ozone.s3.util.S3Consts.CONTENT_RANGE_HEADER;
 import static org.apache.hadoop.ozone.s3.util.S3Consts.COPY_SOURCE_HEADER;
 import static org.apache.hadoop.ozone.s3.util.S3Consts.COPY_SOURCE_HEADER_RANGE;
@@ -205,6 +207,7 @@ public class ObjectEndpoint extends EndpointBase {
    * See: https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectPUT.html for
    * more details.
    */
+  @SuppressWarnings("checkstyle:MethodLength")
   @PUT
   public Response put(
       @PathParam("bucket") String bucketName,
@@ -217,9 +220,6 @@ public class ObjectEndpoint extends EndpointBase {
     S3GAction s3GAction = S3GAction.CREATE_KEY;
 
     boolean auditSuccess = true;
-
-    OzoneOutputStream output = null;
-
     String copyHeader = null, storageType = null;
     try {
       OzoneVolume volume = getVolume();
@@ -289,6 +289,7 @@ public class ObjectEndpoint extends EndpointBase {
           .equals(headers.getHeaderString("x-amz-content-sha256"))) {
         body = new DigestInputStream(new SignedChunksInputStream(body),
             E_TAG_PROVIDER.get());
+        length = Long.parseLong(amzDecodedLength);
       } else {
         body = new DigestInputStream(body, E_TAG_PROVIDER.get());
       }
@@ -303,14 +304,16 @@ public class ObjectEndpoint extends EndpointBase {
         eTag = keyWriteResult.getKey();
         putLength = keyWriteResult.getValue();
       } else {
-        output = getClientProtocol().createKey(volume.getName(), bucketName,
-            keyPath, length, replicationConfig, customMetadata);
-        getMetrics().updatePutKeyMetadataStats(startNanos);
-        putLength = IOUtils.copyLarge(body, output);
-        eTag = DatatypeConverter.printHexBinary(
-            ((DigestInputStream) body).getMessageDigest().digest())
-            .toLowerCase();
-        output.getMetadata().put(ETAG, eTag);
+        try (OzoneOutputStream output = getClientProtocol().createKey(
+            volume.getName(), bucketName, keyPath, length, replicationConfig,
+            customMetadata)) {
+          getMetrics().updatePutKeyMetadataStats(startNanos);
+          putLength = IOUtils.copyLarge(body, output);
+          eTag = DatatypeConverter.printHexBinary(
+                  ((DigestInputStream) body).getMessageDigest().digest())
+              .toLowerCase();
+          output.getMetadata().put(ETAG, eTag);
+        }
       }
 
       getMetrics().incPutKeySuccessLength(putLength);
@@ -352,9 +355,6 @@ public class ObjectEndpoint extends EndpointBase {
       }
       throw ex;
     } finally {
-      if (output != null) {
-        output.close();
-      }
       if (auditSuccess) {
         AUDIT.logWriteSuccess(
             buildAuditMessageForSuccess(s3GAction, getAuditParameters()));
@@ -845,6 +845,7 @@ public class ObjectEndpoint extends EndpointBase {
     }
   }
 
+  @SuppressWarnings("checkstyle:MethodLength")
   private Response createMultipartKey(OzoneVolume volume, String bucket,
                                       String key, long length, int partNumber,
                                       String uploadID, InputStream body)
@@ -852,12 +853,13 @@ public class ObjectEndpoint extends EndpointBase {
     long startNanos = Time.monotonicNowNanos();
     String copyHeader = null;
     try {
-      OzoneOutputStream ozoneOutputStream = null;
 
       if ("STREAMING-AWS4-HMAC-SHA256-PAYLOAD"
           .equals(headers.getHeaderString("x-amz-content-sha256"))) {
         body = new DigestInputStream(new SignedChunksInputStream(body),
             E_TAG_PROVIDER.get());
+        length = Long.parseLong(
+            headers.getHeaderString(DECODED_CONTENT_LENGTH_HEADER));
       } else {
         body = new DigestInputStream(body, E_TAG_PROVIDER.get());
       }
@@ -875,78 +877,96 @@ public class ObjectEndpoint extends EndpointBase {
         enableEC = true;
       }
 
-      try {
-        if (datastreamEnabled && !enableEC && copyHeader == null) {
-          getMetrics().updatePutKeyMetadataStats(startNanos);
-          return ObjectEndpointStreaming
-              .createMultipartKey(ozoneBucket, key, length, partNumber,
-                  uploadID, chunkSize, (DigestInputStream) body);
+      if (datastreamEnabled && !enableEC && copyHeader == null) {
+        getMetrics().updatePutKeyMetadataStats(startNanos);
+        return ObjectEndpointStreaming
+            .createMultipartKey(ozoneBucket, key, length, partNumber,
+                uploadID, chunkSize, (DigestInputStream) body);
+      }
+      // OmMultipartCommitUploadPartInfo can only be gotten after the
+      // OzoneOutputStream is closed, so we need to save the KeyOutputStream
+      // in the OzoneOutputStream and use it to get the
+      // OmMultipartCommitUploadPartInfo after OzoneOutputStream is closed.
+      KeyOutputStream keyOutputStream = null;
+      if (copyHeader != null) {
+        Pair<String, String> result = parseSourceHeader(copyHeader);
+        String sourceBucket = result.getLeft();
+        String sourceKey = result.getRight();
+
+        OzoneKeyDetails sourceKeyDetails = getClientProtocol().getKeyDetails(
+            volume.getName(), sourceBucket, sourceKey);
+        String range =
+            headers.getHeaderString(COPY_SOURCE_HEADER_RANGE);
+        RangeHeader rangeHeader = null;
+        if (range != null) {
+          rangeHeader = RangeHeaderParserUtil.parseRangeHeader(range, 0);
+          // When copy Range, the size of the target key is the
+          // length specified by COPY_SOURCE_HEADER_RANGE.
+          length = rangeHeader.getEndOffset() -
+              rangeHeader.getStartOffset() + 1;
+        } else {
+          length = sourceKeyDetails.getDataSize();
+        }
+        Long sourceKeyModificationTime = sourceKeyDetails
+            .getModificationTime().toEpochMilli();
+        String copySourceIfModifiedSince =
+            headers.getHeaderString(COPY_SOURCE_IF_MODIFIED_SINCE);
+        String copySourceIfUnmodifiedSince =
+            headers.getHeaderString(COPY_SOURCE_IF_UNMODIFIED_SINCE);
+        if (!checkCopySourceModificationTime(sourceKeyModificationTime,
+            copySourceIfModifiedSince, copySourceIfUnmodifiedSince)) {
+          throw newError(PRECOND_FAILED, sourceBucket + "/" + sourceKey);
         }
-        ozoneOutputStream = getClientProtocol().createMultipartKey(
-            volume.getName(), bucket, key, length, partNumber, uploadID);
-
-        if (copyHeader != null) {
-          Pair<String, String> result = parseSourceHeader(copyHeader);
-
-          String sourceBucket = result.getLeft();
-          String sourceKey = result.getRight();
-
-          OzoneKeyDetails sourceKeyDetails = getClientProtocol().getKeyDetails(
-              volume.getName(), sourceBucket, sourceKey);
-          Long sourceKeyModificationTime = sourceKeyDetails
-              .getModificationTime().toEpochMilli();
-          String copySourceIfModifiedSince =
-              headers.getHeaderString(COPY_SOURCE_IF_MODIFIED_SINCE);
-          String copySourceIfUnmodifiedSince =
-              headers.getHeaderString(COPY_SOURCE_IF_UNMODIFIED_SINCE);
-          if (!checkCopySourceModificationTime(sourceKeyModificationTime,
-              copySourceIfModifiedSince, copySourceIfUnmodifiedSince)) {
-            throw newError(PRECOND_FAILED, sourceBucket + "/" + sourceKey);
-          }
 
-          try (OzoneInputStream sourceObject = sourceKeyDetails.getContent()) {
-
-            String range =
-                headers.getHeaderString(COPY_SOURCE_HEADER_RANGE);
-            long copyLength;
-            if (range != null) {
-              RangeHeader rangeHeader =
-                  RangeHeaderParserUtil.parseRangeHeader(range, 0);
-              final long skipped =
-                  sourceObject.skip(rangeHeader.getStartOffset());
-              if (skipped != rangeHeader.getStartOffset()) {
-                throw new EOFException(
-                    "Bytes to skip: "
-                        + rangeHeader.getStartOffset() + " actual: " + skipped);
-              }
+        try (OzoneInputStream sourceObject = sourceKeyDetails.getContent()) {
+          long copyLength;
+          if (range != null) {
+            final long skipped =
+                sourceObject.skip(rangeHeader.getStartOffset());
+            if (skipped != rangeHeader.getStartOffset()) {
+              throw new EOFException(
+                  "Bytes to skip: "
+                      + rangeHeader.getStartOffset() + " actual: " + skipped);
+            }
+            try (OzoneOutputStream ozoneOutputStream = getClientProtocol()
+                .createMultipartKey(volume.getName(), bucket, key, length,
+                    partNumber, uploadID)) {
               getMetrics().updateCopyKeyMetadataStats(startNanos);
-              copyLength = IOUtils.copyLarge(sourceObject, ozoneOutputStream, 0,
-                  rangeHeader.getEndOffset() - rangeHeader.getStartOffset()
-                      + 1);
-            } else {
+              copyLength = IOUtils.copyLarge(
+                  sourceObject, ozoneOutputStream, 0, length);
+              keyOutputStream = ozoneOutputStream.getKeyOutputStream();
+            }
+          } else {
+            try (OzoneOutputStream ozoneOutputStream = getClientProtocol()
+                .createMultipartKey(volume.getName(), bucket, key, length,
+                    partNumber, uploadID)) {
               getMetrics().updateCopyKeyMetadataStats(startNanos);
               copyLength = IOUtils.copyLarge(sourceObject, ozoneOutputStream);
+              keyOutputStream = ozoneOutputStream.getKeyOutputStream();
             }
-            getMetrics().incCopyObjectSuccessLength(copyLength);
           }
-        } else {
+          getMetrics().incCopyObjectSuccessLength(copyLength);
+        }
+      } else {
+        long putLength;
+        try (OzoneOutputStream ozoneOutputStream = getClientProtocol()
+            .createMultipartKey(volume.getName(), bucket, key, length,
+                partNumber, uploadID)) {
           getMetrics().updatePutKeyMetadataStats(startNanos);
-          long putLength = IOUtils.copyLarge(body, ozoneOutputStream);
+          putLength = IOUtils.copyLarge(body, ozoneOutputStream);
           ((KeyMetadataAware)ozoneOutputStream.getOutputStream())
-              .getMetadata().put("ETag", DatatypeConverter.printHexBinary(
-                  ((DigestInputStream) body).getMessageDigest().digest())
+              .getMetadata().put(ETAG, DatatypeConverter.printHexBinary(
+                      ((DigestInputStream) body).getMessageDigest().digest())
                   .toLowerCase());
-          getMetrics().incPutKeySuccessLength(putLength);
-        }
-      } finally {
-        if (ozoneOutputStream != null) {
-          ozoneOutputStream.close();
+          keyOutputStream
+              = ozoneOutputStream.getKeyOutputStream();
         }
+        getMetrics().incPutKeySuccessLength(putLength);
       }
 
-      assert ozoneOutputStream != null;
+      assert keyOutputStream != null;
       OmMultipartCommitUploadPartInfo omMultipartCommitUploadPartInfo =
-          ozoneOutputStream.getCommitUploadPartInfo();
+          keyOutputStream.getCommitUploadPartInfo();
       String eTag = omMultipartCommitUploadPartInfo.getPartName();
 
       if (copyHeader != null) {
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java
index ef87ad450d..b536b3248b 100644
--- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpointStreaming.java
@@ -22,6 +22,7 @@ import javax.xml.bind.DatatypeConverter;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.io.KeyDataStreamOutput;
 import org.apache.hadoop.ozone.client.io.KeyMetadataAware;
 import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
@@ -145,18 +146,24 @@ final class ObjectEndpointStreaming {
                                             String uploadID, int chunkSize,
                                             DigestInputStream body)
       throws IOException, OS3Exception {
-    OzoneDataStreamOutput streamOutput = null;
     String eTag;
     S3GatewayMetrics metrics = S3GatewayMetrics.create();
+    // OmMultipartCommitUploadPartInfo can only be gotten after the
+    // OzoneDataStreamOutput is closed, so we need to save the
+    // KeyDataStreamOutput in the OzoneDataStreamOutput and use it to get the
+    // OmMultipartCommitUploadPartInfo after OzoneDataStreamOutput is closed.
+    KeyDataStreamOutput keyDataStreamOutput = null;
     try {
-      streamOutput = ozoneBucket
-          .createMultipartStreamKey(key, length, partNumber, uploadID);
-      long putLength =
-          writeToStreamOutput(streamOutput, body, chunkSize, length);
-      eTag = DatatypeConverter.printHexBinary(body.getMessageDigest().digest())
-          .toLowerCase();
-      ((KeyMetadataAware)streamOutput).getMetadata().put("ETag", eTag);
-      metrics.incPutKeySuccessLength(putLength);
+      try (OzoneDataStreamOutput streamOutput = ozoneBucket
+          .createMultipartStreamKey(key, length, partNumber, uploadID)) {
+        long putLength =
+            writeToStreamOutput(streamOutput, body, chunkSize, length);
+        eTag = DatatypeConverter.printHexBinary(
+            body.getMessageDigest().digest()).toLowerCase();
+        ((KeyMetadataAware)streamOutput).getMetadata().put("ETag", eTag);
+        metrics.incPutKeySuccessLength(putLength);
+        keyDataStreamOutput = streamOutput.getKeyDataStreamOutput();
+      }
     } catch (OMException ex) {
       if (ex.getResult() ==
           OMException.ResultCodes.NO_SUCH_MULTIPART_UPLOAD_ERROR) {
@@ -168,10 +175,9 @@ final class ObjectEndpointStreaming {
       }
       throw ex;
     } finally {
-      if (streamOutput != null) {
-        streamOutput.close();
+      if (keyDataStreamOutput != null) {
         OmMultipartCommitUploadPartInfo commitUploadPartInfo =
-            streamOutput.getCommitUploadPartInfo();
+            keyDataStreamOutput.getCommitUploadPartInfo();
         eTag = commitUploadPartInfo.getPartName();
       }
     }
diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/ClientProtocolStub.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/ClientProtocolStub.java
index 5505688b26..71f4784350 100644
--- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/ClientProtocolStub.java
+++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/ClientProtocolStub.java
@@ -577,6 +577,11 @@ public class ClientProtocolStub implements ClientProtocol {
 
   }
 
+  @Override
+  public void setIsS3Request(boolean isS3Request) {
+
+  }
+
   @Override
   public S3Auth getThreadLocalS3Auth() {
     return null;
diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java
index d546afe2c7..fad3386c61 100644
--- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java
+++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java
@@ -512,7 +512,7 @@ public class OzoneBucketStub extends OzoneBucket {
         if (partEntry.getKey() > partNumberMarker) {
           PartInfo partInfo = new PartInfo(partEntry.getKey(),
               partEntry.getValue().getPartName(),
-              partEntry.getValue().getContent().length, Time.now());
+              Time.now(), partEntry.getValue().getContent().length);
           partInfoList.add(partInfo);
           count++;
         }
diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneOutputStreamStub.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneOutputStreamStub.java
index 326d388b88..00a7ba5574 100644
--- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneOutputStreamStub.java
+++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneOutputStreamStub.java
@@ -20,6 +20,9 @@
 
 package org.apache.hadoop.ozone.client;
 
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.client.io.KeyOutputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
 
@@ -69,6 +72,18 @@ public class OzoneOutputStreamStub extends OzoneOutputStream {
     }
   }
 
+  @Override
+  public KeyOutputStream getKeyOutputStream() {
+    return new KeyOutputStream(
+        ReplicationConfig.getDefault(new OzoneConfiguration()), null) {
+      @Override
+      public synchronized OmMultipartCommitUploadPartInfo
+          getCommitUploadPartInfo() {
+        return OzoneOutputStreamStub.this.getCommitUploadPartInfo();
+      }
+    };
+  }
+
   @Override
   public OmMultipartCommitUploadPartInfo getCommitUploadPartInfo() {
     return closed ? new OmMultipartCommitUploadPartInfo(partName) : null;
diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestMultipartUploadWithCopy.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestMultipartUploadWithCopy.java
index 7422806db7..7186ceb557 100644
--- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestMultipartUploadWithCopy.java
+++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestMultipartUploadWithCopy.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.client.OzoneBucket;
 import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.client.OzoneClientStub;
+import org.apache.hadoop.ozone.client.OzoneMultipartUploadPartListParts;
 import org.apache.hadoop.ozone.s3.endpoint.CompleteMultipartUploadRequest.Part;
 import org.apache.hadoop.ozone.s3.exception.OS3Exception;
 
@@ -385,6 +386,28 @@ public class TestMultipartUploadWithCopy {
     return part;
   }
 
+  @Test
+  public void testUploadWithRangeCopyContentLength()
+      throws IOException, OS3Exception {
+    // The contentLength specified when creating the Key should be the same as
+    // the Content-Length, the key Commit will compare the Content-Length with
+    // the actual length of the data written.
+
+    String uploadID = initiateMultipartUpload(KEY);
+    ByteArrayInputStream body = new ByteArrayInputStream("".getBytes(UTF_8));
+    Map<String, String> additionalHeaders = new HashMap<>();
+    additionalHeaders.put(COPY_SOURCE_HEADER,
+        OzoneConsts.S3_BUCKET + "/" + EXISTING_KEY);
+    additionalHeaders.put(COPY_SOURCE_HEADER_RANGE, "bytes=0-3");
+    setHeaders(additionalHeaders);
+    REST.put(OzoneConsts.S3_BUCKET, KEY, 0, 1, uploadID, body);
+    OzoneMultipartUploadPartListParts parts =
+        CLIENT.getObjectStore().getS3Bucket(OzoneConsts.S3_BUCKET)
+        .listParts(KEY, uploadID, 0, 100);
+    Assert.assertEquals(1, parts.getPartInfoList().size());
+    Assert.assertEquals(4, parts.getPartInfoList().get(0).getSize());
+  }
+
   private void completeMultipartUpload(String key,
       CompleteMultipartUploadRequest completeMultipartUploadRequest,
       String uploadID) throws IOException, OS3Exception {
diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectPut.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectPut.java
index 55c661084b..4c1fd9a7aa 100644
--- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectPut.java
+++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestObjectPut.java
@@ -49,6 +49,7 @@ import org.junit.jupiter.api.Assertions;
 import org.mockito.Mockito;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.hadoop.ozone.s3.util.S3Consts.DECODED_CONTENT_LENGTH_HEADER;
 import static org.apache.hadoop.ozone.s3.util.S3Consts.COPY_SOURCE_HEADER;
 import static org.apache.hadoop.ozone.s3.util.S3Consts.STORAGE_CLASS_HEADER;
 import static org.apache.hadoop.ozone.s3.util.S3Utils.urlEncode;
@@ -140,6 +141,47 @@ public class TestObjectPut {
     Assert.assertEquals(CONTENT, keyContent);
   }
 
+  @Test
+  public void testPutObjectContentLength() throws IOException, OS3Exception {
+    // The contentLength specified when creating the Key should be the same as
+    // the Content-Length, the key Commit will compare the Content-Length with
+    // the actual length of the data written.
+    HttpHeaders headers = Mockito.mock(HttpHeaders.class);
+    ByteArrayInputStream body =
+        new ByteArrayInputStream(CONTENT.getBytes(UTF_8));
+    objectEndpoint.setHeaders(headers);
+    long dataSize = CONTENT.length();
+
+    objectEndpoint.put(bucketName, keyName, dataSize, 0, null, body);
+    Assert.assertEquals(dataSize, getKeyDataSize(keyName));
+  }
+
+  @Test
+  public void testPutObjectContentLengthForStreaming()
+      throws IOException, OS3Exception {
+    HttpHeaders headers = Mockito.mock(HttpHeaders.class);
+    objectEndpoint.setHeaders(headers);
+
+    String chunkedContent = "0a;chunk-signature=signature\r\n"
+        + "1234567890\r\n"
+        + "05;chunk-signature=signature\r\n"
+        + "abcde\r\n";
+
+    when(headers.getHeaderString("x-amz-content-sha256"))
+        .thenReturn("STREAMING-AWS4-HMAC-SHA256-PAYLOAD");
+
+    when(headers.getHeaderString(DECODED_CONTENT_LENGTH_HEADER))
+        .thenReturn("15");
+    objectEndpoint.put(bucketName, keyName, chunkedContent.length(), 0, null,
+        new ByteArrayInputStream(chunkedContent.getBytes(UTF_8)));
+    Assert.assertEquals(15, getKeyDataSize(keyName));
+  }
+
+  private long getKeyDataSize(String key) throws IOException {
+    return clientStub.getObjectStore().getS3Bucket(bucketName)
+        .getKey(key).getDataSize();
+  }
+
   @Test
   public void testPutObjectWithSignedChunks() throws IOException, OS3Exception {
     //GIVEN
@@ -153,6 +195,8 @@ public class TestObjectPut {
 
     when(headers.getHeaderString("x-amz-content-sha256"))
         .thenReturn("STREAMING-AWS4-HMAC-SHA256-PAYLOAD");
+    when(headers.getHeaderString(DECODED_CONTENT_LENGTH_HEADER))
+        .thenReturn("15");
 
     //WHEN
     Response response = objectEndpoint.put(bucketName, keyName,
diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUpload.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUpload.java
index 1b0e808f2d..6ba1b557ec 100644
--- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUpload.java
+++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/endpoint/TestPartUpload.java
@@ -24,7 +24,9 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.client.OzoneClientStub;
+import org.apache.hadoop.ozone.client.OzoneMultipartUploadPartListParts;
 import org.apache.hadoop.ozone.s3.exception.OS3Exception;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -33,9 +35,12 @@ import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.Response;
 
 import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.UUID;
 
 import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.hadoop.ozone.s3.util.S3Consts.DECODED_CONTENT_LENGTH_HEADER;
 import static org.apache.hadoop.ozone.s3.util.S3Consts.STORAGE_CLASS_HEADER;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
@@ -49,11 +54,12 @@ import static org.mockito.Mockito.when;
 public class TestPartUpload {
 
   private static final ObjectEndpoint REST = new ObjectEndpoint();
+  private static OzoneClient client;
 
   @BeforeClass
   public static void setUp() throws Exception {
 
-    OzoneClient client = new OzoneClientStub();
+    client = new OzoneClientStub();
     client.getObjectStore().createS3Bucket(OzoneConsts.S3_BUCKET);
 
 
@@ -135,4 +141,69 @@ public class TestPartUpload {
       assertEquals(HTTP_NOT_FOUND, ex.getHttpCode());
     }
   }
+
+  @Test
+  public void testPartUploadStreamContentLength()
+      throws IOException, OS3Exception {
+    HttpHeaders headers = Mockito.mock(HttpHeaders.class);
+    ObjectEndpoint objectEndpoint = new ObjectEndpoint();
+    objectEndpoint.setHeaders(headers);
+    objectEndpoint.setClient(client);
+    objectEndpoint.setOzoneConfiguration(new OzoneConfiguration());
+    String keyName = UUID.randomUUID().toString();
+
+    String chunkedContent = "0a;chunk-signature=signature\r\n"
+        + "1234567890\r\n"
+        + "05;chunk-signature=signature\r\n"
+        + "abcde\r\n";
+    when(headers.getHeaderString("x-amz-content-sha256"))
+        .thenReturn("STREAMING-AWS4-HMAC-SHA256-PAYLOAD");
+    when(headers.getHeaderString(DECODED_CONTENT_LENGTH_HEADER))
+        .thenReturn("15");
+
+    Response response = objectEndpoint.initializeMultipartUpload(
+        OzoneConsts.S3_BUCKET, keyName);
+    MultipartUploadInitiateResponse multipartUploadInitiateResponse =
+        (MultipartUploadInitiateResponse) response.getEntity();
+    assertNotNull(multipartUploadInitiateResponse.getUploadID());
+    String uploadID = multipartUploadInitiateResponse.getUploadID();
+    long contentLength = chunkedContent.length();
+
+    objectEndpoint.put(OzoneConsts.S3_BUCKET, keyName, contentLength, 1,
+        uploadID, new ByteArrayInputStream(chunkedContent.getBytes(UTF_8)));
+    assertContentLength(uploadID, keyName, 15);
+  }
+
+  @Test
+  public void testPartUploadContentLength() throws IOException, OS3Exception {
+    // The contentLength specified when creating the Key should be the same as
+    // the Content-Length, the key Commit will compare the Content-Length with
+    // the actual length of the data written.
+
+    String keyName = UUID.randomUUID().toString();
+    Response response = REST.initializeMultipartUpload(OzoneConsts.S3_BUCKET,
+        keyName);
+    MultipartUploadInitiateResponse multipartUploadInitiateResponse =
+        (MultipartUploadInitiateResponse) response.getEntity();
+    assertNotNull(multipartUploadInitiateResponse.getUploadID());
+    String uploadID = multipartUploadInitiateResponse.getUploadID();
+    String content = "Multipart Upload";
+    long contentLength = content.length();
+
+    ByteArrayInputStream body =
+        new ByteArrayInputStream(content.getBytes(UTF_8));
+    REST.put(OzoneConsts.S3_BUCKET, keyName,
+        contentLength, 1, uploadID, body);
+    assertContentLength(uploadID, keyName, content.length());
+  }
+
+  private void assertContentLength(String uploadID, String key,
+      long contentLength) throws IOException {
+    OzoneMultipartUploadPartListParts parts =
+        client.getObjectStore().getS3Bucket(OzoneConsts.S3_BUCKET)
+            .listParts(key, uploadID, 0, 100);
+    Assert.assertEquals(1, parts.getPartInfoList().size());
+    Assert.assertEquals(contentLength,
+        parts.getPartInfoList().get(0).getSize());
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to