Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1687#discussion_r53628814
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/util/HDFSCopyFromLocal.java ---
    @@ -26,32 +25,46 @@
     import java.io.File;
     import java.io.FileInputStream;
     import java.net.URI;
    +import java.util.ArrayList;
    +import java.util.List;
     
     /**
    - * Utility for copying from local file system to a HDFS {@link FileSystem} in an external process.
    - * This is required since {@code FileSystem.copyFromLocalFile} does not like being interrupted.
    + * Utility for copying from local file system to a HDFS {@link FileSystem}.
      */
     public class HDFSCopyFromLocal {
    -   public static void main(String[] args) throws Exception {
    -           String hadoopConfPath = args[0];
    -           String localBackupPath = args[1];
    -           String backupUri = args[2];
    -
    -           Configuration hadoopConf = new Configuration();
    -           try (DataInputStream in = new DataInputStream(new FileInputStream(hadoopConfPath))) {
    -                   hadoopConf.readFields(in);
    -           }
     
    -           FileSystem fs = FileSystem.get(new URI(backupUri), hadoopConf);
    +   public static void copyFromLocal(final File hadoopConfPath, final File localPath, final URI remotePath) throws Exception {
    +           // Do it in another Thread because HDFS can deadlock if being interrupted while copying
     
    -           fs.copyFromLocalFile(new Path(localBackupPath), new Path(backupUri));
    -   }
    +
    +           String threadName = "HDFS Copy from " + localPath + " to " + remotePath;
    +
    +           final List<Exception> asyncException = new ArrayList<>();
    +
    +           Thread copyThread = new Thread(threadName) {
    +                   @Override
    +                   public void run() {
    +                           try {
    +                                   Configuration hadoopConf = new Configuration();
    --- End diff --
    
    This only loads the Hadoop configuration from the classpath, not from the Flink configuration or the environment variables.
    
    I think our filesystem code already has a method that tries the environment variables and the Flink configuration as well.
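    The lookup the reviewer has in mind can be sketched roughly as follows. This is only an illustration, not the actual Flink helper: the class and method names (`HadoopConfResolver`, `resolveConfDir`) are hypothetical, while `HADOOP_CONF_DIR` and `HADOOP_HOME` are the conventional Hadoop environment variables.
    
    ```java
    import java.io.File;
    import java.util.Map;
    
    public class HadoopConfResolver {
    
        /**
         * Resolve the Hadoop configuration directory from the environment
         * before falling back to the classpath-only default that
         * {@code new Configuration()} uses. Hypothetical sketch, not the
         * real Flink lookup.
         */
        static String resolveConfDir(Map<String, String> env) {
            // 1. Explicit configuration directory wins.
            String confDir = env.get("HADOOP_CONF_DIR");
            if (confDir != null && new File(confDir).isDirectory()) {
                return confDir;
            }
            // 2. Otherwise derive it from HADOOP_HOME, trying the
            //    common directory layouts.
            String home = env.get("HADOOP_HOME");
            if (home != null) {
                File etc = new File(home, "etc/hadoop"); // Hadoop 2.x layout
                if (etc.isDirectory()) {
                    return etc.getAbsolutePath();
                }
                File conf = new File(home, "conf"); // Hadoop 1.x layout
                if (conf.isDirectory()) {
                    return conf.getAbsolutePath();
                }
            }
            // 3. Nothing found: caller falls back to the classpath config.
            return null;
        }
    }
    ```
    
    The caller would then add any resource files found under the resolved directory (e.g. core-site.xml, hdfs-site.xml) to the `Configuration` before creating the `FileSystem`.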

