This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new 0e9f463  [FLINK-13228][tests][filesystems] Harden 
HadoopRecoverableWriterTest
0e9f463 is described below

commit 0e9f463668378bd7469194ebf0af76e3c125f0d7
Author: Yu Li <l...@apache.org>
AuthorDate: Fri Jul 26 10:27:28 2019 +0200

    [FLINK-13228][tests][filesystems] Harden HadoopRecoverableWriterTest
    
    Currently, test cases fail when closing the output stream if all data has
    been written but a ClosedByInterruptException occurs during the final phase.
    This commit fixes that by closing the streams quietly in a finally block.
    
    This closes #9235
---
 .../core/fs/AbstractRecoverableWriterTest.java     | 49 ++++++++++++++++++----
 1 file changed, 41 insertions(+), 8 deletions(-)

diff --git 
a/flink-core/src/test/java/org/apache/flink/core/fs/AbstractRecoverableWriterTest.java
 
b/flink-core/src/test/java/org/apache/flink/core/fs/AbstractRecoverableWriterTest.java
index ab37a07..de9b095 100644
--- 
a/flink-core/src/test/java/org/apache/flink/core/fs/AbstractRecoverableWriterTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/core/fs/AbstractRecoverableWriterTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.core.fs;
 
 import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.StringUtils;
 import org.apache.flink.util.TestLogger;
 
@@ -111,7 +112,9 @@ public abstract class AbstractRecoverableWriterTest extends 
TestLogger {
 
                final Path path = new Path(testDir, "part-0");
 
-               try (final RecoverableFsDataOutputStream stream = 
writer.open(path)) {
+               RecoverableFsDataOutputStream stream = null;
+               try {
+                       stream = writer.open(path);
                        
stream.write(testData1.getBytes(StandardCharsets.UTF_8));
                        stream.closeForCommit().commit();
 
@@ -119,6 +122,8 @@ public abstract class AbstractRecoverableWriterTest extends 
TestLogger {
                                Assert.assertEquals("part-0", 
fileContents.getKey().getName());
                                Assert.assertEquals(testData1, 
fileContents.getValue());
                        }
+               } finally {
+                       IOUtils.closeQuietly(stream);
                }
        }
 
@@ -130,7 +135,9 @@ public abstract class AbstractRecoverableWriterTest extends 
TestLogger {
 
                final Path path = new Path(testDir, "part-0");
 
-               try (final RecoverableFsDataOutputStream stream = 
writer.open(path)) {
+               RecoverableFsDataOutputStream stream = null;
+               try {
+                       stream = writer.open(path);
                        
stream.write(testData1.getBytes(StandardCharsets.UTF_8));
                        stream.persist();
 
@@ -141,6 +148,8 @@ public abstract class AbstractRecoverableWriterTest extends 
TestLogger {
                                Assert.assertEquals("part-0", 
fileContents.getKey().getName());
                                Assert.assertEquals(testData1 + testData2, 
fileContents.getValue());
                        }
+               } finally {
+                       IOUtils.closeQuietly(stream);
                }
        }
 
@@ -194,7 +203,9 @@ public abstract class AbstractRecoverableWriterTest extends 
TestLogger {
                final RecoverableWriter initWriter = getNewFileSystemWriter();
 
                final Map<String, RecoverableWriter.ResumeRecoverable> 
recoverables = new HashMap<>(4);
-               try (final RecoverableFsDataOutputStream stream = 
initWriter.open(path)) {
+               RecoverableFsDataOutputStream stream = null;
+               try {
+                       stream = initWriter.open(path);
                        recoverables.put(INIT_EMPTY_PERSIST, stream.persist());
 
                        
stream.write(testData1.getBytes(StandardCharsets.UTF_8));
@@ -206,6 +217,8 @@ public abstract class AbstractRecoverableWriterTest extends 
TestLogger {
                        
stream.write(testData2.getBytes(StandardCharsets.UTF_8));
 
                        recoverables.put(FINAL_WITH_EXTRA_STATE, 
stream.persist());
+               } finally {
+                       IOUtils.closeQuietly(stream);
                }
 
                final 
SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> serializer = 
initWriter.getResumeRecoverableSerializer();
@@ -217,7 +230,9 @@ public abstract class AbstractRecoverableWriterTest extends 
TestLogger {
                final RecoverableWriter.ResumeRecoverable recoveredRecoverable =
                                
deserializer.deserialize(serializer.getVersion(), serializedRecoverable);
 
-               try (final RecoverableFsDataOutputStream recoveredStream = 
newWriter.recover(recoveredRecoverable)) {
+               RecoverableFsDataOutputStream recoveredStream = null;
+               try {
+                       recoveredStream = 
newWriter.recover(recoveredRecoverable);
 
                        // we expect the data to be truncated
                        Map<Path, String> files = getFileContentByPath(testDir);
@@ -238,6 +253,8 @@ public abstract class AbstractRecoverableWriterTest extends 
TestLogger {
                                Assert.assertEquals("part-0", 
fileContents.getKey().getName());
                                Assert.assertEquals(expectedFinalContents, 
fileContents.getValue());
                        }
+               } finally {
+                       IOUtils.closeQuietly(recoveredStream);
                }
        }
 
@@ -249,7 +266,9 @@ public abstract class AbstractRecoverableWriterTest extends 
TestLogger {
                final RecoverableWriter initWriter = getNewFileSystemWriter();
 
                final RecoverableWriter.CommitRecoverable recoverable;
-               try (final RecoverableFsDataOutputStream stream = 
initWriter.open(path)) {
+               RecoverableFsDataOutputStream stream = null;
+               try {
+                       stream = initWriter.open(path);
                        
stream.write(testData1.getBytes(StandardCharsets.UTF_8));
 
                        stream.persist();
@@ -259,6 +278,8 @@ public abstract class AbstractRecoverableWriterTest extends 
TestLogger {
                        
stream.write(testData2.getBytes(StandardCharsets.UTF_8));
 
                        recoverable = stream.closeForCommit().getRecoverable();
+               } finally {
+                       IOUtils.closeQuietly(stream);
                }
 
                final byte[] serializedRecoverable = 
initWriter.getCommitRecoverableSerializer().serialize(recoverable);
@@ -289,12 +310,16 @@ public abstract class AbstractRecoverableWriterTest 
extends TestLogger {
                final RecoverableWriter writer = getNewFileSystemWriter();
                final Path path = new Path(testDir, "part-0");
 
-               try (final RecoverableFsDataOutputStream stream = 
writer.open(path)) {
+               RecoverableFsDataOutputStream stream = null;
+               try {
+                       stream = writer.open(path);
                        
stream.write(testData1.getBytes(StandardCharsets.UTF_8));
 
                        stream.closeForCommit().getRecoverable();
                        
stream.write(testData2.getBytes(StandardCharsets.UTF_8));
                        fail();
+               } finally {
+                       IOUtils.closeQuietly(stream);
                }
        }
 
@@ -306,13 +331,17 @@ public abstract class AbstractRecoverableWriterTest 
extends TestLogger {
                final Path path = new Path(testDir, "part-0");
 
                RecoverableWriter.ResumeRecoverable recoverable;
-               try (final RecoverableFsDataOutputStream stream = 
writer.open(path)) {
+               RecoverableFsDataOutputStream stream = null;
+               try {
+                       stream = writer.open(path);
                        
stream.write(testData1.getBytes(StandardCharsets.UTF_8));
 
                        recoverable = stream.persist();
                        
stream.write(testData2.getBytes(StandardCharsets.UTF_8));
 
                        stream.closeForCommit().commit();
+               } finally {
+                       IOUtils.closeQuietly(stream);
                }
 
                // this should throw an exception as the file is already 
committed
@@ -332,7 +361,9 @@ public abstract class AbstractRecoverableWriterTest extends 
TestLogger {
 
                final RecoverableWriter.ResumeRecoverable recoverable1;
                final RecoverableWriter.ResumeRecoverable recoverable2;
-               try (final RecoverableFsDataOutputStream stream = 
writer.open(path)) {
+               RecoverableFsDataOutputStream stream = null;
+               try {
+                       stream = writer.open(path);
                        
stream.write(testData1.getBytes(StandardCharsets.UTF_8));
 
                        recoverable1 = stream.persist();
@@ -340,6 +371,8 @@ public abstract class AbstractRecoverableWriterTest extends 
TestLogger {
 
                        recoverable2 = stream.persist();
                        
stream.write(testData3.getBytes(StandardCharsets.UTF_8));
+               } finally {
+                       IOUtils.closeQuietly(stream);
                }
 
                try (RecoverableFsDataOutputStream ignored = 
writer.recover(recoverable1)) {

Reply via email to