This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch 0.14.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git

commit 60d5ff7b3ee60a92cd80047aace513b18974500a
Author: Abid Mohammed <[email protected]>
AuthorDate: Tue Aug 2 10:33:52 2022 -0700

    AWS: S3OutputStream - failure to close should persist on subsequent close calls (#5311)
---
 .../org/apache/iceberg/aws/s3/S3OutputStream.java     | 14 +++++++++++++-
 .../org/apache/iceberg/aws/s3/TestS3OutputStream.java | 19 +++++++++++++++++++
 2 files changed, 32 insertions(+), 1 deletion(-)

diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
index 555a9e1634..80ed982367 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
@@ -103,6 +103,7 @@ class S3OutputStream extends PositionOutputStream {
 
   private long pos = 0;
   private boolean closed = false;
+  private Throwable closeFailureException;
 
   @SuppressWarnings("StaticAssignmentInConstructor")
   S3OutputStream(S3Client s3, S3URI location, AwsProperties awsProperties, MetricsContext metrics)
@@ -240,6 +241,15 @@ class S3OutputStream extends PositionOutputStream {
 
   @Override
   public void close() throws IOException {
+
+    // A failed s3 close removes state that is required for a successful close.
+    // Any future close on this stream should fail.
+    if (closeFailureException != null) {
+      throw new IOException(
+          "Attempted to close an S3 output stream that failed to close earlier",
+          closeFailureException);
+    }
+
     if (closed) {
       return;
     }
@@ -249,8 +259,10 @@ class S3OutputStream extends PositionOutputStream {
 
     try {
       stream.close();
-
       completeUploads();
+    } catch (Exception e) {
+      closeFailureException = e;
+      throw e;
     } finally {
       cleanUpStagingFiles();
     }
diff --git a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3OutputStream.java b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3OutputStream.java
index c9b6043ccd..28d9c5da6c 100644
--- a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3OutputStream.java
+++ b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3OutputStream.java
@@ -42,6 +42,7 @@ import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
 import org.mockito.junit.MockitoJUnitRunner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -160,6 +161,24 @@ public class TestS3OutputStream {
     writeTest();
   }
 
+  @Test
+  public void testCloseFailureShouldPersistOnFutureClose() throws IOException {
+    IllegalStateException mockException =
+        new IllegalStateException("mock failure to completeUploads on close");
+    Mockito.doThrow(mockException)
+        .when(s3mock)
+        .putObject(any(PutObjectRequest.class), any(RequestBody.class));
+    S3OutputStream stream = new S3OutputStream(s3mock, randomURI(), properties, nullMetrics());
+
+    Assertions.assertThatThrownBy(stream::close)
+        .isInstanceOf(mockException.getClass())
+        .hasMessageContaining(mockException.getMessage());
+
+    Assertions.assertThatThrownBy(stream::close)
+        .isInstanceOf(IOException.class)
+        .hasCause(mockException);
+  }
+
   private void writeTest() {
     // Run tests for both byte and array write paths
     Stream.of(true, false).forEach(arrayWrite -> {

Reply via email to