This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 27ecf02fb2a8e61b68e93190080f540e53aca171
Author: wxp <wxp4...@outlook.com>
AuthorDate: Fri May 8 17:50:37 2020 +0800

    [FLINK-17569][fs-connector] Delegate lease revoking to correct from viewfs
    
    This closes #12035.
---
 .../hdfs/HadoopRecoverableFsDataOutputStream.java  |  21 ++-
 .../fs/hdfs/HadoopViewFileSystemTruncateTest.java  | 172 +++++++++++++++++++++
 2 files changed, 191 insertions(+), 2 deletions(-)

diff --git 
a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java
 
b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java
index adf123f..4ebd41d 100644
--- 
a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java
+++ 
b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.viewfs.ViewFileSystem;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.util.VersionInfo;
 
@@ -157,7 +158,7 @@ class HadoopRecoverableFsDataOutputStream extends 
RecoverableFsDataOutputStream
 
                ensureTruncateInitialized();
 
-               waitUntilLeaseIsRevoked(fileSystem, path);
+               revokeLeaseByFileSystem(fileSystem, path);
 
                // truncate back and append
                boolean truncated;
@@ -170,7 +171,7 @@ class HadoopRecoverableFsDataOutputStream extends 
RecoverableFsDataOutputStream
                if (!truncated) {
                        // Truncate did not complete immediately, we must wait 
for
                        // the operation to complete and release the lease.
-                       waitUntilLeaseIsRevoked(fileSystem, path);
+                       revokeLeaseByFileSystem(fileSystem, path);
                }
        }
 
@@ -314,6 +315,22 @@ class HadoopRecoverableFsDataOutputStream extends 
RecoverableFsDataOutputStream
        }
 
        /**
+        * Resolve the real path of FileSystem if it is {@link ViewFileSystem} 
and
+        * revoke the lease of the file we are resuming with different 
FileSystem.
+        *
+        * @param path The path to the file we want to resume writing to.
+        */
+       private static boolean revokeLeaseByFileSystem(final FileSystem fs, 
final Path path) throws IOException {
+               if (fs instanceof ViewFileSystem) {
+                       final ViewFileSystem vfs = (ViewFileSystem) fs;
+                       final Path resolvePath = vfs.resolvePath(path);
+                       final FileSystem resolveFs = 
resolvePath.getFileSystem(fs.getConf());
+                       return waitUntilLeaseIsRevoked(resolveFs, resolvePath);
+               }
+               return waitUntilLeaseIsRevoked(fs, path);
+       }
+
+       /**
         * Called when resuming execution after a failure and waits until the 
lease
         * of the file we are resuming is free.
         *
diff --git 
a/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopViewFileSystemTruncateTest.java
 
b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopViewFileSystemTruncateTest.java
new file mode 100644
index 0000000..7daf489
--- /dev/null
+++ 
b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopViewFileSystemTruncateTest.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.fs.hdfs;
+
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.core.fs.RecoverableWriter.ResumeRecoverable;
+import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.util.OperatingSystem;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileSystemTestHelper;
+import org.apache.hadoop.fs.FsConstants;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.viewfs.ConfigUtil;
+import org.apache.hadoop.fs.viewfs.ViewFileSystemTestSetup;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test for {@link org.apache.hadoop.fs.viewfs.ViewFileSystem} support.
+ */
+public class HadoopViewFileSystemTruncateTest {
+
+       @ClassRule
+       public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
+
+       private final FileSystemTestHelper fileSystemTestHelper = new 
FileSystemTestHelper("/tests");
+
+       private static MiniDFSCluster hdfsCluster;
+       private static FileSystem fHdfs;
+       private static org.apache.flink.core.fs.FileSystem fSystem;
+
+       private Configuration fsViewConf;
+       private FileSystem fsTarget;
+       private Path targetTestRoot;
+
+       @BeforeClass
+       public static void testHadoopVersion() {
+               Assume.assumeTrue(HadoopUtils.isMinHadoopVersion(2, 7));
+       }
+
+       @BeforeClass
+       public static void verifyOS() {
+               Assume.assumeTrue("HDFS cluster cannot be started on Windows 
without extensions.", !OperatingSystem.isWindows());
+       }
+
+       @BeforeClass
+       public static void createHDFS() throws Exception {
+               final File baseDir = TEMP_FOLDER.newFolder();
+
+               final Configuration hdConf = new Configuration();
+               hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, 
baseDir.getAbsolutePath());
+
+               final MiniDFSCluster.Builder builder = new 
MiniDFSCluster.Builder(hdConf)
+                               
.nnTopology(MiniDFSNNTopology.simpleFederatedTopology(1));
+               hdfsCluster = builder.build();
+               hdfsCluster.waitClusterUp();
+
+               fHdfs = hdfsCluster.getFileSystem(0);
+       }
+
+       @Before
+       public void setUp() throws Exception {
+               fsTarget = fHdfs;
+               targetTestRoot = 
fileSystemTestHelper.getAbsoluteTestRootPath(fsTarget);
+
+               fsTarget.delete(targetTestRoot, true);
+               fsTarget.mkdirs(targetTestRoot);
+
+               fsViewConf = ViewFileSystemTestSetup.createConfig();
+               setupMountPoints();
+               FileSystem fsView = FileSystem.get(FsConstants.VIEWFS_URI, 
fsViewConf);
+               fSystem = new HadoopFileSystem(fsView);
+       }
+
+       private void setupMountPoints() {
+               Path mountOnNn1 = new Path("/mountOnNn1");
+               ConfigUtil.addLink(fsViewConf, mountOnNn1.toString(), 
targetTestRoot.toUri());
+       }
+
+       @AfterClass
+       public static void shutdownCluster() {
+               hdfsCluster.shutdown();
+       }
+
+       @After
+       public void tearDown() throws Exception {
+               fsTarget.delete(fileSystemTestHelper.getTestRootPath(fsTarget), 
true);
+       }
+
+       @Test
+       public void testViewFileSystemRecoverWorks() throws IOException {
+
+               final org.apache.flink.core.fs.Path testPath = new 
org.apache.flink.core.fs.Path(
+                               fSystem.getUri() + "mountOnNn1/test-1");
+               final String expectedContent = "test_line";
+
+               final RecoverableWriter writer = 
fSystem.createRecoverableWriter();
+               final RecoverableFsDataOutputStream streamUnderTest = 
getOpenStreamToFileWithContent(
+                               writer, testPath, expectedContent);
+
+               final ResumeRecoverable resumeRecover = 
streamUnderTest.persist();
+
+               final RecoverableFsDataOutputStream recover = 
writer.recover(resumeRecover);
+
+               final RecoverableWriter.CommitRecoverable committable = 
recover.closeForCommit().getRecoverable();
+
+               final RecoverableWriter recoveredWriter = 
fSystem.createRecoverableWriter();
+               
recoveredWriter.recoverForCommit(committable).commitAfterRecovery();
+
+               verifyFileContent(testPath, expectedContent);
+       }
+
+       private RecoverableFsDataOutputStream getOpenStreamToFileWithContent(
+                       final RecoverableWriter writerUnderTest,
+                       final org.apache.flink.core.fs.Path path,
+                       final String expectedContent) throws IOException {
+               final byte[] content = expectedContent.getBytes(UTF_8);
+
+               final RecoverableFsDataOutputStream streamUnderTest = 
writerUnderTest.open(path);
+               streamUnderTest.write(content);
+               return streamUnderTest;
+       }
+
+       private static void verifyFileContent(
+                       final org.apache.flink.core.fs.Path testPath,
+                       final String expectedContent) throws IOException {
+               try (
+                               FSDataInputStream in = fSystem.open(testPath);
+                               InputStreamReader ir = new 
InputStreamReader(in, UTF_8);
+                               BufferedReader reader = new BufferedReader(ir)
+               ) {
+                       final String line = reader.readLine();
+                       assertEquals(expectedContent, line);
+               }
+       }
+}

Reply via email to