HADOOP-13953. Make FTPFileSystem's data connection mode and transfer mode configurable. Contributed by Xiao Chen.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0a212a40 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0a212a40 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0a212a40 Branch: refs/heads/HADOOP-13345 Commit: 0a212a40fcbd12a11294bff7a31e7433111733c9 Parents: 91bf504 Author: Wei-Chiu Chuang <weic...@apache.org> Authored: Mon Jan 9 15:18:26 2017 -0800 Committer: Wei-Chiu Chuang <weic...@apache.org> Committed: Mon Jan 9 15:18:26 2017 -0800 ---------------------------------------------------------------------- .../org/apache/hadoop/fs/ftp/FTPFileSystem.java | 70 +++++++++++++++++++- .../src/main/resources/core-default.xml | 18 +++++ .../conf/TestCommonConfigurationFields.java | 2 + .../apache/hadoop/fs/ftp/TestFTPFileSystem.java | 56 +++++++++++++++- 4 files changed, 143 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a212a40/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ftp/FTPFileSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ftp/FTPFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ftp/FTPFileSystem.java index 28b07e8..25fec31 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ftp/FTPFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ftp/FTPFileSystem.java @@ -23,6 +23,7 @@ import java.io.InputStream; import java.net.ConnectException; import java.net.URI; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -65,6 +66,9 @@ public class FTPFileSystem extends FileSystem { public static 
final String FS_FTP_HOST = "fs.ftp.host"; public static final String FS_FTP_HOST_PORT = "fs.ftp.host.port"; public static final String FS_FTP_PASSWORD_PREFIX = "fs.ftp.password."; + public static final String FS_FTP_DATA_CONNECTION_MODE = + "fs.ftp.data.connection.mode"; + public static final String FS_FTP_TRANSFER_MODE = "fs.ftp.transfer.mode"; public static final String E_SAME_DIRECTORY_ONLY = "only same directory renames are supported"; @@ -143,9 +147,10 @@ public class FTPFileSystem extends FileSystem { NetUtils.UNKNOWN_HOST, 0, new ConnectException("Server response " + reply)); } else if (client.login(user, password)) { - client.setFileTransferMode(FTP.BLOCK_TRANSFER_MODE); + client.setFileTransferMode(getTransferMode(conf)); client.setFileType(FTP.BINARY_FILE_TYPE); client.setBufferSize(DEFAULT_BUFFER_SIZE); + setDataConnectionMode(client, conf); } else { throw new IOException("Login failed on server - " + host + ", port - " + port + " as user '" + user + "'"); @@ -155,6 +160,69 @@ public class FTPFileSystem extends FileSystem { } /** + * Set FTP's transfer mode based on configuration. Valid values are + * STREAM_TRANSFER_MODE, BLOCK_TRANSFER_MODE and COMPRESSED_TRANSFER_MODE. + * <p/> + * Defaults to BLOCK_TRANSFER_MODE. + * + * @param conf + * @return + */ + @VisibleForTesting + int getTransferMode(Configuration conf) { + final String mode = conf.get(FS_FTP_TRANSFER_MODE); + // FTP default is STREAM_TRANSFER_MODE, but Hadoop FTPFS's default is + // FTP.BLOCK_TRANSFER_MODE historically. + int ret = FTP.BLOCK_TRANSFER_MODE; + if (mode == null) { + return ret; + } + final String upper = mode.toUpperCase(); + if (upper.equals("STREAM_TRANSFER_MODE")) { + ret = FTP.STREAM_TRANSFER_MODE; + } else if (upper.equals("COMPRESSED_TRANSFER_MODE")) { + ret = FTP.COMPRESSED_TRANSFER_MODE; + } else { + if (!upper.equals("BLOCK_TRANSFER_MODE")) { + LOG.warn("Cannot parse the value for " + FS_FTP_TRANSFER_MODE + ": " + + mode + ". 
Using default."); + } + } + return ret; + } + + /** + * Set the FTPClient's data connection mode based on configuration. Valid + * values are ACTIVE_LOCAL_DATA_CONNECTION_MODE, + * PASSIVE_LOCAL_DATA_CONNECTION_MODE and PASSIVE_REMOTE_DATA_CONNECTION_MODE. + * <p/> + * Defaults to ACTIVE_LOCAL_DATA_CONNECTION_MODE. + * + * @param client + * @param conf + * @throws IOException + */ + @VisibleForTesting + void setDataConnectionMode(FTPClient client, Configuration conf) + throws IOException { + final String mode = conf.get(FS_FTP_DATA_CONNECTION_MODE); + if (mode == null) { + return; + } + final String upper = mode.toUpperCase(); + if (upper.equals("PASSIVE_LOCAL_DATA_CONNECTION_MODE")) { + client.enterLocalPassiveMode(); + } else if (upper.equals("PASSIVE_REMOTE_DATA_CONNECTION_MODE")) { + client.enterRemotePassiveMode(); + } else { + if (!upper.equals("ACTIVE_LOCAL_DATA_CONNECTION_MODE")) { + LOG.warn("Cannot parse the value for " + FS_FTP_DATA_CONNECTION_MODE + + ": " + mode + ". Using default."); + } + } + } + + /** * Logout and disconnect the given FTPClient. * * * @param client http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a212a40/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml index ee2cc2e..9355a3c 100644 --- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml +++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml @@ -783,6 +783,24 @@ </property> <property> + <name>fs.ftp.data.connection.mode</name> + <value>ACTIVE_LOCAL_DATA_CONNECTION_MODE</value> + <description>Set the FTPClient's data connection mode based on configuration. + Valid values are ACTIVE_LOCAL_DATA_CONNECTION_MODE, + PASSIVE_LOCAL_DATA_CONNECTION_MODE and PASSIVE_REMOTE_DATA_CONNECTION_MODE. 
+ </description> +</property> + +<property> + <name>fs.ftp.transfer.mode</name> + <value>BLOCK_TRANSFER_MODE</value> + <description> + Set FTP's transfer mode based on configuration. Valid values are + STREAM_TRANSFER_MODE, BLOCK_TRANSFER_MODE and COMPRESSED_TRANSFER_MODE. + </description> +</property> + +<property> <name>fs.df.interval</name> <value>60000</value> <description>Disk usage statistics refresh interval in msec.</description> http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a212a40/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestCommonConfigurationFields.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestCommonConfigurationFields.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestCommonConfigurationFields.java index a3a4026..966a8ac 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestCommonConfigurationFields.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestCommonConfigurationFields.java @@ -89,6 +89,8 @@ public class TestCommonConfigurationFields extends TestConfigurationFieldsBase { // Lots of properties not in the above classes xmlPropsToSkipCompare.add("fs.ftp.password.localhost"); xmlPropsToSkipCompare.add("fs.ftp.user.localhost"); + xmlPropsToSkipCompare.add("fs.ftp.data.connection.mode"); + xmlPropsToSkipCompare.add("fs.ftp.transfer.mode"); xmlPropsToSkipCompare.add("hadoop.tmp.dir"); xmlPropsToSkipCompare.add("nfs3.mountd.port"); xmlPropsToSkipCompare.add("nfs3.server.port"); http://git-wip-us.apache.org/repos/asf/hadoop/blob/0a212a40/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/ftp/TestFTPFileSystem.java ---------------------------------------------------------------------- diff --git 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/ftp/TestFTPFileSystem.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/ftp/TestFTPFileSystem.java index 0ce2a9b..0604604 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/ftp/TestFTPFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/ftp/TestFTPFileSystem.java @@ -19,15 +19,67 @@ package org.apache.hadoop.fs.ftp; import org.apache.commons.net.ftp.FTP; -import org.junit.Assert; +import org.apache.commons.net.ftp.FTPClient; +import org.apache.hadoop.conf.Configuration; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.Timeout; +import static org.junit.Assert.assertEquals; + +/** + * Test basic {@link FTPFileSystem} class methods. Contract tests are in + * TestFTPContractXXXX. + */ public class TestFTPFileSystem { + @Rule + public Timeout testTimeout = new Timeout(180000); + @Test public void testFTPDefaultPort() throws Exception { FTPFileSystem ftp = new FTPFileSystem(); - Assert.assertEquals(FTP.DEFAULT_PORT, ftp.getDefaultPort()); + assertEquals(FTP.DEFAULT_PORT, ftp.getDefaultPort()); } + @Test + public void testFTPTransferMode() throws Exception { + Configuration conf = new Configuration(); + FTPFileSystem ftp = new FTPFileSystem(); + assertEquals(FTP.BLOCK_TRANSFER_MODE, ftp.getTransferMode(conf)); + + conf.set(FTPFileSystem.FS_FTP_TRANSFER_MODE, "STREAM_TRANSFER_MODE"); + assertEquals(FTP.STREAM_TRANSFER_MODE, ftp.getTransferMode(conf)); + + conf.set(FTPFileSystem.FS_FTP_TRANSFER_MODE, "COMPRESSED_TRANSFER_MODE"); + assertEquals(FTP.COMPRESSED_TRANSFER_MODE, ftp.getTransferMode(conf)); + + conf.set(FTPFileSystem.FS_FTP_TRANSFER_MODE, "invalid"); + assertEquals(FTPClient.BLOCK_TRANSFER_MODE, ftp.getTransferMode(conf)); + } + + @Test + public void testFTPDataConnectionMode() throws Exception { + Configuration conf = new Configuration(); + FTPClient client = new
FTPClient(); + FTPFileSystem ftp = new FTPFileSystem(); + assertEquals(FTPClient.ACTIVE_LOCAL_DATA_CONNECTION_MODE, + client.getDataConnectionMode()); + + ftp.setDataConnectionMode(client, conf); + assertEquals(FTPClient.ACTIVE_LOCAL_DATA_CONNECTION_MODE, + client.getDataConnectionMode()); + + conf.set(FTPFileSystem.FS_FTP_DATA_CONNECTION_MODE, "invalid"); + ftp.setDataConnectionMode(client, conf); + assertEquals(FTPClient.ACTIVE_LOCAL_DATA_CONNECTION_MODE, + client.getDataConnectionMode()); + + conf.set(FTPFileSystem.FS_FTP_DATA_CONNECTION_MODE, + "PASSIVE_LOCAL_DATA_CONNECTION_MODE"); + ftp.setDataConnectionMode(client, conf); + assertEquals(FTPClient.PASSIVE_LOCAL_DATA_CONNECTION_MODE, + client.getDataConnectionMode()); + + } } \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org