[ https://issues.apache.org/jira/browse/HADOOP-19256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17932046#comment-17932046 ]

ASF GitHub Bot commented on HADOOP-19256:
-----------------------------------------

steveloughran commented on code in PR #7329:
URL: https://github.com/apache/hadoop/pull/7329#discussion_r1977939528


##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatch.java:
##########
@@ -0,0 +1,564 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.FileNotFoundException;
+import java.nio.charset.StandardCharsets;
+
+import org.junit.Assume;
+import org.junit.Test;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.RemoteFileChangedException;
+import org.apache.hadoop.fs.s3a.S3AFileStatus;
+import org.apache.hadoop.fs.s3a.S3ATestUtils;
+import org.apache.hadoop.fs.s3a.Statistic;
+import org.apache.hadoop.fs.s3a.statistics.BlockOutputStreamStatistics;
+import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
+import software.amazon.awssdk.services.s3.model.S3Exception;
+
+import static 
org.apache.hadoop.fs.Options.CreateFileOptionKeys.FS_OPTION_CREATE_CONDITIONAL_OVERWRITE;
+import static 
org.apache.hadoop.fs.Options.CreateFileOptionKeys.FS_OPTION_CREATE_CONDITIONAL_OVERWRITE_ETAG;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
+import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER_ARRAY;
+import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_MULTIPART;
+import static 
org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_OVERWRITE_SUPPORTED;
+import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_PERFORMANCE_FLAGS;
+import static org.apache.hadoop.fs.s3a.Constants.MIN_MULTIPART_THRESHOLD;
+import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_SIZE;
+import static 
org.apache.hadoop.fs.s3a.Constants.STORE_CAPABILITY_MULTIPART_UPLOAD_ENABLED;
+import static 
org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfNotEnabled;
+import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_404_NOT_FOUND;
+import static 
org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_412_PRECONDITION_FAILED;
+import static 
org.apache.hadoop.fs.s3a.impl.InternalConstants.UPLOAD_PART_COUNT_LIMIT;
+import static org.apache.hadoop.fs.s3a.scale.S3AScaleTestBase._1KB;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+public class ITestS3APutIfMatch extends AbstractS3ACostTest {
+
+    private static final int UPDATED_MULTIPART_THRESHOLD = 100 * _1KB;
+
+    private static final byte[] SMALL_FILE_BYTES = dataset(TEST_FILE_LEN, 0, 
255);
+    private static final byte[] MULTIPART_FILE_BYTES = 
dataset(UPDATED_MULTIPART_THRESHOLD * 5, 'a', 'z' - 'a');
+
+    private BlockOutputStreamStatistics statistics;
+
+    @Override
+    public Configuration createConfiguration() {
+        Configuration conf = super.createConfiguration();
+
+        S3ATestUtils.disableFilesystemCaching(conf);
+        removeBaseAndBucketOverrides(
+                conf,
+                MULTIPART_SIZE,
+                UPLOAD_PART_COUNT_LIMIT,
+                MIN_MULTIPART_THRESHOLD);
+        conf.setLong(UPLOAD_PART_COUNT_LIMIT, 2);
+        conf.setLong(MIN_MULTIPART_THRESHOLD, UPDATED_MULTIPART_THRESHOLD);
+        conf.setInt(MULTIPART_SIZE, UPDATED_MULTIPART_THRESHOLD);
+        return conf;
+    }
+
+    @Override
+    public void setup() throws Exception {
+        super.setup();
+        Configuration conf = getConfiguration();
+        skipIfNotEnabled(conf, FS_S3A_CREATE_OVERWRITE_SUPPORTED,
+                "Skipping IfNoneMatch tests");
+    }
+
+    private static void assertS3ExceptionStatusCode(int code, Exception ex) {
+        S3Exception s3Exception = (S3Exception) ex.getCause();
+
+        if (s3Exception.statusCode() != code) {
+            throw new AssertionError("Expected status code " + code + " from " 
+ ex, ex);
+        }
+    }
+
+    protected String getBlockOutputBufferName() {
+        return FAST_UPLOAD_BUFFER_ARRAY;
+    }
+
+    private static void createFileWithFlags(
+            FileSystem fs,
+            Path path,
+            byte[] data,
+            boolean ifNoneMatchFlag,
+            String etag,
+            boolean forceMultipart) throws Exception {
+        FSDataOutputStream stream = getStreamWithFlags(fs, path, 
ifNoneMatchFlag, etag, forceMultipart);
+        if (data != null && data.length > 0) {
+            stream.write(data);
+        }
+        stream.close();
+    }
+
+    private static void createFileWithFlags(
+            FileSystem fs,
+            Path path,
+            byte[] data,
+            boolean ifNoneMatchFlag,
+            String etag) throws Exception {
+        createFileWithFlags(fs, path, data, ifNoneMatchFlag, etag, false);
+    }
+
+     private static FSDataOutputStream getStreamWithFlags(

Review Comment:
   add javadocs for all these methods...test code needs to be as rigorously 
documented as production code
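   For example, a javadoc along these lines on the createFileWithFlags helper quoted above would do; the wording is only a suggestion, the signature and body are the existing ones (re-indented to two spaces):
   
   ```java
   /**
    * Create a file at the given path with the requested conditional write
    * options, write the supplied data and close the stream.
    *
    * @param fs filesystem under test
    * @param path path of the file to create
    * @param data bytes to write; may be null or empty for a zero-byte file
    * @param ifNoneMatchFlag whether to request conditional (if-none-match) overwrite
    * @param etag etag the existing object must match, or null for none
    * @param forceMultipart whether to force a multipart upload
    * @throws Exception any failure creating, writing or closing the file
    */
   private static void createFileWithFlags(
       FileSystem fs,
       Path path,
       byte[] data,
       boolean ifNoneMatchFlag,
       String etag,
       boolean forceMultipart) throws Exception {
     FSDataOutputStream stream =
         getStreamWithFlags(fs, path, ifNoneMatchFlag, etag, forceMultipart);
     if (data != null && data.length > 0) {
       stream.write(data);
     }
     stream.close();
   }
   ```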



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatch.java:
##########
@@ -0,0 +1,564 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.FileNotFoundException;

Review Comment:
   Import layout needs tuning; the desired layout should be
   
   java.*
   javax.*
   
   everything not in org.apache
   org.apache.*, alphabetical order
   
   static import.*
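   
   Applied to the imports quoted above, that layout would look roughly like this (one possible grouping; the static import block keeps its current ordering at the end):
   
   ```java
   import java.io.FileNotFoundException;
   import java.nio.charset.StandardCharsets;
   
   import org.junit.Assume;
   import org.junit.Test;
   import software.amazon.awssdk.services.s3.model.S3Exception;
   
   import org.apache.commons.io.IOUtils;
   import org.apache.hadoop.conf.Configuration;
   import org.apache.hadoop.fs.FSDataInputStream;
   import org.apache.hadoop.fs.FSDataOutputStream;
   import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
   import org.apache.hadoop.fs.FileAlreadyExistsException;
   import org.apache.hadoop.fs.FileSystem;
   import org.apache.hadoop.fs.Path;
   import org.apache.hadoop.fs.s3a.RemoteFileChangedException;
   import org.apache.hadoop.fs.s3a.S3AFileStatus;
   import org.apache.hadoop.fs.s3a.S3ATestUtils;
   import org.apache.hadoop.fs.s3a.Statistic;
   import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
   import org.apache.hadoop.fs.s3a.statistics.BlockOutputStreamStatistics;
   
   // static imports follow, unchanged from the current file
   ```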
   



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatch.java:
##########
@@ -0,0 +1,564 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.FileNotFoundException;
+import java.nio.charset.StandardCharsets;
+
+import org.junit.Assume;
+import org.junit.Test;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.RemoteFileChangedException;
+import org.apache.hadoop.fs.s3a.S3AFileStatus;
+import org.apache.hadoop.fs.s3a.S3ATestUtils;
+import org.apache.hadoop.fs.s3a.Statistic;
+import org.apache.hadoop.fs.s3a.statistics.BlockOutputStreamStatistics;
+import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
+import software.amazon.awssdk.services.s3.model.S3Exception;
+
+import static 
org.apache.hadoop.fs.Options.CreateFileOptionKeys.FS_OPTION_CREATE_CONDITIONAL_OVERWRITE;
+import static 
org.apache.hadoop.fs.Options.CreateFileOptionKeys.FS_OPTION_CREATE_CONDITIONAL_OVERWRITE_ETAG;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
+import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER_ARRAY;
+import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_MULTIPART;
+import static 
org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_OVERWRITE_SUPPORTED;
+import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_PERFORMANCE_FLAGS;
+import static org.apache.hadoop.fs.s3a.Constants.MIN_MULTIPART_THRESHOLD;
+import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_SIZE;
+import static 
org.apache.hadoop.fs.s3a.Constants.STORE_CAPABILITY_MULTIPART_UPLOAD_ENABLED;
+import static 
org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfNotEnabled;
+import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_404_NOT_FOUND;
+import static 
org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_412_PRECONDITION_FAILED;
+import static 
org.apache.hadoop.fs.s3a.impl.InternalConstants.UPLOAD_PART_COUNT_LIMIT;
+import static org.apache.hadoop.fs.s3a.scale.S3AScaleTestBase._1KB;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+public class ITestS3APutIfMatch extends AbstractS3ACostTest {
+
+    private static final int UPDATED_MULTIPART_THRESHOLD = 100 * _1KB;
+
+    private static final byte[] SMALL_FILE_BYTES = dataset(TEST_FILE_LEN, 0, 
255);
+    private static final byte[] MULTIPART_FILE_BYTES = 
dataset(UPDATED_MULTIPART_THRESHOLD * 5, 'a', 'z' - 'a');
+
+    private BlockOutputStreamStatistics statistics;
+
+    @Override
+    public Configuration createConfiguration() {
+        Configuration conf = super.createConfiguration();

Review Comment:
   wrong indentation; Hadoop uses two spaces per indentation level, not four.
   Change your IDE settings, then do a whole-file reformat.
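   
   For reference, the createConfiguration() override from this hunk re-indented to the two-space convention (content unchanged):
   
   ```java
   @Override
   public Configuration createConfiguration() {
     Configuration conf = super.createConfiguration();
   
     S3ATestUtils.disableFilesystemCaching(conf);
     removeBaseAndBucketOverrides(
         conf,
         MULTIPART_SIZE,
         UPLOAD_PART_COUNT_LIMIT,
         MIN_MULTIPART_THRESHOLD);
     conf.setLong(UPLOAD_PART_COUNT_LIMIT, 2);
     conf.setLong(MIN_MULTIPART_THRESHOLD, UPDATED_MULTIPART_THRESHOLD);
     conf.setInt(MULTIPART_SIZE, UPDATED_MULTIPART_THRESHOLD);
     return conf;
   }
   ```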



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatch.java:
##########
@@ -0,0 +1,564 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.FileNotFoundException;
+import java.nio.charset.StandardCharsets;
+
+import org.junit.Assume;
+import org.junit.Test;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.RemoteFileChangedException;
+import org.apache.hadoop.fs.s3a.S3AFileStatus;
+import org.apache.hadoop.fs.s3a.S3ATestUtils;
+import org.apache.hadoop.fs.s3a.Statistic;
+import org.apache.hadoop.fs.s3a.statistics.BlockOutputStreamStatistics;
+import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
+import software.amazon.awssdk.services.s3.model.S3Exception;
+
+import static 
org.apache.hadoop.fs.Options.CreateFileOptionKeys.FS_OPTION_CREATE_CONDITIONAL_OVERWRITE;
+import static 
org.apache.hadoop.fs.Options.CreateFileOptionKeys.FS_OPTION_CREATE_CONDITIONAL_OVERWRITE_ETAG;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
+import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER_ARRAY;
+import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_MULTIPART;
+import static 
org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_OVERWRITE_SUPPORTED;
+import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_PERFORMANCE_FLAGS;
+import static org.apache.hadoop.fs.s3a.Constants.MIN_MULTIPART_THRESHOLD;
+import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_SIZE;
+import static 
org.apache.hadoop.fs.s3a.Constants.STORE_CAPABILITY_MULTIPART_UPLOAD_ENABLED;
+import static 
org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfNotEnabled;
+import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_404_NOT_FOUND;
+import static 
org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_412_PRECONDITION_FAILED;
+import static 
org.apache.hadoop.fs.s3a.impl.InternalConstants.UPLOAD_PART_COUNT_LIMIT;
+import static org.apache.hadoop.fs.s3a.scale.S3AScaleTestBase._1KB;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+public class ITestS3APutIfMatch extends AbstractS3ACostTest {
+
+    private static final int UPDATED_MULTIPART_THRESHOLD = 100 * _1KB;
+
+    private static final byte[] SMALL_FILE_BYTES = dataset(TEST_FILE_LEN, 0, 
255);
+    private static final byte[] MULTIPART_FILE_BYTES = 
dataset(UPDATED_MULTIPART_THRESHOLD * 5, 'a', 'z' - 'a');
+
+    private BlockOutputStreamStatistics statistics;
+
+    @Override
+    public Configuration createConfiguration() {
+        Configuration conf = super.createConfiguration();
+
+        S3ATestUtils.disableFilesystemCaching(conf);
+        removeBaseAndBucketOverrides(
+                conf,
+                MULTIPART_SIZE,
+                UPLOAD_PART_COUNT_LIMIT,
+                MIN_MULTIPART_THRESHOLD);
+        conf.setLong(UPLOAD_PART_COUNT_LIMIT, 2);
+        conf.setLong(MIN_MULTIPART_THRESHOLD, UPDATED_MULTIPART_THRESHOLD);
+        conf.setInt(MULTIPART_SIZE, UPDATED_MULTIPART_THRESHOLD);
+        return conf;
+    }
+
+    @Override
+    public void setup() throws Exception {
+        super.setup();
+        Configuration conf = getConfiguration();
+        skipIfNotEnabled(conf, FS_S3A_CREATE_OVERWRITE_SUPPORTED,
+                "Skipping IfNoneMatch tests");
+    }
+
+    private static void assertS3ExceptionStatusCode(int code, Exception ex) {
+        S3Exception s3Exception = (S3Exception) ex.getCause();
+
+        if (s3Exception.statusCode() != code) {
+            throw new AssertionError("Expected status code " + code + " from " 
+ ex, ex);
+        }
+    }
+
+    protected String getBlockOutputBufferName() {
+        return FAST_UPLOAD_BUFFER_ARRAY;
+    }
+
+    private static void createFileWithFlags(
+            FileSystem fs,
+            Path path,
+            byte[] data,
+            boolean ifNoneMatchFlag,
+            String etag,
+            boolean forceMultipart) throws Exception {
+        FSDataOutputStream stream = getStreamWithFlags(fs, path, 
ifNoneMatchFlag, etag, forceMultipart);
+        if (data != null && data.length > 0) {
+            stream.write(data);
+        }
+        stream.close();
+    }
+
+    private static void createFileWithFlags(
+            FileSystem fs,
+            Path path,
+            byte[] data,
+            boolean ifNoneMatchFlag,
+            String etag) throws Exception {
+        createFileWithFlags(fs, path, data, ifNoneMatchFlag, etag, false);
+    }
+
+     private static FSDataOutputStream getStreamWithFlags(
+            FileSystem fs,
+            Path path,
+            boolean ifNoneMatchFlag,
+            String etag,
+            boolean forceMultipart) throws Exception {
+        FSDataOutputStreamBuilder builder = fs.createFile(path);
+        if (ifNoneMatchFlag) {
+            builder.must(FS_OPTION_CREATE_CONDITIONAL_OVERWRITE, "true");
+        }
+        if (etag != null) {
+            builder.must(FS_OPTION_CREATE_CONDITIONAL_OVERWRITE_ETAG, etag);
+        }
+        if (forceMultipart) {
+            builder.opt(FS_S3A_CREATE_MULTIPART, "true");
+        }
+        return builder.create().build();
+    }
+
+    private static FSDataOutputStream getStreamWithFlags(
+            FileSystem fs,
+            Path path,
+            boolean ifNoneMatchFlag,
+            String etag) throws Exception {
+        return getStreamWithFlags(fs, path, ifNoneMatchFlag, etag, false);
+    }
+
+    private static String readFileContent(FileSystem fs, Path path) throws 
Throwable {
+        try (FSDataInputStream inputStream = fs.open(path)) {
+            return IOUtils.toString(inputStream, StandardCharsets.UTF_8);
+        }
+    }
+
+    private void updateStatistics(FSDataOutputStream stream) {
+        statistics = S3ATestUtils.getOutputStreamStatistics(stream);
+    }
+
+    @Test
+    public void testIfNoneMatchConflictOnOverwrite() throws Throwable {
+        describe("generate conflict on overwrites");
+        FileSystem fs = getFileSystem();
+        Path testFile = methodPath();
+        fs.mkdirs(testFile.getParent());
+
+        // create a file over an empty path: all good
+        createFileWithFlags(fs, testFile, SMALL_FILE_BYTES, true, null);
+
+        // attempted overwrite fails
+        RemoteFileChangedException firstException = 
intercept(RemoteFileChangedException.class,
+                () -> createFileWithFlags(fs, testFile, SMALL_FILE_BYTES, 
true, null));
+        assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, 
firstException);
+
+        // second attempt also fails
+        RemoteFileChangedException secondException = 
intercept(RemoteFileChangedException.class,
+                () -> createFileWithFlags(fs, testFile, SMALL_FILE_BYTES, 
true, null));
+        assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, 
secondException);
+
+        // Delete file and verify an overwrite works again
+        fs.delete(testFile, false);
+        createFileWithFlags(fs, testFile, SMALL_FILE_BYTES, true, null);
+    }
+
+    @Test
+    public void testIfNoneMatchConflictOnMultipartUpload() throws Throwable {
+        FileSystem fs = getFileSystem();
+        Path testFile = methodPath();
+
+        // Skip if multipart upload not supported
+        Assume.assumeTrue("Skipping as multipart upload not supported",
+                fs.hasPathCapability(testFile, 
STORE_CAPABILITY_MULTIPART_UPLOAD_ENABLED));
+
+        createFileWithFlags(fs, testFile, MULTIPART_FILE_BYTES, true, null, 
true);
+
+        RemoteFileChangedException firstException = 
intercept(RemoteFileChangedException.class,
+                () -> createFileWithFlags(fs, testFile, MULTIPART_FILE_BYTES, 
true, null, true));
+        assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, 
firstException);
+
+        RemoteFileChangedException secondException = 
intercept(RemoteFileChangedException.class,
+                () -> createFileWithFlags(fs, testFile, MULTIPART_FILE_BYTES, 
true, null, true));
+        assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, 
secondException);
+    }
+
+    @Test
+    public void testIfNoneMatchMultipartUploadWithRaceCondition() throws 
Throwable {
+        FileSystem fs = getFileSystem();
+        Path testFile = methodPath();
+
+        // Skip test if multipart uploads are not supported
+        Assume.assumeTrue("Skipping test as multipart uploads are not 
supported",
+                fs.hasPathCapability(testFile, 
STORE_CAPABILITY_MULTIPART_UPLOAD_ENABLED));
+
+        // Create a file with multipart upload but do not close the stream
+        FSDataOutputStream stream = getStreamWithFlags(fs, testFile, true, 
null, true);
+        stream.write(MULTIPART_FILE_BYTES);
+
+        // create and close another small file in parallel
+        createFileWithFlags(fs, testFile, SMALL_FILE_BYTES, true, null);
+
+        // Closing the first stream should throw RemoteFileChangedException
+        RemoteFileChangedException exception = 
intercept(RemoteFileChangedException.class, stream::close);
+        assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, exception);
+    }
+
+    @Test
+    public void testIfNoneMatchTwoConcurrentMultipartUploads() throws 
Throwable {
+        FileSystem fs = getFileSystem();
+        Path testFile = methodPath();
+
+        // Skip test if multipart uploads are not supported
+        Assume.assumeTrue("Skipping test as multipart uploads are not 
supported",
+                fs.hasPathCapability(testFile, 
STORE_CAPABILITY_MULTIPART_UPLOAD_ENABLED));
+
+        // Create a file with multipart upload but do not close the stream
+        FSDataOutputStream stream = getStreamWithFlags(fs, testFile, true, 
null, true);
+        stream.write(MULTIPART_FILE_BYTES);
+
+        // create and close another multipart file in parallel
+        createFileWithFlags(fs, testFile, MULTIPART_FILE_BYTES, true, null, 
true);
+
+        // Closing the first stream should throw RemoteFileChangedException
+        RemoteFileChangedException exception = 
intercept(RemoteFileChangedException.class, stream::close);
+        assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, exception);
+    }
+
+    @Test
+    public void testIfNoneMatchOverwriteWithEmptyFile() throws Throwable {
+        FileSystem fs = getFileSystem();
+        Path testFile = methodPath();
+        fs.mkdirs(testFile.getParent());
+
+        // create a non-empty file
+        createFileWithFlags(fs, testFile, SMALL_FILE_BYTES, true, null);
+
+        // overwrite with zero-byte file (no write)
+        FSDataOutputStream stream = getStreamWithFlags(fs, testFile, true, 
null);
+
+        // close the stream, should throw RemoteFileChangedException
+        RemoteFileChangedException exception = 
intercept(RemoteFileChangedException.class, stream::close);
+        assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, exception);
+    }
+
+    @Test
+    public void testIfNoneMatchOverwriteEmptyFileWithFile() throws Throwable {
+        FileSystem fs = getFileSystem();
+        Path testFile = methodPath();
+        fs.mkdirs(testFile.getParent());
+
+        // create an empty file (no write)
+        FSDataOutputStream stream = getStreamWithFlags(fs, testFile, true, 
null);
+        stream.close();
+
+        // overwrite with non-empty file, should throw 
RemoteFileChangedException
+        RemoteFileChangedException exception = 
intercept(RemoteFileChangedException.class,
+                () -> createFileWithFlags(fs, testFile, SMALL_FILE_BYTES, 
true, null));
+        assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, exception);
+    }
+
+    @Test
+    public void testIfNoneMatchOverwriteEmptyWithEmptyFile() throws Throwable {
+        FileSystem fs = getFileSystem();
+        Path testFile = methodPath();
+        fs.mkdirs(testFile.getParent());
+
+        // create an empty file (no write)
+        FSDataOutputStream stream1 = getStreamWithFlags(fs, testFile, true, 
null);
+        stream1.close();
+
+        // overwrite with another empty file, should throw 
RemoteFileChangedException
+        FSDataOutputStream stream2 = getStreamWithFlags(fs, testFile, true, 
null);
+        RemoteFileChangedException exception = 
intercept(RemoteFileChangedException.class, stream2::close);
+        assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, exception);
+    }
+
+    @Test
+    public void testIfMatchOverwriteWithCorrectEtag() throws Throwable {
+        FileSystem fs = getFileSystem();
+        Path path = methodPath();
+        fs.mkdirs(path.getParent());
+
+        // Create a file
+        createFileWithFlags(fs, path, SMALL_FILE_BYTES, false, null);
+
+        // Retrieve the etag from the created file
+        String etag = ((S3AFileStatus) fs.getFileStatus(path)).getETag();
+        assertNotNull("ETag should not be null after file creation", etag);

Review Comment:
   1. this code is duplicated across many tests. pull out into its own method 
(getEtag(path))
   
   2.  use AssertJ Assertions.assertThat().isNotNull();
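   
   A minimal sketch of such a helper, assuming org.assertj.core.api.Assertions and java.io.IOException are imported (getEtag is the name suggested above):
   
   ```java
   /**
    * Get the etag of a file, asserting that one is present.
    * @param path path of the file
    * @return the etag
    * @throws IOException failure to retrieve the file status
    */
   private String getEtag(Path path) throws IOException {
     String etag = ((S3AFileStatus) getFileSystem().getFileStatus(path)).getETag();
     Assertions.assertThat(etag)
         .describedAs("ETag of %s after file creation", path)
         .isNotNull();
     return etag;
   }
   ```
   
   Call sites then collapse to `String etag = getEtag(path);`.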



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatch.java:
##########
@@ -0,0 +1,564 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.FileNotFoundException;
+import java.nio.charset.StandardCharsets;
+
+import org.junit.Assume;
+import org.junit.Test;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.RemoteFileChangedException;
+import org.apache.hadoop.fs.s3a.S3AFileStatus;
+import org.apache.hadoop.fs.s3a.S3ATestUtils;
+import org.apache.hadoop.fs.s3a.Statistic;
+import org.apache.hadoop.fs.s3a.statistics.BlockOutputStreamStatistics;
+import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
+import software.amazon.awssdk.services.s3.model.S3Exception;
+
+import static 
org.apache.hadoop.fs.Options.CreateFileOptionKeys.FS_OPTION_CREATE_CONDITIONAL_OVERWRITE;
+import static 
org.apache.hadoop.fs.Options.CreateFileOptionKeys.FS_OPTION_CREATE_CONDITIONAL_OVERWRITE_ETAG;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
+import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER_ARRAY;
+import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_MULTIPART;
+import static 
org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_OVERWRITE_SUPPORTED;
+import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_PERFORMANCE_FLAGS;
+import static org.apache.hadoop.fs.s3a.Constants.MIN_MULTIPART_THRESHOLD;
+import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_SIZE;
+import static 
org.apache.hadoop.fs.s3a.Constants.STORE_CAPABILITY_MULTIPART_UPLOAD_ENABLED;
+import static 
org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfNotEnabled;
+import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_404_NOT_FOUND;
+import static 
org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_412_PRECONDITION_FAILED;
+import static 
org.apache.hadoop.fs.s3a.impl.InternalConstants.UPLOAD_PART_COUNT_LIMIT;
+import static org.apache.hadoop.fs.s3a.scale.S3AScaleTestBase._1KB;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+public class ITestS3APutIfMatch extends AbstractS3ACostTest {
+
+    private static final int UPDATED_MULTIPART_THRESHOLD = 100 * _1KB;
+
+    private static final byte[] SMALL_FILE_BYTES = dataset(TEST_FILE_LEN, 0, 
255);
+    private static final byte[] MULTIPART_FILE_BYTES = 
dataset(UPDATED_MULTIPART_THRESHOLD * 5, 'a', 'z' - 'a');
+
+    private BlockOutputStreamStatistics statistics;
+
+    @Override
+    public Configuration createConfiguration() {
+        Configuration conf = super.createConfiguration();
+
+        S3ATestUtils.disableFilesystemCaching(conf);
+        removeBaseAndBucketOverrides(
+                conf,
+                MULTIPART_SIZE,
+                UPLOAD_PART_COUNT_LIMIT,
+                MIN_MULTIPART_THRESHOLD);
+        conf.setLong(UPLOAD_PART_COUNT_LIMIT, 2);
+        conf.setLong(MIN_MULTIPART_THRESHOLD, UPDATED_MULTIPART_THRESHOLD);
+        conf.setInt(MULTIPART_SIZE, UPDATED_MULTIPART_THRESHOLD);
+        return conf;
+    }
+
+    @Override
+    public void setup() throws Exception {
+        super.setup();
+        Configuration conf = getConfiguration();
+        skipIfNotEnabled(conf, FS_S3A_CREATE_OVERWRITE_SUPPORTED,
+                "Skipping IfNoneMatch tests");
+    }
+
+    private static void assertS3ExceptionStatusCode(int code, Exception ex) {
+        S3Exception s3Exception = (S3Exception) ex.getCause();
+
+        if (s3Exception.statusCode() != code) {
+            throw new AssertionError("Expected status code " + code + " from " 
+ ex, ex);
+        }
+    }
+
+    protected String getBlockOutputBufferName() {
+        return FAST_UPLOAD_BUFFER_ARRAY;
+    }
+
+    private static void createFileWithFlags(
+            FileSystem fs,
+            Path path,
+            byte[] data,
+            boolean ifNoneMatchFlag,
+            String etag,
+            boolean forceMultipart) throws Exception {
+        FSDataOutputStream stream = getStreamWithFlags(fs, path, 
ifNoneMatchFlag, etag, forceMultipart);
+        if (data != null && data.length > 0) {
+            stream.write(data);
+        }
+        stream.close();
+    }
+
+    private static void createFileWithFlags(
+            FileSystem fs,
+            Path path,
+            byte[] data,
+            boolean ifNoneMatchFlag,
+            String etag) throws Exception {
+        createFileWithFlags(fs, path, data, ifNoneMatchFlag, etag, false);
+    }
+
+     private static FSDataOutputStream getStreamWithFlags(
+            FileSystem fs,
+            Path path,
+            boolean ifNoneMatchFlag,
+            String etag,
+            boolean forceMultipart) throws Exception {
+        FSDataOutputStreamBuilder builder = fs.createFile(path);
+        if (ifNoneMatchFlag) {
+            builder.must(FS_OPTION_CREATE_CONDITIONAL_OVERWRITE, "true");
+        }
+        if (etag != null) {
+            builder.must(FS_OPTION_CREATE_CONDITIONAL_OVERWRITE_ETAG, etag);
+        }
+        if (forceMultipart) {
+            builder.opt(FS_S3A_CREATE_MULTIPART, "true");
+        }
+        return builder.create().build();
+    }
+
+    private static FSDataOutputStream getStreamWithFlags(
+            FileSystem fs,
+            Path path,
+            boolean ifNoneMatchFlag,
+            String etag) throws Exception {
+        return getStreamWithFlags(fs, path, ifNoneMatchFlag, etag, false);
+    }
+
+    private static String readFileContent(FileSystem fs, Path path) throws 
Throwable {
+        try (FSDataInputStream inputStream = fs.open(path)) {
+            return IOUtils.toString(inputStream, StandardCharsets.UTF_8);
+        }
+    }
+
+    private void updateStatistics(FSDataOutputStream stream) {
+        statistics = S3ATestUtils.getOutputStreamStatistics(stream);
+    }
+
+    @Test
+    public void testIfNoneMatchConflictOnOverwrite() throws Throwable {
+        describe("generate conflict on overwrites");
+        FileSystem fs = getFileSystem();
+        Path testFile = methodPath();
+        fs.mkdirs(testFile.getParent());
+
+        // create a file over an empty path: all good
+        createFileWithFlags(fs, testFile, SMALL_FILE_BYTES, true, null);
+
+        // attempted overwrite fails
+        RemoteFileChangedException firstException = 
intercept(RemoteFileChangedException.class,
+                () -> createFileWithFlags(fs, testFile, SMALL_FILE_BYTES, 
true, null));
+        assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, 
firstException);
+
+        // second attempt also fails
+        RemoteFileChangedException secondException = 
intercept(RemoteFileChangedException.class,
+                () -> createFileWithFlags(fs, testFile, SMALL_FILE_BYTES, 
true, null));
+        assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, 
secondException);
+
+        // Delete file and verify an overwrite works again
+        fs.delete(testFile, false);
+        createFileWithFlags(fs, testFile, SMALL_FILE_BYTES, true, null);
+    }
+
+    @Test
+    public void testIfNoneMatchConflictOnMultipartUpload() throws Throwable {
+        FileSystem fs = getFileSystem();
+        Path testFile = methodPath();
+
+        // Skip if multipart upload not supported
+        Assume.assumeTrue("Skipping as multipart upload not supported",
+                fs.hasPathCapability(testFile, 
STORE_CAPABILITY_MULTIPART_UPLOAD_ENABLED));
+
+        createFileWithFlags(fs, testFile, MULTIPART_FILE_BYTES, true, null, 
true);
+
+        RemoteFileChangedException firstException = 
intercept(RemoteFileChangedException.class,
+                () -> createFileWithFlags(fs, testFile, MULTIPART_FILE_BYTES, 
true, null, true));
+        assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, 
firstException);
+
+        RemoteFileChangedException secondException = 
intercept(RemoteFileChangedException.class,
+                () -> createFileWithFlags(fs, testFile, MULTIPART_FILE_BYTES, 
true, null, true));
+        assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, 
secondException);
+    }
+
+    @Test
+    public void testIfNoneMatchMultipartUploadWithRaceCondition() throws 
Throwable {
+        FileSystem fs = getFileSystem();
+        Path testFile = methodPath();
+
+        // Skip test if multipart uploads are not supported
+        Assume.assumeTrue("Skipping test as multipart uploads are not 
supported",
+                fs.hasPathCapability(testFile, 
STORE_CAPABILITY_MULTIPART_UPLOAD_ENABLED));
+
+        // Create a file with multipart upload but do not close the stream
+        FSDataOutputStream stream = getStreamWithFlags(fs, testFile, true, 
null, true);
+        stream.write(MULTIPART_FILE_BYTES);
+
+        // create and close another small file in parallel
+        createFileWithFlags(fs, testFile, SMALL_FILE_BYTES, true, null);
+
+        // Closing the first stream should throw RemoteFileChangedException
+        RemoteFileChangedException exception = 
intercept(RemoteFileChangedException.class, stream::close);
+        assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, exception);
+    }
+
+    @Test
+    public void testIfNoneMatchTwoConcurrentMultipartUploads() throws 
Throwable {
+        FileSystem fs = getFileSystem();
+        Path testFile = methodPath();
+
+        // Skip test if multipart uploads are not supported
+        Assume.assumeTrue("Skipping test as multipart uploads are not 
supported",
+                fs.hasPathCapability(testFile, 
STORE_CAPABILITY_MULTIPART_UPLOAD_ENABLED));
+
+        // Create a file with multipart upload but do not close the stream
+        FSDataOutputStream stream = getStreamWithFlags(fs, testFile, true, 
null, true);
+        stream.write(MULTIPART_FILE_BYTES);
+
+        // create and close another multipart file in parallel
+        createFileWithFlags(fs, testFile, MULTIPART_FILE_BYTES, true, null, 
true);
+
+        // Closing the first stream should throw RemoteFileChangedException
+        RemoteFileChangedException exception = 
intercept(RemoteFileChangedException.class, stream::close);
+        assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, exception);
+    }
+
+    @Test
+    public void testIfNoneMatchOverwriteWithEmptyFile() throws Throwable {
+        FileSystem fs = getFileSystem();
+        Path testFile = methodPath();
+        fs.mkdirs(testFile.getParent());
+
+        // create a non-empty file
+        createFileWithFlags(fs, testFile, SMALL_FILE_BYTES, true, null);
+
+        // overwrite with zero-byte file (no write)
+        FSDataOutputStream stream = getStreamWithFlags(fs, testFile, true, 
null);
+
+        // close the stream, should throw RemoteFileChangedException
+        RemoteFileChangedException exception = 
intercept(RemoteFileChangedException.class, stream::close);
+        assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, exception);
+    }
+
+    @Test
+    public void testIfNoneMatchOverwriteEmptyFileWithFile() throws Throwable {
+        FileSystem fs = getFileSystem();
+        Path testFile = methodPath();
+        fs.mkdirs(testFile.getParent());
+
+        // create an empty file (no write)
+        FSDataOutputStream stream = getStreamWithFlags(fs, testFile, true, 
null);
+        stream.close();
+
+        // overwrite with non-empty file, should throw 
RemoteFileChangedException
+        RemoteFileChangedException exception = 
intercept(RemoteFileChangedException.class,
+                () -> createFileWithFlags(fs, testFile, SMALL_FILE_BYTES, 
true, null));
+        assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, exception);
+    }
+
+    @Test
+    public void testIfNoneMatchOverwriteEmptyWithEmptyFile() throws Throwable {
+        FileSystem fs = getFileSystem();
+        Path testFile = methodPath();
+        fs.mkdirs(testFile.getParent());
+
+        // create an empty file (no write)
+        FSDataOutputStream stream1 = getStreamWithFlags(fs, testFile, true, 
null);
+        stream1.close();
+
+        // overwrite with another empty file, should throw 
RemoteFileChangedException
+        FSDataOutputStream stream2 = getStreamWithFlags(fs, testFile, true, 
null);
+        RemoteFileChangedException exception = 
intercept(RemoteFileChangedException.class, stream2::close);
+        assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, exception);
+    }
+
+    @Test
+    public void testIfMatchOverwriteWithCorrectEtag() throws Throwable {
+        FileSystem fs = getFileSystem();
+        Path path = methodPath();
+        fs.mkdirs(path.getParent());
+
+        // Create a file
+        createFileWithFlags(fs, path, SMALL_FILE_BYTES, false, null);
+
+        // Retrieve the etag from the created file
+        String etag = ((S3AFileStatus) fs.getFileStatus(path)).getETag();
+        assertNotNull("ETag should not be null after file creation", etag);
+
+        String updatedFileContent = "Updated content";
+        byte[] updatedData = 
updatedFileContent.getBytes(StandardCharsets.UTF_8);
+
+        // overwrite file with etag
+        createFileWithFlags(fs, path, updatedData, false, etag);
+
+        // read file and verify overwritten content
+        String fileContent = readFileContent(fs, path);
+        assertEquals(
+                "File content should be correctly updated after overwriting 
with the correct ETag",
+                updatedFileContent,
+                fileContent
+        );
+    }
+
+    @Test
+    public void testIfMatchOverwriteWithOutdatedEtag() throws Throwable {
+        FileSystem fs = getFileSystem();
+        Path path = methodPath();
+        fs.mkdirs(path.getParent());
+
+        // Create a file
+        createFileWithFlags(fs, path, SMALL_FILE_BYTES, true, null);
+
+        // Retrieve the etag from the created file
+        String etag = ((S3AFileStatus) fs.getFileStatus(path)).getETag();
+        assertNotNull("ETag should not be null after file creation", etag);
+
+        // Overwrite the file. Will update the etag, making the previously 
fetched etag outdated.
+        createFileWithFlags(fs, path, SMALL_FILE_BYTES, false, null);
+
+        // overwrite file with outdated etag. Should throw 
RemoteFileChangedException
+        RemoteFileChangedException exception = 
intercept(RemoteFileChangedException.class,
+                () -> createFileWithFlags(fs, path, SMALL_FILE_BYTES, false, 
etag));
+        assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, exception);
+    }
+
+    @Test
+    public void testIfMatchOverwriteDeletedFileWithEtag() throws Throwable {
+        FileSystem fs = getFileSystem();
+        Path path = methodPath();
+        fs.mkdirs(path.getParent());
+
+        // Create a file
+        createFileWithFlags(fs, path, SMALL_FILE_BYTES, false, null);
+
+        // Retrieve the etag from the created file
+        String etag = ((S3AFileStatus) fs.getFileStatus(path)).getETag();
+        assertNotNull("ETag should not be null after file creation", etag);
+
+        // delete the file
+        fs.delete(path);
+
+        // overwrite file with etag. Should throw FileNotFoundException
+        FileNotFoundException exception = 
intercept(FileNotFoundException.class,
+                () -> createFileWithFlags(fs, path, SMALL_FILE_BYTES, false, 
etag));
+        assertS3ExceptionStatusCode(SC_404_NOT_FOUND, exception);
+    }
+
+    @Test
+    public void testIfMatchOverwriteFileWithEmptyEtag() throws Throwable {
+        FileSystem fs = getFileSystem();
+        Path path = methodPath();
+        fs.mkdirs(path.getParent());
+
+        // Create a file
+        createFileWithFlags(fs, path, SMALL_FILE_BYTES, false, null);
+
+        // overwrite file with empty etag. Should throw 
IllegalArgumentException
+        intercept(IllegalArgumentException.class,
+                () -> createFileWithFlags(fs, path, SMALL_FILE_BYTES, false, 
""));
+    }
+
+    @Test
+    public void testIfMatchMultipartUploadWithRaceCondition() throws Throwable 
{
+        FileSystem fs = getFileSystem();
+        Path testFile = methodPath();
+
+        // Skip test if multipart uploads are not supported
+        Assume.assumeTrue("Skipping test as multipart uploads are not 
supported",
+                fs.hasPathCapability(testFile, 
STORE_CAPABILITY_MULTIPART_UPLOAD_ENABLED));
+
+        // Create a file with multipart upload but do not close the stream
+        FSDataOutputStream stream = getStreamWithFlags(fs, testFile, true, 
null, true);
+        stream.write(MULTIPART_FILE_BYTES);
+
+        // create and close another small file in parallel
+        createFileWithFlags(fs, testFile, SMALL_FILE_BYTES, true, null);
+
+        // Closing the first stream should throw RemoteFileChangedException
+        RemoteFileChangedException exception = 
intercept(RemoteFileChangedException.class, stream::close);
+        assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, exception);
+    }
+
+    @Test
+    public void testIfMatchTwoMultipartUploadsRaceConditionOneClosesFirst() 
throws Throwable {
+        FileSystem fs = getFileSystem();
+        Path testFile = methodPath();
+
+        // Skip test if multipart uploads are not supported
+        Assume.assumeTrue("Skipping test as multipart uploads are not 
supported",
+                fs.hasPathCapability(testFile, 
STORE_CAPABILITY_MULTIPART_UPLOAD_ENABLED));
+
+        // Create a file and retrieve its etag
+        createFileWithFlags(fs, testFile, SMALL_FILE_BYTES, false, null);
+        String etag = ((S3AFileStatus) fs.getFileStatus(testFile)).getETag();
+        assertNotNull("ETag should not be null after file creation", etag);
+
+        // Start two multipart uploads with the same etag
+        FSDataOutputStream stream1 = getStreamWithFlags(fs, testFile, false, 
etag, true);
+        FSDataOutputStream stream2 = getStreamWithFlags(fs, testFile, false, 
etag, true);
+
+        // Write data to both streams
+        stream1.write(MULTIPART_FILE_BYTES);
+        stream2.write(MULTIPART_FILE_BYTES);
+
+        // Close the first stream successfully. Will update the etag
+        stream1.close();
+
+        // Close second stream, should fail due to etag mismatch
+        RemoteFileChangedException exception = 
intercept(RemoteFileChangedException.class, stream2::close);
+        assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, exception);
+    }
+
+    @Test
+    public void testIfMatchCreateFileWithoutOverwrite() throws Throwable {
+        FileSystem fs = getFileSystem();
+        Path testFile = methodPath();
+        fs.mkdirs(testFile.getParent());
+
+        // Create a file and retrieve the etag
+        createFileWithFlags(fs, testFile, SMALL_FILE_BYTES, false, null);
+        String etag = ((S3AFileStatus) fs.getFileStatus(testFile)).getETag();
+        assertNotNull("ETag should not be null after file creation", etag);
+
+        // Attempt to create a file at the same path without overwrite, using 
If-Match with the etag
+        FileAlreadyExistsException exception = 
intercept(FileAlreadyExistsException.class, () -> {
+            fs.createFile(testFile)
+                    .overwrite(false)
+                    .opt(FS_OPTION_CREATE_CONDITIONAL_OVERWRITE_ETAG, etag)
+                    .build();
+        });
+    }
+
+    @Test
+    public void testIfMatchCreateFileWithoutOverwriteWithPerformanceFlag() 
throws Throwable {
+        FileSystem fs = getFileSystem();
+        Path testFile = methodPath();
+        fs.mkdirs(testFile.getParent());
+
+        getConfiguration().set(FS_S3A_PERFORMANCE_FLAGS, "create");
+
+        // Create a file and retrieve the etag
+        createFileWithFlags(fs, testFile, SMALL_FILE_BYTES, false, null);
+        String etag = ((S3AFileStatus) fs.getFileStatus(testFile)).getETag();
+        assertNotNull("ETag should not be null after file creation", etag);
+
+        // Attempt to create a file at the same path without overwrite, using 
If-Match with the etag
+        FileAlreadyExistsException exception = 
intercept(FileAlreadyExistsException.class, () -> {
+            fs.createFile(testFile)
+                    .overwrite(false)
+                    .opt(FS_OPTION_CREATE_CONDITIONAL_OVERWRITE_ETAG, etag)
+                    .build();
+        });
+    }
+
+    @Test
+    public void testConditionalWriteStatisticsWithoutIfNoneMatch() throws 
Throwable {
+        FileSystem fs = getFileSystem();
+        Path testFile = methodPath();
+
+        // write without an If-None-Match
+        // conditional_write, conditional_write_statistics should remain 0
+        FSDataOutputStream stream = getStreamWithFlags(fs, testFile, false, 
null, false);
+        updateStatistics(stream);
+        stream.write(SMALL_FILE_BYTES);
+        stream.close();
+        long conditionalCreate = 
statistics.lookupCounterValue(Statistic.CONDITIONAL_CREATE.getSymbol());
+        long conditionalCreateFailed = 
statistics.lookupCounterValue(Statistic.CONDITIONAL_CREATE_FAILED.getSymbol());
+        assertEquals("Write without If-None-Match: conditional_create should 
be 0", 0L, conditionalCreate);
+        assertEquals("Write without If-None-Match: conditional_create_failed 
should be 0", 0L, conditionalCreateFailed);
+
+        // write with overwrite = true
+        // conditional_write, conditional_write_statistics should remain 0
+        try (FSDataOutputStream outputStream = fs.create(testFile, true)) {
+            outputStream.write(SMALL_FILE_BYTES);
+            updateStatistics(outputStream);
+        }
+        conditionalCreate = 
statistics.lookupCounterValue(Statistic.CONDITIONAL_CREATE.getSymbol());
+        conditionalCreateFailed = 
statistics.lookupCounterValue(Statistic.CONDITIONAL_CREATE_FAILED.getSymbol());
+        assertEquals("Write with overwrite=true: conditional_create should be 
0", 0L, conditionalCreate);
+        assertEquals("Write with overwrite=true: conditional_create_failed 
should be 0", 0L, conditionalCreateFailed);
+
+        // write in path where file already exists with overwrite = false
+        // conditional_write, conditional_write_statistics should remain 0
+        try (FSDataOutputStream outputStream = fs.create(testFile, false)) {
+            outputStream.write(SMALL_FILE_BYTES);
+            updateStatistics(outputStream);
+        } catch (FileAlreadyExistsException e) {}
+        conditionalCreate = 
statistics.lookupCounterValue(Statistic.CONDITIONAL_CREATE.getSymbol());
+        conditionalCreateFailed = 
statistics.lookupCounterValue(Statistic.CONDITIONAL_CREATE_FAILED.getSymbol());
+        assertEquals("Write in path where file already exists with 
overwrite=false: conditional_create should be 0", 0L, conditionalCreate);
+        assertEquals("Write in path where file already exists with 
overwrite=false: conditional_create_failed should be 0", 0L, 
conditionalCreateFailed);
+
+        // delete the file
+        fs.delete(testFile, false);
+
+        // write in path where file doesn't exist with overwrite = false
+        // conditional_write, conditional_write_statistics should remain 0
+        try (FSDataOutputStream outputStream = fs.create(testFile, false)) {
+            outputStream.write(SMALL_FILE_BYTES);
+            updateStatistics(outputStream);
+        }
+        conditionalCreate = 
statistics.lookupCounterValue(Statistic.CONDITIONAL_CREATE.getSymbol());
+        conditionalCreateFailed = 
statistics.lookupCounterValue(Statistic.CONDITIONAL_CREATE_FAILED.getSymbol());
+        assertEquals("Write in path where file does not exist with 
overwrite=false: conditional_create should be 0", 0L, conditionalCreate);
+        assertEquals("Write in path where file does not exist with 
overwrite=false: conditional_create_failed should be 0", 0L, 
conditionalCreateFailed);
+    }
+
+    @Test
+    public void testConditionalWriteStatisticsWithIfNoneMatch() throws 
Throwable {
+        FileSystem fs = getFileSystem();
+        Path testFile = methodPath();
+
+        FSDataOutputStream stream = getStreamWithFlags(fs, testFile, true, 
null, false);
+        updateStatistics(stream);
+        stream.write(SMALL_FILE_BYTES);
+        stream.close();
+
+        long conditionalCreate = 
statistics.lookupCounterValue(Statistic.CONDITIONAL_CREATE.getSymbol());

Review Comment:
   There's a class IOStatisticAssertions to help assert here...look at its usages. Its assertions result in AssertJ assertions, so you can go
   
   ```java
   verifyStatisticCounterValue(statistics, CONDITIONAL_CREATE.getSymbol(), 1);
   ```
   
   same elsewhere
   
   



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java:
##########
@@ -137,6 +137,12 @@ public enum Statistic {
       TYPE_COUNTER),
   IGNORED_ERRORS("ignored_errors", "Errors caught and ignored",
       TYPE_COUNTER),
+  CONDITIONAL_CREATE(StoreStatisticNames.CONDITIONAL_CREATE,
+      "Count of successful conditional create operations.",
+      TYPE_COUNTER),

Review Comment:
   put these in alphabetical order with all the other statistics



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatch.java:
##########
@@ -0,0 +1,564 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.FileNotFoundException;
+import java.nio.charset.StandardCharsets;
+
+import org.junit.Assume;

Review Comment:
   use AssertJ assumeThat
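   
   e.g., assuming AssertJ's org.assertj.core.api.Assumptions, something like:
   
   ```java
   import static org.assertj.core.api.Assumptions.assumeThat;
   
   // replaces Assume.assumeTrue(...) in the multipart tests
   assumeThat(fs.hasPathCapability(testFile, STORE_CAPABILITY_MULTIPART_UPLOAD_ENABLED))
       .describedAs("multipart upload capability of %s", testFile)
       .isTrue();
   ```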



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestS3APutIfMatch.java:
##########
@@ -0,0 +1,564 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.FileNotFoundException;
+import java.nio.charset.StandardCharsets;
+
+import org.junit.Assume;
+import org.junit.Test;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.RemoteFileChangedException;
+import org.apache.hadoop.fs.s3a.S3AFileStatus;
+import org.apache.hadoop.fs.s3a.S3ATestUtils;
+import org.apache.hadoop.fs.s3a.Statistic;
+import org.apache.hadoop.fs.s3a.statistics.BlockOutputStreamStatistics;
+import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
+import software.amazon.awssdk.services.s3.model.S3Exception;
+
+import static 
org.apache.hadoop.fs.Options.CreateFileOptionKeys.FS_OPTION_CREATE_CONDITIONAL_OVERWRITE;
+import static 
org.apache.hadoop.fs.Options.CreateFileOptionKeys.FS_OPTION_CREATE_CONDITIONAL_OVERWRITE_ETAG;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
+import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BUFFER_ARRAY;
+import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_MULTIPART;
+import static 
org.apache.hadoop.fs.s3a.Constants.FS_S3A_CREATE_OVERWRITE_SUPPORTED;
+import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_PERFORMANCE_FLAGS;
+import static org.apache.hadoop.fs.s3a.Constants.MIN_MULTIPART_THRESHOLD;
+import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_SIZE;
+import static 
org.apache.hadoop.fs.s3a.Constants.STORE_CAPABILITY_MULTIPART_UPLOAD_ENABLED;
+import static 
org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfNotEnabled;
+import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_404_NOT_FOUND;
+import static 
org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_412_PRECONDITION_FAILED;
+import static 
org.apache.hadoop.fs.s3a.impl.InternalConstants.UPLOAD_PART_COUNT_LIMIT;
+import static org.apache.hadoop.fs.s3a.scale.S3AScaleTestBase._1KB;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+public class ITestS3APutIfMatch extends AbstractS3ACostTest {
+
+    private static final int UPDATED_MULTIPART_THRESHOLD = 100 * _1KB;
+
+    private static final byte[] SMALL_FILE_BYTES = dataset(TEST_FILE_LEN, 0, 
255);
+    private static final byte[] MULTIPART_FILE_BYTES = 
dataset(UPDATED_MULTIPART_THRESHOLD * 5, 'a', 'z' - 'a');
+
+    private BlockOutputStreamStatistics statistics;
+
+    @Override
+    public Configuration createConfiguration() {
+        Configuration conf = super.createConfiguration();
+
+        S3ATestUtils.disableFilesystemCaching(conf);
+        removeBaseAndBucketOverrides(
+                conf,
+                MULTIPART_SIZE,
+                UPLOAD_PART_COUNT_LIMIT,
+                MIN_MULTIPART_THRESHOLD);
+        conf.setLong(UPLOAD_PART_COUNT_LIMIT, 2);
+        conf.setLong(MIN_MULTIPART_THRESHOLD, UPDATED_MULTIPART_THRESHOLD);
+        conf.setInt(MULTIPART_SIZE, UPDATED_MULTIPART_THRESHOLD);
+        return conf;
+    }
+
+    @Override
+    public void setup() throws Exception {
+        super.setup();
+        Configuration conf = getConfiguration();
+        skipIfNotEnabled(conf, FS_S3A_CREATE_OVERWRITE_SUPPORTED,
+                "Skipping IfNoneMatch tests");
+    }
+
+    private static void assertS3ExceptionStatusCode(int code, Exception ex) {
+        S3Exception s3Exception = (S3Exception) ex.getCause();
+
+        if (s3Exception.statusCode() != code) {
+            throw new AssertionError("Expected status code " + code + " from " + ex, ex);
+        }
+    }
+
+    protected String getBlockOutputBufferName() {
+        return FAST_UPLOAD_BUFFER_ARRAY;
+    }
+
+    private static void createFileWithFlags(
+            FileSystem fs,
+            Path path,
+            byte[] data,
+            boolean ifNoneMatchFlag,
+            String etag,
+            boolean forceMultipart) throws Exception {
+        FSDataOutputStream stream = getStreamWithFlags(fs, path, ifNoneMatchFlag, etag, forceMultipart);
+        if (data != null && data.length > 0) {
+            stream.write(data);
+        }
+        stream.close();
+    }
+
+    private static void createFileWithFlags(
+            FileSystem fs,
+            Path path,
+            byte[] data,
+            boolean ifNoneMatchFlag,
+            String etag) throws Exception {
+        createFileWithFlags(fs, path, data, ifNoneMatchFlag, etag, false);
+    }
+
+    private static FSDataOutputStream getStreamWithFlags(
+            FileSystem fs,
+            Path path,
+            boolean ifNoneMatchFlag,
+            String etag,
+            boolean forceMultipart) throws Exception {
+        FSDataOutputStreamBuilder builder = fs.createFile(path);
+        if (ifNoneMatchFlag) {
+            builder.must(FS_OPTION_CREATE_CONDITIONAL_OVERWRITE, "true");
+        }
+        if (etag != null) {
+            builder.must(FS_OPTION_CREATE_CONDITIONAL_OVERWRITE_ETAG, etag);
+        }
+        if (forceMultipart) {
+            builder.opt(FS_S3A_CREATE_MULTIPART, "true");
+        }
+        return builder.create().build();
+    }
+
+    private static FSDataOutputStream getStreamWithFlags(
+            FileSystem fs,
+            Path path,
+            boolean ifNoneMatchFlag,
+            String etag) throws Exception {
+        return getStreamWithFlags(fs, path, ifNoneMatchFlag, etag, false);
+    }
+
+    private static String readFileContent(FileSystem fs, Path path) throws Throwable {
+        try (FSDataInputStream inputStream = fs.open(path)) {
+            return IOUtils.toString(inputStream, StandardCharsets.UTF_8);
+        }
+    }
+
+    private void updateStatistics(FSDataOutputStream stream) {
+        statistics = S3ATestUtils.getOutputStreamStatistics(stream);
+    }
+
+    @Test
+    public void testIfNoneMatchConflictOnOverwrite() throws Throwable {
+        describe("generate conflict on overwrites");
+        FileSystem fs = getFileSystem();
+        Path testFile = methodPath();
+        fs.mkdirs(testFile.getParent());
+
+        // create a file over an empty path: all good
+        createFileWithFlags(fs, testFile, SMALL_FILE_BYTES, true, null);
+
+        // attempted overwrite fails
+        RemoteFileChangedException firstException = intercept(RemoteFileChangedException.class,
+                () -> createFileWithFlags(fs, testFile, SMALL_FILE_BYTES, true, null));
+        assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, firstException);
+
+        // second attempt also fails
+        RemoteFileChangedException secondException = intercept(RemoteFileChangedException.class,
+                () -> createFileWithFlags(fs, testFile, SMALL_FILE_BYTES, true, null));
+        assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, secondException);
+
+        // Delete file and verify an overwrite works again
+        fs.delete(testFile, false);
+        createFileWithFlags(fs, testFile, SMALL_FILE_BYTES, true, null);
+    }
+
+    @Test
+    public void testIfNoneMatchConflictOnMultipartUpload() throws Throwable {
+        FileSystem fs = getFileSystem();
+        Path testFile = methodPath();
+
+        // Skip if multipart upload not supported
+        Assume.assumeTrue("Skipping as multipart upload not supported",
+                fs.hasPathCapability(testFile, STORE_CAPABILITY_MULTIPART_UPLOAD_ENABLED));
+
+        createFileWithFlags(fs, testFile, MULTIPART_FILE_BYTES, true, null, true);
+
+        RemoteFileChangedException firstException = intercept(RemoteFileChangedException.class,
+                () -> createFileWithFlags(fs, testFile, MULTIPART_FILE_BYTES, true, null, true));
+        assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, firstException);
+
+        RemoteFileChangedException secondException = intercept(RemoteFileChangedException.class,
+                () -> createFileWithFlags(fs, testFile, MULTIPART_FILE_BYTES, true, null, true));
+        assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, secondException);
+    }
+
+    @Test
+    public void testIfNoneMatchMultipartUploadWithRaceCondition() throws Throwable {
+        FileSystem fs = getFileSystem();
+        Path testFile = methodPath();
+
+        // Skip test if multipart uploads are not supported
+        Assume.assumeTrue("Skipping test as multipart uploads are not 
supported",
+                fs.hasPathCapability(testFile, 
STORE_CAPABILITY_MULTIPART_UPLOAD_ENABLED));
+
+        // Create a file with multipart upload but do not close the stream
+        FSDataOutputStream stream = getStreamWithFlags(fs, testFile, true, null, true);
+        stream.write(MULTIPART_FILE_BYTES);
+
+        // create and close another small file in parallel
+        createFileWithFlags(fs, testFile, SMALL_FILE_BYTES, true, null);
+
+        // Closing the first stream should throw RemoteFileChangedException
+        RemoteFileChangedException exception = intercept(RemoteFileChangedException.class, stream::close);
+        assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, exception);
+    }
+
+    @Test
+    public void testIfNoneMatchTwoConcurrentMultipartUploads() throws Throwable {
+        FileSystem fs = getFileSystem();
+        Path testFile = methodPath();
+
+        // Skip test if multipart uploads are not supported
+        Assume.assumeTrue("Skipping test as multipart uploads are not 
supported",
+                fs.hasPathCapability(testFile, 
STORE_CAPABILITY_MULTIPART_UPLOAD_ENABLED));
+
+        // Create a file with multipart upload but do not close the stream
+        FSDataOutputStream stream = getStreamWithFlags(fs, testFile, true, null, true);
+        stream.write(MULTIPART_FILE_BYTES);
+
+        // create and close another multipart file in parallel
+        createFileWithFlags(fs, testFile, MULTIPART_FILE_BYTES, true, null, true);
+
+        // Closing the first stream should throw RemoteFileChangedException
+        RemoteFileChangedException exception = intercept(RemoteFileChangedException.class, stream::close);
+        assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, exception);
+    }
+
+    @Test
+    public void testIfNoneMatchOverwriteWithEmptyFile() throws Throwable {
+        FileSystem fs = getFileSystem();
+        Path testFile = methodPath();
+        fs.mkdirs(testFile.getParent());
+
+        // create a non-empty file
+        createFileWithFlags(fs, testFile, SMALL_FILE_BYTES, true, null);
+
+        // overwrite with zero-byte file (no write)
+        FSDataOutputStream stream = getStreamWithFlags(fs, testFile, true, null);
+
+        // close the stream, should throw RemoteFileChangedException
+        RemoteFileChangedException exception = intercept(RemoteFileChangedException.class, stream::close);
+        assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, exception);
+    }
+
+    @Test
+    public void testIfNoneMatchOverwriteEmptyFileWithFile() throws Throwable {
+        FileSystem fs = getFileSystem();
+        Path testFile = methodPath();
+        fs.mkdirs(testFile.getParent());
+
+        // create an empty file (no write)
+        FSDataOutputStream stream = getStreamWithFlags(fs, testFile, true, null);
+        stream.close();
+
+        // overwrite with non-empty file, should throw RemoteFileChangedException
+        RemoteFileChangedException exception = intercept(RemoteFileChangedException.class,
+                () -> createFileWithFlags(fs, testFile, SMALL_FILE_BYTES, true, null));
+        assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, exception);
+    }
+
+    @Test
+    public void testIfNoneMatchOverwriteEmptyWithEmptyFile() throws Throwable {
+        FileSystem fs = getFileSystem();
+        Path testFile = methodPath();
+        fs.mkdirs(testFile.getParent());
+
+        // create an empty file (no write)
+        FSDataOutputStream stream1 = getStreamWithFlags(fs, testFile, true, null);
+        stream1.close();
+
+        // overwrite with another empty file, should throw RemoteFileChangedException
+        FSDataOutputStream stream2 = getStreamWithFlags(fs, testFile, true, null);
+        RemoteFileChangedException exception = intercept(RemoteFileChangedException.class, stream2::close);
+        assertS3ExceptionStatusCode(SC_412_PRECONDITION_FAILED, exception);
+    }
+
+    @Test
+    public void testIfMatchOverwriteWithCorrectEtag() throws Throwable {
+        FileSystem fs = getFileSystem();
+        Path path = methodPath();
+        fs.mkdirs(path.getParent());
+
+        // Create a file
+        createFileWithFlags(fs, path, SMALL_FILE_BYTES, false, null);
+
+        // Retrieve the etag from the created file
+        String etag = ((S3AFileStatus) fs.getFileStatus(path)).getETag();
+        assertNotNull("ETag should not be null after file creation", etag);
+
+        String updatedFileContent = "Updated content";
+        byte[] updatedData = updatedFileContent.getBytes(StandardCharsets.UTF_8);
+
+        // overwrite file with etag
+        createFileWithFlags(fs, path, updatedData, false, etag);
+
+        // read file and verify overwritten content
+        String fileContent = readFileContent(fs, path);
+        assertEquals(

Review Comment:
   use assertJ
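
For illustration, an AssertJ form of the assertion being flagged might look like the following. This is a sketch only: the static import and the local names fileContent, updatedFileContent and path are taken from the surrounding test, not from the PR itself.

    import static org.assertj.core.api.Assertions.assertThat;

    // illustrative only: the same equality check as the assertEquals above, expressed with AssertJ
    assertThat(fileContent)
        .describedAs("content of %s after overwrite with matching etag", path)
        .isEqualTo(updatedFileContent);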





> S3A: Support S3 Conditional Writes
> ----------------------------------
>
>                 Key: HADOOP-19256
>                 URL: https://issues.apache.org/jira/browse/HADOOP-19256
>             Project: Hadoop Common
>          Issue Type: Sub-task
>          Components: fs/s3
>            Reporter: Ahmar Suhail
>            Priority: Major
>              Labels: pull-request-available
>
> S3 Conditional Write (Put-if-absent) capability is now generally available - 
> [https://aws.amazon.com/about-aws/whats-new/2024/08/amazon-s3-conditional-writes/]
>  
> S3A should allow passing in this put-if-absent header to prevent overwriting
> of files.
> There is a feature branch for this: HADOOP-19256-s3-conditional-writes
> + support etags to allow an overwrite to be restricted to overwriting a 
> specific version. This can be done through a createFile option.
> https://docs.aws.amazon.com/AmazonS3/latest/userguide/conditional-writes.html
> Fun fact: third-party stores will not reject overwrites if they don't
> recognise the headers, so there's no way to be sure they are supported
> without testing.
> We need a flag to enable/disable conditional writes, which can be exposed
> through hasPathCapability().
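
For illustration, the conditional-write options named in the issue (and exercised in the test above) could be used by a client roughly as follows. This is a sketch only: it assumes an S3A FileSystem fs, a Path path, and byte arrays data/updatedData already in scope, and it uses the option and type names from the PR rather than a finalised API.

    // Sketch: put-if-absent create. If the object already exists, the write is
    // rejected and surfaces as a RemoteFileChangedException (HTTP 412), as the
    // tests above show.
    try (FSDataOutputStream out = fs.createFile(path)
            .must(FS_OPTION_CREATE_CONDITIONAL_OVERWRITE, "true")
            .create()
            .build()) {
        out.write(data);
    }

    // Sketch: conditional overwrite restricted to a specific object version,
    // identified by the etag returned from getFileStatus().
    String etag = ((S3AFileStatus) fs.getFileStatus(path)).getETag();
    try (FSDataOutputStream out = fs.createFile(path)
            .must(FS_OPTION_CREATE_CONDITIONAL_OVERWRITE_ETAG, etag)
            .create()
            .build()) {
        out.write(updatedData);
    }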



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
