This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e3e9d69eec2a86b45914ded0e76ae1ed53b427c0
Author: spurthi chaganti <c.spur...@criteo.com>
AuthorDate: Mon Apr 20 00:12:26 2020 -0400

    [FLINK-17253][fs-connector] Support viewfs for hadoop version < 2.7
    
    This closes #11815.
---
 .../apache/flink/runtime/fs/hdfs/HadoopRecoverableWriter.java  |  2 +-
 ...oopRecoverableWriterOldHadoopWithNoTruncateSupportTest.java | 10 ++++++++++
 2 files changed, 11 insertions(+), 1 deletion(-)
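
For context: viewfs is the scheme of Hadoop's ViewFileSystem, a client-side mount table that presents several HDFS namespaces behind one URI. Such a file system reports "viewfs" rather than "hdfs" as its scheme, so the writer's scheme check rejected it before this patch. Below is a minimal sketch of obtaining a viewfs-backed writer; the mount-table name, link path, and namenode address are hypothetical:

    import org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;

    import java.net.URI;

    public class ViewfsWriterSketch {
        public static void main(String[] args) throws Exception {
            final Configuration conf = new Configuration();
            // Hypothetical mount-table entry:
            // viewfs://cluster/data resolves to hdfs://nn1:8020/data.
            conf.set("fs.viewfs.mounttable.cluster.link./data", "hdfs://nn1:8020/data");

            // The resulting FileSystem reports "viewfs" as its scheme.
            final FileSystem fs = FileSystem.get(URI.create("viewfs://cluster/"), conf);

            // With this patch the constructor accepts the viewfs scheme
            // instead of throwing UnsupportedOperationException.
            new HadoopRecoverableWriter(fs);
        }
    }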

diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriter.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriter.java
index 91d76c6..ed233c5 100644
--- a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriter.java
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriter.java
@@ -57,7 +57,7 @@ public class HadoopRecoverableWriter implements RecoverableWriter {
                this.fs = checkNotNull(fs);
 
                // This writer is only supported on a subset of file systems
-               if (!"hdfs".equalsIgnoreCase(fs.getScheme())) {
+               if (!("hdfs".equalsIgnoreCase(fs.getScheme()) || "viewfs".equalsIgnoreCase(fs.getScheme()))) {
                        throw new UnsupportedOperationException(
                                        "Recoverable writers on Hadoop are only supported for HDFS");
                }
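
The change above widens the guard from a single accepted scheme to two; any other scheme (s3, file, ...) still fails fast at construction time. Restated outside the class as a standalone predicate, purely for illustration:

    // Standalone restatement of the patched guard (illustration only).
    static boolean isSupportedScheme(String scheme) {
        return "hdfs".equalsIgnoreCase(scheme) || "viewfs".equalsIgnoreCase(scheme);
    }

    // isSupportedScheme("hdfs")   -> true
    // isSupportedScheme("VIEWFS") -> true, the check is case-insensitive
    // isSupportedScheme("s3")     -> false, the constructor throws

Note that the exception message still mentions only HDFS; it is an unchanged context line, so the patch leaves it as-is even though viewfs is now accepted too.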
diff --git a/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriterOldHadoopWithNoTruncateSupportTest.java b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriterOldHadoopWithNoTruncateSupportTest.java
index b12090f..03d978a 100644
--- a/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriterOldHadoopWithNoTruncateSupportTest.java
+++ b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriterOldHadoopWithNoTruncateSupportTest.java
@@ -34,6 +34,7 @@ import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
 
 import java.io.BufferedReader;
 import java.io.File;
@@ -44,6 +45,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 import static junit.framework.TestCase.assertEquals;
 import static junit.framework.TestCase.assertNotNull;
 import static junit.framework.TestCase.assertTrue;
+import static org.mockito.Mockito.when;
 
 /**
  * Tests for the {@link HadoopRecoverableWriter} with Hadoop versions pre Hadoop 2.7.
@@ -141,6 +143,14 @@ public class HadoopRecoverableWriterOldHadoopWithNoTruncateSupportTest {
                }
        }
 
+       @Test
+       public void testRecoverableWriterWithViewfsScheme() {
+               final org.apache.hadoop.fs.FileSystem mockViewfs = Mockito.mock(org.apache.hadoop.fs.FileSystem.class);
+               when(mockViewfs.getScheme()).thenReturn("viewfs");
+               // Creating the writer should not throw UnsupportedOperationException.
+               RecoverableWriter recoverableWriter = new HadoopRecoverableWriter(mockViewfs);
+       }
+
        private RecoverableFsDataOutputStream getOpenStreamToFileWithContent(
                        final RecoverableWriter writerUnderTest,
                        final Path path,
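
The added test only exercises the accepting path. A hypothetical companion test (not part of this commit) could pin down that other schemes are still rejected:

    @Test(expected = UnsupportedOperationException.class)
    public void testRecoverableWriterRejectsUnsupportedScheme() {
        // Hypothetical companion test, not part of this commit:
        // any scheme other than hdfs/viewfs must still be rejected.
        final org.apache.hadoop.fs.FileSystem mockS3 = Mockito.mock(org.apache.hadoop.fs.FileSystem.class);
        when(mockS3.getScheme()).thenReturn("s3");
        new HadoopRecoverableWriter(mockS3);
    }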
