This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new e373c44  [FLINK-13228][tests][filesystems] Harden 
HadoopRecoverableWriterTest
e373c44 is described below

commit e373c4481e6a0ca0e1e73a6170b9e3da5cc9be5b
Author: Yu Li <l...@apache.org>
AuthorDate: Fri Jul 26 10:27:28 2019 +0200

    [FLINK-13228][tests][filesystems] Harden HadoopRecoverableWriterTest
    
    Currently test cases will fail when trying to close the output stream if 
all data written
    but ClosedByInterruptException occurs at the ending phase. This commit 
fixes it.
    
    This closes #9235
---
 .../core/fs/AbstractRecoverableWriterTest.java     | 49 ++++++++++++++++++----
 1 file changed, 41 insertions(+), 8 deletions(-)

diff --git 
a/flink-core/src/test/java/org/apache/flink/core/fs/AbstractRecoverableWriterTest.java
 
b/flink-core/src/test/java/org/apache/flink/core/fs/AbstractRecoverableWriterTest.java
index ab37a07..de9b095 100644
--- 
a/flink-core/src/test/java/org/apache/flink/core/fs/AbstractRecoverableWriterTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/core/fs/AbstractRecoverableWriterTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.core.fs;
 
 import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.StringUtils;
 import org.apache.flink.util.TestLogger;
 
@@ -111,7 +112,9 @@ public abstract class AbstractRecoverableWriterTest extends 
TestLogger {
 
                final Path path = new Path(testDir, "part-0");
 
-               try (final RecoverableFsDataOutputStream stream = 
writer.open(path)) {
+               RecoverableFsDataOutputStream stream = null;
+               try {
+                       stream = writer.open(path);
                        
stream.write(testData1.getBytes(StandardCharsets.UTF_8));
                        stream.closeForCommit().commit();
 
@@ -119,6 +122,8 @@ public abstract class AbstractRecoverableWriterTest extends 
TestLogger {
                                Assert.assertEquals("part-0", 
fileContents.getKey().getName());
                                Assert.assertEquals(testData1, 
fileContents.getValue());
                        }
+               } finally {
+                       IOUtils.closeQuietly(stream);
                }
        }
 
@@ -130,7 +135,9 @@ public abstract class AbstractRecoverableWriterTest extends 
TestLogger {
 
                final Path path = new Path(testDir, "part-0");
 
-               try (final RecoverableFsDataOutputStream stream = 
writer.open(path)) {
+               RecoverableFsDataOutputStream stream = null;
+               try {
+                       stream = writer.open(path);
                        
stream.write(testData1.getBytes(StandardCharsets.UTF_8));
                        stream.persist();
 
@@ -141,6 +148,8 @@ public abstract class AbstractRecoverableWriterTest extends 
TestLogger {
                                Assert.assertEquals("part-0", 
fileContents.getKey().getName());
                                Assert.assertEquals(testData1 + testData2, 
fileContents.getValue());
                        }
+               } finally {
+                       IOUtils.closeQuietly(stream);
                }
        }
 
@@ -194,7 +203,9 @@ public abstract class AbstractRecoverableWriterTest extends 
TestLogger {
                final RecoverableWriter initWriter = getNewFileSystemWriter();
 
                final Map<String, RecoverableWriter.ResumeRecoverable> 
recoverables = new HashMap<>(4);
-               try (final RecoverableFsDataOutputStream stream = 
initWriter.open(path)) {
+               RecoverableFsDataOutputStream stream = null;
+               try {
+                       stream = initWriter.open(path);
                        recoverables.put(INIT_EMPTY_PERSIST, stream.persist());
 
                        
stream.write(testData1.getBytes(StandardCharsets.UTF_8));
@@ -206,6 +217,8 @@ public abstract class AbstractRecoverableWriterTest extends 
TestLogger {
                        
stream.write(testData2.getBytes(StandardCharsets.UTF_8));
 
                        recoverables.put(FINAL_WITH_EXTRA_STATE, 
stream.persist());
+               } finally {
+                       IOUtils.closeQuietly(stream);
                }
 
                final 
SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> serializer = 
initWriter.getResumeRecoverableSerializer();
@@ -217,7 +230,9 @@ public abstract class AbstractRecoverableWriterTest extends 
TestLogger {
                final RecoverableWriter.ResumeRecoverable recoveredRecoverable =
                                
deserializer.deserialize(serializer.getVersion(), serializedRecoverable);
 
-               try (final RecoverableFsDataOutputStream recoveredStream = 
newWriter.recover(recoveredRecoverable)) {
+               RecoverableFsDataOutputStream recoveredStream = null;
+               try {
+                       recoveredStream = 
newWriter.recover(recoveredRecoverable);
 
                        // we expect the data to be truncated
                        Map<Path, String> files = getFileContentByPath(testDir);
@@ -238,6 +253,8 @@ public abstract class AbstractRecoverableWriterTest extends 
TestLogger {
                                Assert.assertEquals("part-0", 
fileContents.getKey().getName());
                                Assert.assertEquals(expectedFinalContents, 
fileContents.getValue());
                        }
+               } finally {
+                       IOUtils.closeQuietly(recoveredStream);
                }
        }
 
@@ -249,7 +266,9 @@ public abstract class AbstractRecoverableWriterTest extends 
TestLogger {
                final RecoverableWriter initWriter = getNewFileSystemWriter();
 
                final RecoverableWriter.CommitRecoverable recoverable;
-               try (final RecoverableFsDataOutputStream stream = 
initWriter.open(path)) {
+               RecoverableFsDataOutputStream stream = null;
+               try {
+                       stream = initWriter.open(path);
                        
stream.write(testData1.getBytes(StandardCharsets.UTF_8));
 
                        stream.persist();
@@ -259,6 +278,8 @@ public abstract class AbstractRecoverableWriterTest extends 
TestLogger {
                        
stream.write(testData2.getBytes(StandardCharsets.UTF_8));
 
                        recoverable = stream.closeForCommit().getRecoverable();
+               } finally {
+                       IOUtils.closeQuietly(stream);
                }
 
                final byte[] serializedRecoverable = 
initWriter.getCommitRecoverableSerializer().serialize(recoverable);
@@ -289,12 +310,16 @@ public abstract class AbstractRecoverableWriterTest 
extends TestLogger {
                final RecoverableWriter writer = getNewFileSystemWriter();
                final Path path = new Path(testDir, "part-0");
 
-               try (final RecoverableFsDataOutputStream stream = 
writer.open(path)) {
+               RecoverableFsDataOutputStream stream = null;
+               try {
+                       stream = writer.open(path);
                        
stream.write(testData1.getBytes(StandardCharsets.UTF_8));
 
                        stream.closeForCommit().getRecoverable();
                        
stream.write(testData2.getBytes(StandardCharsets.UTF_8));
                        fail();
+               } finally {
+                       IOUtils.closeQuietly(stream);
                }
        }
 
@@ -306,13 +331,17 @@ public abstract class AbstractRecoverableWriterTest 
extends TestLogger {
                final Path path = new Path(testDir, "part-0");
 
                RecoverableWriter.ResumeRecoverable recoverable;
-               try (final RecoverableFsDataOutputStream stream = 
writer.open(path)) {
+               RecoverableFsDataOutputStream stream = null;
+               try {
+                       stream = writer.open(path);
                        
stream.write(testData1.getBytes(StandardCharsets.UTF_8));
 
                        recoverable = stream.persist();
                        
stream.write(testData2.getBytes(StandardCharsets.UTF_8));
 
                        stream.closeForCommit().commit();
+               } finally {
+                       IOUtils.closeQuietly(stream);
                }
 
                // this should throw an exception as the file is already 
committed
@@ -332,7 +361,9 @@ public abstract class AbstractRecoverableWriterTest extends 
TestLogger {
 
                final RecoverableWriter.ResumeRecoverable recoverable1;
                final RecoverableWriter.ResumeRecoverable recoverable2;
-               try (final RecoverableFsDataOutputStream stream = 
writer.open(path)) {
+               RecoverableFsDataOutputStream stream = null;
+               try {
+                       stream = writer.open(path);
                        
stream.write(testData1.getBytes(StandardCharsets.UTF_8));
 
                        recoverable1 = stream.persist();
@@ -340,6 +371,8 @@ public abstract class AbstractRecoverableWriterTest extends 
TestLogger {
 
                        recoverable2 = stream.persist();
                        
stream.write(testData3.getBytes(StandardCharsets.UTF_8));
+               } finally {
+                       IOUtils.closeQuietly(stream);
                }
 
                try (RecoverableFsDataOutputStream ignored = 
writer.recover(recoverable1)) {

Reply via email to