Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4939#discussion_r149071810
  
    --- Diff: flink-yarn/src/test/java/org/apache/flink/yarn/YarnFileStageTest.java ---
    @@ -0,0 +1,215 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.yarn;
    +
    +import org.apache.flink.util.OperatingSystem;
    +
    +import org.apache.hadoop.fs.FSDataInputStream;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.LocatedFileStatus;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.fs.RemoteIterator;
    +import org.apache.hadoop.hdfs.MiniDFSCluster;
    +import org.apache.hadoop.yarn.api.records.ApplicationId;
    +import org.apache.hadoop.yarn.api.records.LocalResource;
    +import org.apache.hadoop.yarn.util.ConverterUtils;
    +import org.junit.AfterClass;
    +import org.junit.Assume;
    +import org.junit.Before;
    +import org.junit.BeforeClass;
    +import org.junit.ClassRule;
    +import org.junit.Rule;
    +import org.junit.Test;
    +import org.junit.rules.TemporaryFolder;
    +
    +import java.io.DataOutputStream;
    +import java.io.File;
    +import java.io.FileOutputStream;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +import static org.hamcrest.MatcherAssert.assertThat;
    +import static org.hamcrest.Matchers.equalTo;
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertFalse;
    +
    +/**
    + * Tests for verifying file staging during submission to YARN works.
    + */
    +public class YarnFileStageTest {
    +
    +   @ClassRule
    +   public static final TemporaryFolder CLASS_TEMP_DIR = new 
TemporaryFolder();
    +
    +   @Rule
    +   public TemporaryFolder tempFolder = new TemporaryFolder();
    +
    +   private static MiniDFSCluster hdfsCluster;
    +
    +   private static Path hdfsRootPath;
    +
    +   private org.apache.hadoop.conf.Configuration hadoopConfig;
    +
    +   // ------------------------------------------------------------------------
    +   //  Test setup and shutdown
    +   // ------------------------------------------------------------------------
    +
    +   @BeforeClass
    +   public static void createHDFS() throws Exception {
    +           Assume.assumeTrue(!OperatingSystem.isWindows());
    +
    +           final File tempDir = CLASS_TEMP_DIR.newFolder();
    +
    +           org.apache.hadoop.conf.Configuration hdConf = new 
org.apache.hadoop.conf.Configuration();
    +           hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, 
tempDir.getAbsolutePath());
    +
    +           MiniDFSCluster.Builder builder = new 
MiniDFSCluster.Builder(hdConf);
    +           hdfsCluster = builder.build();
    +           hdfsRootPath = new Path(hdfsCluster.getURI());
    +   }
    +
    +   @AfterClass
    +   public static void destroyHDFS() {
    +           if (hdfsCluster != null) {
    +                   hdfsCluster.shutdown();
    +           }
    +           hdfsCluster = null;
    +           hdfsRootPath = null;
    +   }
    +
    +   @Before
    +   public void initConfig() {
    +           hadoopConfig = new org.apache.hadoop.conf.Configuration();
    +           
hadoopConfig.set(org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY, 
hdfsRootPath.toString());
    +   }
    +
    +   /**
    +    * Verifies that nested directories are properly copied with a 
<tt>hdfs://</tt> file
    +    * system (from a <tt>file:///absolute/path</tt> source path).
    +    */
    +   @Test
    +   public void testCopyFromLocalRecursiveWithScheme() throws Exception {
    +           final FileSystem targetFileSystem = 
hdfsRootPath.getFileSystem(hadoopConfig);
    +           final Path targetDir = targetFileSystem.getWorkingDirectory();
    +
    +           testCopyFromLocalRecursive(targetFileSystem, targetDir, 
tempFolder, true);
    +   }
    +
    +   /**
    +    * Verifies that nested directories are properly copied with a 
<tt>hdfs://</tt> file
    +    * system (from a <tt>/absolute/path</tt> source path).
    +    */
    +   @Test
    +   public void testCopyFromLocalRecursiveWithoutScheme() throws Exception {
    +           final FileSystem targetFileSystem = 
hdfsRootPath.getFileSystem(hadoopConfig);
    +           final Path targetDir = targetFileSystem.getWorkingDirectory();
    +
    +           testCopyFromLocalRecursive(targetFileSystem, targetDir, 
tempFolder, false);
    +   }
    +
    +   /**
    +    * Verifies that nested directories are properly copied with the given 
filesystem and paths.
    +    *
    +    * @param targetFileSystem
    +    *              file system of the target path
    +    * @param targetDir
    +    *              target path (URI like <tt>hdfs://...</tt>)
    +    * @param tempFolder
    +    *              JUnit temporary folder rule to create the source 
directory with
    +    * @param addSchemeToLocalPath
    +    *              whether to add the <tt>file://</tt> scheme to the local path to copy from
    +    */
    +   public static void testCopyFromLocalRecursive(
    +                   FileSystem targetFileSystem, Path targetDir, 
TemporaryFolder tempFolder,
    +                   boolean addSchemeToLocalPath) throws Exception {
    --- End diff --
    
    nit: the line breaks in this parameter list are inconsistent.


---

Reply via email to