This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new e373c44 [FLINK-13228][tests][filesystems] Harden HadoopRecoverableWriterTest e373c44 is described below commit e373c4481e6a0ca0e1e73a6170b9e3da5cc9be5b Author: Yu Li <l...@apache.org> AuthorDate: Fri Jul 26 10:27:28 2019 +0200 [FLINK-13228][tests][filesystems] Harden HadoopRecoverableWriterTest Currently test cases will fail when trying to close the output stream if all data written but ClosedByInterruptException occurs at the ending phase. This commit fixes it. This closes #9235 --- .../core/fs/AbstractRecoverableWriterTest.java | 49 ++++++++++++++++++---- 1 file changed, 41 insertions(+), 8 deletions(-) diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/AbstractRecoverableWriterTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/AbstractRecoverableWriterTest.java index ab37a07..de9b095 100644 --- a/flink-core/src/test/java/org/apache/flink/core/fs/AbstractRecoverableWriterTest.java +++ b/flink-core/src/test/java/org/apache/flink/core/fs/AbstractRecoverableWriterTest.java @@ -19,6 +19,7 @@ package org.apache.flink.core.fs; import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.util.IOUtils; import org.apache.flink.util.StringUtils; import org.apache.flink.util.TestLogger; @@ -111,7 +112,9 @@ public abstract class AbstractRecoverableWriterTest extends TestLogger { final Path path = new Path(testDir, "part-0"); - try (final RecoverableFsDataOutputStream stream = writer.open(path)) { + RecoverableFsDataOutputStream stream = null; + try { + stream = writer.open(path); stream.write(testData1.getBytes(StandardCharsets.UTF_8)); stream.closeForCommit().commit(); @@ -119,6 +122,8 @@ public abstract class AbstractRecoverableWriterTest extends TestLogger { Assert.assertEquals("part-0", fileContents.getKey().getName()); Assert.assertEquals(testData1, fileContents.getValue()); } + } finally { + IOUtils.closeQuietly(stream); } } @@ -130,7 +135,9 @@ public abstract class AbstractRecoverableWriterTest extends TestLogger { final Path path = new Path(testDir, "part-0"); - try (final RecoverableFsDataOutputStream stream = writer.open(path)) { + RecoverableFsDataOutputStream stream = null; + try { + stream = writer.open(path); stream.write(testData1.getBytes(StandardCharsets.UTF_8)); stream.persist(); @@ -141,6 +148,8 @@ public abstract class AbstractRecoverableWriterTest extends TestLogger { Assert.assertEquals("part-0", fileContents.getKey().getName()); Assert.assertEquals(testData1 + testData2, fileContents.getValue()); } + } finally { + IOUtils.closeQuietly(stream); } } @@ -194,7 +203,9 @@ public abstract class AbstractRecoverableWriterTest extends TestLogger { final RecoverableWriter initWriter = getNewFileSystemWriter(); final Map<String, RecoverableWriter.ResumeRecoverable> recoverables = new HashMap<>(4); - try (final RecoverableFsDataOutputStream stream = initWriter.open(path)) { + RecoverableFsDataOutputStream stream = null; + try { + stream = initWriter.open(path); recoverables.put(INIT_EMPTY_PERSIST, stream.persist()); stream.write(testData1.getBytes(StandardCharsets.UTF_8)); @@ -206,6 +217,8 @@ public abstract class AbstractRecoverableWriterTest extends TestLogger { stream.write(testData2.getBytes(StandardCharsets.UTF_8)); recoverables.put(FINAL_WITH_EXTRA_STATE, stream.persist()); + } finally { + IOUtils.closeQuietly(stream); } final SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> serializer = initWriter.getResumeRecoverableSerializer(); @@ -217,7 +230,9 @@ public abstract class AbstractRecoverableWriterTest extends TestLogger { final RecoverableWriter.ResumeRecoverable recoveredRecoverable = deserializer.deserialize(serializer.getVersion(), serializedRecoverable); - try (final RecoverableFsDataOutputStream recoveredStream = newWriter.recover(recoveredRecoverable)) { + RecoverableFsDataOutputStream recoveredStream = null; + try { + recoveredStream = newWriter.recover(recoveredRecoverable); // we expect the data to be truncated Map<Path, String> files = getFileContentByPath(testDir); @@ -238,6 +253,8 @@ public abstract class AbstractRecoverableWriterTest extends TestLogger { Assert.assertEquals("part-0", fileContents.getKey().getName()); Assert.assertEquals(expectedFinalContents, fileContents.getValue()); } + } finally { + IOUtils.closeQuietly(recoveredStream); } } @@ -249,7 +266,9 @@ public abstract class AbstractRecoverableWriterTest extends TestLogger { final RecoverableWriter initWriter = getNewFileSystemWriter(); final RecoverableWriter.CommitRecoverable recoverable; - try (final RecoverableFsDataOutputStream stream = initWriter.open(path)) { + RecoverableFsDataOutputStream stream = null; + try { + stream = initWriter.open(path); stream.write(testData1.getBytes(StandardCharsets.UTF_8)); stream.persist(); @@ -259,6 +278,8 @@ public abstract class AbstractRecoverableWriterTest extends TestLogger { stream.write(testData2.getBytes(StandardCharsets.UTF_8)); recoverable = stream.closeForCommit().getRecoverable(); + } finally { + IOUtils.closeQuietly(stream); } final byte[] serializedRecoverable = initWriter.getCommitRecoverableSerializer().serialize(recoverable); @@ -289,12 +310,16 @@ public abstract class AbstractRecoverableWriterTest extends TestLogger { final RecoverableWriter writer = getNewFileSystemWriter(); final Path path = new Path(testDir, "part-0"); - try (final RecoverableFsDataOutputStream stream = writer.open(path)) { + RecoverableFsDataOutputStream stream = null; + try { + stream = writer.open(path); stream.write(testData1.getBytes(StandardCharsets.UTF_8)); stream.closeForCommit().getRecoverable(); stream.write(testData2.getBytes(StandardCharsets.UTF_8)); fail(); + } finally { + IOUtils.closeQuietly(stream); } } @@ -306,13 +331,17 @@ public abstract class AbstractRecoverableWriterTest extends TestLogger { final Path path = new Path(testDir, "part-0"); RecoverableWriter.ResumeRecoverable recoverable; - try (final RecoverableFsDataOutputStream stream = writer.open(path)) { + RecoverableFsDataOutputStream stream = null; + try { + stream = writer.open(path); stream.write(testData1.getBytes(StandardCharsets.UTF_8)); recoverable = stream.persist(); stream.write(testData2.getBytes(StandardCharsets.UTF_8)); stream.closeForCommit().commit(); + } finally { + IOUtils.closeQuietly(stream); } // this should throw an exception as the file is already committed @@ -332,7 +361,9 @@ public abstract class AbstractRecoverableWriterTest extends TestLogger { final RecoverableWriter.ResumeRecoverable recoverable1; final RecoverableWriter.ResumeRecoverable recoverable2; - try (final RecoverableFsDataOutputStream stream = writer.open(path)) { + RecoverableFsDataOutputStream stream = null; + try { + stream = writer.open(path); stream.write(testData1.getBytes(StandardCharsets.UTF_8)); recoverable1 = stream.persist(); @@ -340,6 +371,8 @@ public abstract class AbstractRecoverableWriterTest extends TestLogger { recoverable2 = stream.persist(); stream.write(testData3.getBytes(StandardCharsets.UTF_8)); + } finally { + IOUtils.closeQuietly(stream); } try (RecoverableFsDataOutputStream ignored = writer.recover(recoverable1)) {