This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch release-1.9 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.9 by this push: new 0e9f463 [FLINK-13228][tests][filesystems] Harden HadoopRecoverableWriterTest 0e9f463 is described below commit 0e9f463668378bd7469194ebf0af76e3c125f0d7 Author: Yu Li <l...@apache.org> AuthorDate: Fri Jul 26 10:27:28 2019 +0200 [FLINK-13228][tests][filesystems] Harden HadoopRecoverableWriterTest Currently, test cases fail when closing the output stream if all data has been written but a ClosedByInterruptException occurs during the final phase. This commit fixes it by replacing try-with-resources with an explicit try/finally that closes the stream quietly via IOUtils.closeQuietly. This closes #9235 --- .../core/fs/AbstractRecoverableWriterTest.java | 49 ++++++++++++++++++---- 1 file changed, 41 insertions(+), 8 deletions(-) diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/AbstractRecoverableWriterTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/AbstractRecoverableWriterTest.java index ab37a07..de9b095 100644 --- a/flink-core/src/test/java/org/apache/flink/core/fs/AbstractRecoverableWriterTest.java +++ b/flink-core/src/test/java/org/apache/flink/core/fs/AbstractRecoverableWriterTest.java @@ -19,6 +19,7 @@ package org.apache.flink.core.fs; import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.util.IOUtils; import org.apache.flink.util.StringUtils; import org.apache.flink.util.TestLogger; @@ -111,7 +112,9 @@ public abstract class AbstractRecoverableWriterTest extends TestLogger { final Path path = new Path(testDir, "part-0"); - try (final RecoverableFsDataOutputStream stream = writer.open(path)) { + RecoverableFsDataOutputStream stream = null; + try { + stream = writer.open(path); stream.write(testData1.getBytes(StandardCharsets.UTF_8)); stream.closeForCommit().commit(); @@ -119,6 +122,8 @@ public abstract class AbstractRecoverableWriterTest extends TestLogger { Assert.assertEquals("part-0", fileContents.getKey().getName()); Assert.assertEquals(testData1, fileContents.getValue()); } + } finally { + IOUtils.closeQuietly(stream); } } @@ -130,7 +135,9 @@ 
public abstract class AbstractRecoverableWriterTest extends TestLogger { final Path path = new Path(testDir, "part-0"); - try (final RecoverableFsDataOutputStream stream = writer.open(path)) { + RecoverableFsDataOutputStream stream = null; + try { + stream = writer.open(path); stream.write(testData1.getBytes(StandardCharsets.UTF_8)); stream.persist(); @@ -141,6 +148,8 @@ public abstract class AbstractRecoverableWriterTest extends TestLogger { Assert.assertEquals("part-0", fileContents.getKey().getName()); Assert.assertEquals(testData1 + testData2, fileContents.getValue()); } + } finally { + IOUtils.closeQuietly(stream); } } @@ -194,7 +203,9 @@ public abstract class AbstractRecoverableWriterTest extends TestLogger { final RecoverableWriter initWriter = getNewFileSystemWriter(); final Map<String, RecoverableWriter.ResumeRecoverable> recoverables = new HashMap<>(4); - try (final RecoverableFsDataOutputStream stream = initWriter.open(path)) { + RecoverableFsDataOutputStream stream = null; + try { + stream = initWriter.open(path); recoverables.put(INIT_EMPTY_PERSIST, stream.persist()); stream.write(testData1.getBytes(StandardCharsets.UTF_8)); @@ -206,6 +217,8 @@ public abstract class AbstractRecoverableWriterTest extends TestLogger { stream.write(testData2.getBytes(StandardCharsets.UTF_8)); recoverables.put(FINAL_WITH_EXTRA_STATE, stream.persist()); + } finally { + IOUtils.closeQuietly(stream); } final SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> serializer = initWriter.getResumeRecoverableSerializer(); @@ -217,7 +230,9 @@ public abstract class AbstractRecoverableWriterTest extends TestLogger { final RecoverableWriter.ResumeRecoverable recoveredRecoverable = deserializer.deserialize(serializer.getVersion(), serializedRecoverable); - try (final RecoverableFsDataOutputStream recoveredStream = newWriter.recover(recoveredRecoverable)) { + RecoverableFsDataOutputStream recoveredStream = null; + try { + recoveredStream = 
newWriter.recover(recoveredRecoverable); // we expect the data to be truncated Map<Path, String> files = getFileContentByPath(testDir); @@ -238,6 +253,8 @@ public abstract class AbstractRecoverableWriterTest extends TestLogger { Assert.assertEquals("part-0", fileContents.getKey().getName()); Assert.assertEquals(expectedFinalContents, fileContents.getValue()); } + } finally { + IOUtils.closeQuietly(recoveredStream); } } @@ -249,7 +266,9 @@ public abstract class AbstractRecoverableWriterTest extends TestLogger { final RecoverableWriter initWriter = getNewFileSystemWriter(); final RecoverableWriter.CommitRecoverable recoverable; - try (final RecoverableFsDataOutputStream stream = initWriter.open(path)) { + RecoverableFsDataOutputStream stream = null; + try { + stream = initWriter.open(path); stream.write(testData1.getBytes(StandardCharsets.UTF_8)); stream.persist(); @@ -259,6 +278,8 @@ public abstract class AbstractRecoverableWriterTest extends TestLogger { stream.write(testData2.getBytes(StandardCharsets.UTF_8)); recoverable = stream.closeForCommit().getRecoverable(); + } finally { + IOUtils.closeQuietly(stream); } final byte[] serializedRecoverable = initWriter.getCommitRecoverableSerializer().serialize(recoverable); @@ -289,12 +310,16 @@ public abstract class AbstractRecoverableWriterTest extends TestLogger { final RecoverableWriter writer = getNewFileSystemWriter(); final Path path = new Path(testDir, "part-0"); - try (final RecoverableFsDataOutputStream stream = writer.open(path)) { + RecoverableFsDataOutputStream stream = null; + try { + stream = writer.open(path); stream.write(testData1.getBytes(StandardCharsets.UTF_8)); stream.closeForCommit().getRecoverable(); stream.write(testData2.getBytes(StandardCharsets.UTF_8)); fail(); + } finally { + IOUtils.closeQuietly(stream); } } @@ -306,13 +331,17 @@ public abstract class AbstractRecoverableWriterTest extends TestLogger { final Path path = new Path(testDir, "part-0"); RecoverableWriter.ResumeRecoverable 
recoverable; - try (final RecoverableFsDataOutputStream stream = writer.open(path)) { + RecoverableFsDataOutputStream stream = null; + try { + stream = writer.open(path); stream.write(testData1.getBytes(StandardCharsets.UTF_8)); recoverable = stream.persist(); stream.write(testData2.getBytes(StandardCharsets.UTF_8)); stream.closeForCommit().commit(); + } finally { + IOUtils.closeQuietly(stream); } // this should throw an exception as the file is already committed @@ -332,7 +361,9 @@ public abstract class AbstractRecoverableWriterTest extends TestLogger { final RecoverableWriter.ResumeRecoverable recoverable1; final RecoverableWriter.ResumeRecoverable recoverable2; - try (final RecoverableFsDataOutputStream stream = writer.open(path)) { + RecoverableFsDataOutputStream stream = null; + try { + stream = writer.open(path); stream.write(testData1.getBytes(StandardCharsets.UTF_8)); recoverable1 = stream.persist(); @@ -340,6 +371,8 @@ public abstract class AbstractRecoverableWriterTest extends TestLogger { recoverable2 = stream.persist(); stream.write(testData3.getBytes(StandardCharsets.UTF_8)); + } finally { + IOUtils.closeQuietly(stream); } try (RecoverableFsDataOutputStream ignored = writer.recover(recoverable1)) {