HADOOP-13953. Make FTPFileSystem's data connection mode and transfer mode 
configurable. Contributed by Xiao Chen.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0a212a40
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0a212a40
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0a212a40

Branch: refs/heads/HADOOP-13345
Commit: 0a212a40fcbd12a11294bff7a31e7433111733c9
Parents: 91bf504
Author: Wei-Chiu Chuang <weic...@apache.org>
Authored: Mon Jan 9 15:18:26 2017 -0800
Committer: Wei-Chiu Chuang <weic...@apache.org>
Committed: Mon Jan 9 15:18:26 2017 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/fs/ftp/FTPFileSystem.java | 70 +++++++++++++++++++-
 .../src/main/resources/core-default.xml         | 18 +++++
 .../conf/TestCommonConfigurationFields.java     |  2 +
 .../apache/hadoop/fs/ftp/TestFTPFileSystem.java | 56 +++++++++++++++-
 4 files changed, 143 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a212a40/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ftp/FTPFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ftp/FTPFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ftp/FTPFileSystem.java
index 28b07e8..25fec31 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ftp/FTPFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ftp/FTPFileSystem.java
@@ -23,6 +23,7 @@ import java.io.InputStream;
 import java.net.ConnectException;
 import java.net.URI;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -65,6 +66,9 @@ public class FTPFileSystem extends FileSystem {
   public static final String FS_FTP_HOST = "fs.ftp.host";
   public static final String FS_FTP_HOST_PORT = "fs.ftp.host.port";
   public static final String FS_FTP_PASSWORD_PREFIX = "fs.ftp.password.";
+  public static final String FS_FTP_DATA_CONNECTION_MODE =
+      "fs.ftp.data.connection.mode";
+  public static final String FS_FTP_TRANSFER_MODE = "fs.ftp.transfer.mode";
   public static final String E_SAME_DIRECTORY_ONLY =
       "only same directory renames are supported";
 
@@ -143,9 +147,10 @@ public class FTPFileSystem extends FileSystem {
                    NetUtils.UNKNOWN_HOST, 0,
                    new ConnectException("Server response " + reply));
     } else if (client.login(user, password)) {
-      client.setFileTransferMode(FTP.BLOCK_TRANSFER_MODE);
+      client.setFileTransferMode(getTransferMode(conf));
       client.setFileType(FTP.BINARY_FILE_TYPE);
       client.setBufferSize(DEFAULT_BUFFER_SIZE);
+      setDataConnectionMode(client, conf);
     } else {
       throw new IOException("Login failed on server - " + host + ", port - "
           + port + " as user '" + user + "'");
@@ -155,6 +160,69 @@ public class FTPFileSystem extends FileSystem {
   }
 
   /**
+   * Get FTP's transfer mode from configuration. Valid values (matched
+   * case-insensitively) are STREAM_TRANSFER_MODE, BLOCK_TRANSFER_MODE and
+   * COMPRESSED_TRANSFER_MODE. Defaults to BLOCK_TRANSFER_MODE when the
+   * value is unset or unrecognized (a warning is logged in the latter case).
+   *
+   * @param conf configuration to read {@link #FS_FTP_TRANSFER_MODE} from
+   * @return the matching {@code FTP.*_TRANSFER_MODE} constant
+   */
+  @VisibleForTesting
+  int getTransferMode(Configuration conf) {
+    final String mode = conf.get(FS_FTP_TRANSFER_MODE);
+    // FTP default is STREAM_TRANSFER_MODE, but Hadoop FTPFS's default is
+    // FTP.BLOCK_TRANSFER_MODE historically.
+    int ret = FTP.BLOCK_TRANSFER_MODE;
+    if (mode == null) {
+      return ret;
+    }
+    // Upper-case with a fixed locale so that matching does not depend on
+    // the JVM's default locale.
+    final String upper = mode.toUpperCase(java.util.Locale.ENGLISH);
+    if (upper.equals("STREAM_TRANSFER_MODE")) {
+      ret = FTP.STREAM_TRANSFER_MODE;
+    } else if (upper.equals("COMPRESSED_TRANSFER_MODE")) {
+      ret = FTP.COMPRESSED_TRANSFER_MODE;
+    } else if (!upper.equals("BLOCK_TRANSFER_MODE")) {
+      LOG.warn("Cannot parse the value for " + FS_FTP_TRANSFER_MODE + ": "
+          + mode + ". Using default.");
+    }
+    return ret;
+  }
+
+  /**
+   * Set the FTPClient's data connection mode based on configuration. Valid
+   * values (matched case-insensitively) are ACTIVE_LOCAL_DATA_CONNECTION_MODE,
+   * PASSIVE_LOCAL_DATA_CONNECTION_MODE and PASSIVE_REMOTE_DATA_CONNECTION_MODE.
+   * Defaults to ACTIVE_LOCAL_DATA_CONNECTION_MODE: an unset or unrecognized
+   * value leaves the client's current mode untouched.
+   *
+   * @param client FTP client whose data connection mode is updated
+   * @param conf configuration supplying {@link #FS_FTP_DATA_CONNECTION_MODE}
+   * @throws IOException if the client fails to enter remote passive mode
+   */
+  @VisibleForTesting
+  void setDataConnectionMode(FTPClient client, Configuration conf)
+      throws IOException {
+    final String mode = conf.get(FS_FTP_DATA_CONNECTION_MODE);
+    if (mode == null) {
+      return;
+    }
+    // Upper-case with a fixed locale: under e.g. a Turkish default locale
+    // the 'i' in "passive" maps to a dotted capital I and would never match.
+    final String upper = mode.toUpperCase(java.util.Locale.ENGLISH);
+    if (upper.equals("PASSIVE_LOCAL_DATA_CONNECTION_MODE")) {
+      client.enterLocalPassiveMode();
+    } else if (upper.equals("PASSIVE_REMOTE_DATA_CONNECTION_MODE")) {
+      client.enterRemotePassiveMode();
+    } else if (!upper.equals("ACTIVE_LOCAL_DATA_CONNECTION_MODE")) {
+      LOG.warn("Cannot parse the value for " + FS_FTP_DATA_CONNECTION_MODE
+          + ": " + mode + ". Using default.");
+    }
+  }
+
+  /**
    * Logout and disconnect the given FTPClient. *
    * 
    * @param client

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a212a40/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
index ee2cc2e..9355a3c 100644
--- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
+++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
@@ -783,6 +783,24 @@
 </property>
 
 <property>
+  <name>fs.ftp.data.connection.mode</name>
+  <value>ACTIVE_LOCAL_DATA_CONNECTION_MODE</value>
+  <description>Set the FTPClient's data connection mode based on configuration.
+    Valid values are ACTIVE_LOCAL_DATA_CONNECTION_MODE,
+    PASSIVE_LOCAL_DATA_CONNECTION_MODE and PASSIVE_REMOTE_DATA_CONNECTION_MODE.
+  </description>
+</property>
+
+<property>
+  <name>fs.ftp.transfer.mode</name>
+  <value>BLOCK_TRANSFER_MODE</value>
+  <description>
+    Set FTP's transfer mode based on configuration. Valid values are
+    STREAM_TRANSFER_MODE, BLOCK_TRANSFER_MODE and COMPRESSED_TRANSFER_MODE.
+  </description>
+</property>
+
+<property>
   <name>fs.df.interval</name>
   <value>60000</value>
   <description>Disk usage statistics refresh interval in msec.</description>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a212a40/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestCommonConfigurationFields.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestCommonConfigurationFields.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestCommonConfigurationFields.java
index a3a4026..966a8ac 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestCommonConfigurationFields.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestCommonConfigurationFields.java
@@ -89,6 +89,8 @@ public class TestCommonConfigurationFields extends TestConfigurationFieldsBase {
     // Lots of properties not in the above classes
     xmlPropsToSkipCompare.add("fs.ftp.password.localhost");
     xmlPropsToSkipCompare.add("fs.ftp.user.localhost");
+    xmlPropsToSkipCompare.add("fs.ftp.data.connection.mode");
+    xmlPropsToSkipCompare.add("fs.ftp.transfer.mode");
     xmlPropsToSkipCompare.add("hadoop.tmp.dir");
     xmlPropsToSkipCompare.add("nfs3.mountd.port");
     xmlPropsToSkipCompare.add("nfs3.server.port");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a212a40/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/ftp/TestFTPFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/ftp/TestFTPFileSystem.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/ftp/TestFTPFileSystem.java
index 0ce2a9b..0604604 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/ftp/TestFTPFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/ftp/TestFTPFileSystem.java
@@ -19,15 +19,67 @@ package org.apache.hadoop.fs.ftp;
 
 import org.apache.commons.net.ftp.FTP;
 
-import org.junit.Assert;
+import org.apache.commons.net.ftp.FTPClient;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.Timeout;
 
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test basic {@link FTPFileSystem} class methods. Contract tests are in
+ * TestFTPContractXXXX.
+ */
 public class TestFTPFileSystem {
 
+  @Rule
+  public Timeout testTimeout = new Timeout(180000);
+
   @Test
   public void testFTPDefaultPort() throws Exception {
     FTPFileSystem ftp = new FTPFileSystem();
-    Assert.assertEquals(FTP.DEFAULT_PORT, ftp.getDefaultPort());
+    assertEquals(FTP.DEFAULT_PORT, ftp.getDefaultPort());
   }
 
+  @Test
+  public void testFTPTransferMode() throws Exception {
+    Configuration conf = new Configuration();
+    FTPFileSystem ftp = new FTPFileSystem();
+    assertEquals(FTP.BLOCK_TRANSFER_MODE, ftp.getTransferMode(conf));
+
+    conf.set(FTPFileSystem.FS_FTP_TRANSFER_MODE, "STREAM_TRANSFER_MODE");
+    assertEquals(FTP.STREAM_TRANSFER_MODE, ftp.getTransferMode(conf));
+
+    conf.set(FTPFileSystem.FS_FTP_TRANSFER_MODE, "COMPRESSED_TRANSFER_MODE");
+    assertEquals(FTP.COMPRESSED_TRANSFER_MODE, ftp.getTransferMode(conf));
+
+    conf.set(FTPFileSystem.FS_FTP_TRANSFER_MODE, "invalid");
+    assertEquals(FTPClient.BLOCK_TRANSFER_MODE, ftp.getTransferMode(conf));
+  }
+
+  @Test
+  public void testFTPDataConnectionMode() throws Exception {
+    Configuration conf = new Configuration();
+    FTPClient client = new FTPClient();
+    FTPFileSystem ftp = new FTPFileSystem();
+    assertEquals(FTPClient.ACTIVE_LOCAL_DATA_CONNECTION_MODE,
+        client.getDataConnectionMode());
+
+    ftp.setDataConnectionMode(client, conf);
+    assertEquals(FTPClient.ACTIVE_LOCAL_DATA_CONNECTION_MODE,
+        client.getDataConnectionMode());
+
+    conf.set(FTPFileSystem.FS_FTP_DATA_CONNECTION_MODE, "invalid");
+    ftp.setDataConnectionMode(client, conf);
+    assertEquals(FTPClient.ACTIVE_LOCAL_DATA_CONNECTION_MODE,
+        client.getDataConnectionMode());
+
+    conf.set(FTPFileSystem.FS_FTP_DATA_CONNECTION_MODE,
+        "PASSIVE_LOCAL_DATA_CONNECTION_MODE");
+    ftp.setDataConnectionMode(client, conf);
+    assertEquals(FTPClient.PASSIVE_LOCAL_DATA_CONNECTION_MODE,
+        client.getDataConnectionMode());
+
+  }
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to