This is an automated email from the ASF dual-hosted git repository. blue pushed a commit to branch 0.14.x in repository https://gitbox.apache.org/repos/asf/iceberg.git
commit 60d5ff7b3ee60a92cd80047aace513b18974500a
Author: Abid Mohammed <[email protected]>
AuthorDate: Tue Aug 2 10:33:52 2022 -0700

    AWS: S3OutputStream - failure to close should persist on subsequent close calls (#5311)
---
 .../org/apache/iceberg/aws/s3/S3OutputStream.java     | 14 +++++++++++++-
 .../org/apache/iceberg/aws/s3/TestS3OutputStream.java | 19 +++++++++++++++++++
 2 files changed, 32 insertions(+), 1 deletion(-)

diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java b/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
index 555a9e1634..80ed982367 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
@@ -103,6 +103,7 @@ class S3OutputStream extends PositionOutputStream {
 
   private long pos = 0;
   private boolean closed = false;
+  private Throwable closeFailureException;
 
   @SuppressWarnings("StaticAssignmentInConstructor")
   S3OutputStream(S3Client s3, S3URI location, AwsProperties awsProperties, MetricsContext metrics)
@@ -240,6 +241,15 @@ class S3OutputStream extends PositionOutputStream {
 
   @Override
   public void close() throws IOException {
+
+    // A failed s3 close removes state that is required for a successful close.
+    // Any future close on this stream should fail.
+    if (closeFailureException != null) {
+      throw new IOException(
+          "Attempted to close an S3 output stream that failed to close earlier",
+          closeFailureException);
+    }
+
     if (closed) {
       return;
     }
@@ -249,8 +259,10 @@ class S3OutputStream extends PositionOutputStream {
 
     try {
       stream.close();
-
       completeUploads();
+    } catch (Exception e) {
+      closeFailureException = e;
+      throw e;
     } finally {
       cleanUpStagingFiles();
     }
diff --git a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3OutputStream.java b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3OutputStream.java
index c9b6043ccd..28d9c5da6c 100644
--- a/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3OutputStream.java
+++ b/aws/src/test/java/org/apache/iceberg/aws/s3/TestS3OutputStream.java
@@ -42,6 +42,7 @@ import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
 import org.mockito.junit.MockitoJUnitRunner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -160,6 +161,24 @@ public class TestS3OutputStream {
     writeTest();
   }
 
+  @Test
+  public void testCloseFailureShouldPersistOnFutureClose() throws IOException {
+    IllegalStateException mockException =
+        new IllegalStateException("mock failure to completeUploads on close");
+    Mockito.doThrow(mockException)
+        .when(s3mock)
+        .putObject(any(PutObjectRequest.class), any(RequestBody.class));
+    S3OutputStream stream = new S3OutputStream(s3mock, randomURI(), properties, nullMetrics());
+
+    Assertions.assertThatThrownBy(stream::close)
+        .isInstanceOf(mockException.getClass())
+        .hasMessageContaining(mockException.getMessage());
+
+    Assertions.assertThatThrownBy(stream::close)
+        .isInstanceOf(IOException.class)
+        .hasCause(mockException);
+  }
+
   private void writeTest() {
     // Run tests for both byte and array write paths
     Stream.of(true, false).forEach(arrayWrite -> {
