This is an automated email from the ASF dual-hosted git repository. kkloudas pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit e3e9d69eec2a86b45914ded0e76ae1ed53b427c0 Author: spurthi chaganti <c.spur...@criteo.com> AuthorDate: Mon Apr 20 00:12:26 2020 -0400 [FLINK-17253][fs-connector] Support viewfs for hadoop version < 2.7 This closes #11815. --- .../apache/flink/runtime/fs/hdfs/HadoopRecoverableWriter.java | 2 +- ...oopRecoverableWriterOldHadoopWithNoTruncateSupportTest.java | 10 ++++++++++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriter.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriter.java index 91d76c6..ed233c5 100644 --- a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriter.java +++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriter.java @@ -57,7 +57,7 @@ public class HadoopRecoverableWriter implements RecoverableWriter { this.fs = checkNotNull(fs); // This writer is only supported on a subset of file systems - if (!"hdfs".equalsIgnoreCase(fs.getScheme())) { + if (!("hdfs".equalsIgnoreCase(fs.getScheme()) || "viewfs".equalsIgnoreCase(fs.getScheme()))) { throw new UnsupportedOperationException( "Recoverable writers on Hadoop are only supported for HDFS"); } diff --git a/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriterOldHadoopWithNoTruncateSupportTest.java b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriterOldHadoopWithNoTruncateSupportTest.java index b12090f..03d978a 100644 --- a/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriterOldHadoopWithNoTruncateSupportTest.java +++ b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriterOldHadoopWithNoTruncateSupportTest.java @@ -34,6 +34,7 @@ import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.mockito.Mockito; import java.io.BufferedReader; import java.io.File; @@ -44,6 +45,7 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static junit.framework.TestCase.assertEquals; import static junit.framework.TestCase.assertNotNull; import static junit.framework.TestCase.assertTrue; +import static org.mockito.Mockito.when; /** * Tests for the {@link HadoopRecoverableWriter} with Hadoop versions pre Hadoop 2.7. @@ -141,6 +143,14 @@ public class HadoopRecoverableWriterOldHadoopWithNoTruncateSupportTest { } } + @Test + public void testRecoverableWriterWithViewfsScheme() { + final org.apache.hadoop.fs.FileSystem mockViewfs = Mockito.mock(org.apache.hadoop.fs.FileSystem.class); + when(mockViewfs.getScheme()).thenReturn("viewfs"); + // Creating the writer should not throw UnsupportedOperationException. + RecoverableWriter recoverableWriter = new HadoopRecoverableWriter(mockViewfs); + } + private RecoverableFsDataOutputStream getOpenStreamToFileWithContent( final RecoverableWriter writerUnderTest, final Path path,