Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/1687#discussion_r53628814 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/util/HDFSCopyFromLocal.java --- @@ -26,32 +25,46 @@ import java.io.File; import java.io.FileInputStream; import java.net.URI; +import java.util.ArrayList; +import java.util.List; /** - * Utility for copying from local file system to a HDFS {@link FileSystem} in an external process. - * This is required since {@code FileSystem.copyFromLocalFile} does not like being interrupted. + * Utility for copying from local file system to a HDFS {@link FileSystem}. */ public class HDFSCopyFromLocal { - public static void main(String[] args) throws Exception { - String hadoopConfPath = args[0]; - String localBackupPath = args[1]; - String backupUri = args[2]; - - Configuration hadoopConf = new Configuration(); - try (DataInputStream in = new DataInputStream(new FileInputStream(hadoopConfPath))) { - hadoopConf.readFields(in); - } - FileSystem fs = FileSystem.get(new URI(backupUri), hadoopConf); + public static void copyFromLocal(final File hadoopConfPath, final File localPath, final URI remotePath) throws Exception { + // Do it in another Thread because HDFS can deadlock if being interrupted while copying - fs.copyFromLocalFile(new Path(localBackupPath), new Path(backupUri)); - } + String threadName = "HDFS Copy from " + localPath + " to " + remotePath; + + final List<Exception> asyncException = new ArrayList<>(); + + Thread copyThread = new Thread(threadName) { + @Override + public void run() { + try { + Configuration hadoopConf = new Configuration(); --- End diff -- This is only loading the Hadoop configuration from the classpath, not from the Flink configuration or environment variables. I think our filesystem code has a method to try the environment variables and the config as well.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---