This is an automated email from the ASF dual-hosted git repository.

liuml07 pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push: new 56ebabd HADOOP-17222. Create socket address leveraging URI cache (#2241) 56ebabd is described below commit 56ebabd426757dd95c778535548abb8c01fbc1fb Author: 1996fanrui <1996fan...@gmail.com> AuthorDate: Fri Sep 11 13:30:52 2020 +0800 HADOOP-17222. Create socket address leveraging URI cache (#2241) Contributed by fanrui. Signed-off-by: Mingliang Liu <lium...@apache.org> Signed-off-by: He Xiaoqiao <hexiaoq...@apache.org> --- .../main/java/org/apache/hadoop/net/NetUtils.java | 78 ++++++++++++++++++---- .../java/org/apache/hadoop/net/TestNetUtils.java | 45 ++++++++++++- .../apache/hadoop/security/TestSecurityUtil.java | 10 +++ .../org/apache/hadoop/hdfs/DFSInputStream.java | 4 +- .../hadoop/hdfs/client/HdfsClientConfigKeys.java | 3 + .../hadoop/hdfs/client/impl/DfsClientConf.java | 52 ++++++++++----- .../src/main/resources/hdfs-default.xml | 9 +++ .../apache/hadoop/tools/TestHdfsConfigFields.java | 1 + 8 files changed, 169 insertions(+), 33 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetUtils.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetUtils.java index c5a5b11..004fa1c 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetUtils.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetUtils.java @@ -39,12 +39,16 @@ import java.net.ConnectException; import java.nio.channels.SocketChannel; import java.nio.channels.UnresolvedAddressException; import java.util.Map.Entry; +import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import javax.net.SocketFactory; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; + import org.apache.commons.net.util.SubnetUtils; import org.apache.commons.net.util.SubnetUtils.SubnetInfo; import 
org.apache.hadoop.classification.InterfaceAudience; @@ -177,11 +181,33 @@ public class NetUtils { * include a port number * @param configName the name of the configuration from which * <code>target</code> was loaded. This is used in the - * exception message in the case that parsing fails. + * exception message in the case that parsing fails. */ public static InetSocketAddress createSocketAddr(String target, int defaultPort, String configName) { + return createSocketAddr(target, defaultPort, configName, false); + } + + /** + * Create an InetSocketAddress from the given target string and + * default port. If the string cannot be parsed correctly, the + * <code>configName</code> parameter is used as part of the + * exception message, allowing the user to better diagnose + * the misconfiguration. + * + * @param target a string of either "host" or "host:port" + * @param defaultPort the default port if <code>target</code> does not + * include a port number + * @param configName the name of the configuration from which + * <code>target</code> was loaded. This is used in the + * exception message in the case that parsing fails. + * @param useCacheIfPresent Whether use cache when create URI + */ + public static InetSocketAddress createSocketAddr(String target, + int defaultPort, + String configName, + boolean useCacheIfPresent) { String helpText = ""; if (configName != null) { helpText = " (configuration property '" + configName + "')"; @@ -191,15 +217,8 @@ public class NetUtils { helpText); } target = target.trim(); - boolean hasScheme = target.contains("://"); - URI uri = null; - try { - uri = hasScheme ? 
URI.create(target) : URI.create("dummyscheme://"+target); - } catch (IllegalArgumentException e) { - throw new IllegalArgumentException( - "Does not contain a valid host:port authority: " + target + helpText - ); - } + boolean hasScheme = target.contains("://"); + URI uri = createURI(target, hasScheme, helpText, useCacheIfPresent); String host = uri.getHost(); int port = uri.getPort(); @@ -207,10 +226,9 @@ public class NetUtils { port = defaultPort; } String path = uri.getPath(); - + if ((host == null) || (port < 0) || - (!hasScheme && path != null && !path.isEmpty())) - { + (!hasScheme && path != null && !path.isEmpty())) { throw new IllegalArgumentException( "Does not contain a valid host:port authority: " + target + helpText ); @@ -218,6 +236,40 @@ public class NetUtils { return createSocketAddrForHost(host, port); } + private static final long URI_CACHE_SIZE_DEFAULT = 1000; + private static final long URI_CACHE_EXPIRE_TIME_DEFAULT = 12; + private static final Cache<String, URI> URI_CACHE = CacheBuilder.newBuilder() + .maximumSize(URI_CACHE_SIZE_DEFAULT) + .expireAfterWrite(URI_CACHE_EXPIRE_TIME_DEFAULT, TimeUnit.HOURS) + .build(); + + private static URI createURI(String target, + boolean hasScheme, + String helpText, + boolean useCacheIfPresent) { + URI uri; + if (useCacheIfPresent) { + uri = URI_CACHE.getIfPresent(target); + if (uri != null) { + return uri; + } + } + + try { + uri = hasScheme ? URI.create(target) : + URI.create("dummyscheme://" + target); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException( + "Does not contain a valid host:port authority: " + target + helpText + ); + } + + if (useCacheIfPresent) { + URI_CACHE.put(target, uri); + } + return uri; + } + /** * Create a socket address with the given host and port. 
The hostname * might be replaced with another host that was set via diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestNetUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestNetUtils.java index 7628493..cfffd85 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestNetUtils.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestNetUtils.java @@ -352,7 +352,7 @@ public class TestNetUtils { assertEquals(1000, addr.getPort()); try { - addr = NetUtils.createSocketAddr( + NetUtils.createSocketAddr( "127.0.0.1:blahblah", 1000, "myconfig"); fail("Should have failed to parse bad port"); } catch (IllegalArgumentException iae) { @@ -360,6 +360,49 @@ public class TestNetUtils { } } + @Test + public void testCreateSocketAddressWithURICache() throws Throwable { + InetSocketAddress addr = NetUtils.createSocketAddr( + "127.0.0.1:12345", 1000, "myconfig", true); + assertEquals("127.0.0.1", addr.getAddress().getHostAddress()); + assertEquals(12345, addr.getPort()); + + addr = NetUtils.createSocketAddr( + "127.0.0.1:12345", 1000, "myconfig", true); + assertEquals("127.0.0.1", addr.getAddress().getHostAddress()); + assertEquals(12345, addr.getPort()); + + // ---------------------------------------------------- + + addr = NetUtils.createSocketAddr( + "127.0.0.1", 1000, "myconfig", true); + assertEquals("127.0.0.1", addr.getAddress().getHostAddress()); + assertEquals(1000, addr.getPort()); + + addr = NetUtils.createSocketAddr( + "127.0.0.1", 1000, "myconfig", true); + assertEquals("127.0.0.1", addr.getAddress().getHostAddress()); + assertEquals(1000, addr.getPort()); + + // ---------------------------------------------------- + + try { + NetUtils.createSocketAddr( + "127.0.0.1:blahblah", 1000, "myconfig", true); + fail("Should have failed to parse bad port"); + } catch (IllegalArgumentException iae) { + assertInException(iae, "myconfig"); + } + + 
try { + NetUtils.createSocketAddr( + "127.0.0.1:blahblah", 1000, "myconfig", true); + fail("Should have failed to parse bad port"); + } catch (IllegalArgumentException iae) { + assertInException(iae, "myconfig"); + } + } + private void assertRemoteDetailsIncluded(IOException wrapped) throws Throwable { assertInException(wrapped, "desthost"); diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestSecurityUtil.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestSecurityUtil.java index 016c589..2e1ca66 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestSecurityUtil.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestSecurityUtil.java @@ -370,6 +370,16 @@ public class TestSecurityUtil { verifyServiceAddr(staticHost, "255.255.255.255"); } + @Test + public void testSocketAddrWithChangeIP() { + String staticHost = "host4"; + NetUtils.addStaticResolution(staticHost, "255.255.255.255"); + verifyServiceAddr(staticHost, "255.255.255.255"); + + NetUtils.addStaticResolution(staticHost, "255.255.255.254"); + verifyServiceAddr(staticHost, "255.255.255.254"); + } + // this is a bizarre case, but it's if a test tries to remap an ip address @Test public void testSocketAddrWithIPToStaticIP() { diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java index 0676cf9..402c382 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java @@ -1086,7 +1086,9 @@ public class DFSInputStream extends FSInputStream final String dnAddr = chosenNode.getXferAddr(dfsClient.getConf().isConnectToDnViaHostname()); DFSClient.LOG.debug("Connecting to 
datanode {}", dnAddr); - InetSocketAddress targetAddr = NetUtils.createSocketAddr(dnAddr); + boolean uriCacheEnabled = dfsClient.getConf().isUriCacheEnabled(); + InetSocketAddress targetAddr = NetUtils.createSocketAddr(dnAddr, + -1, null, uriCacheEnabled); return new DNAddrPair(chosenNode, targetAddr, storageType, block); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java index 8561cab..0e4cebf 100755 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java @@ -417,6 +417,9 @@ public interface HdfsClientConfigKeys { String PREFETCH_SIZE_KEY = PREFIX + "prefetch.size"; + String URI_CACHE_KEY = PREFIX + "uri.cache.enabled"; + boolean URI_CACHE_DEFAULT = false; + interface ShortCircuit { String PREFIX = Read.PREFIX + "shortcircuit."; diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java index e41b608..07fe80c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java @@ -129,6 +129,7 @@ public class DfsClientConf { private final int blockWriteLocateFollowingMaxDelayMs; private final long defaultBlockSize; private final long prefetchSize; + private final boolean uriCacheEnabled; private final short defaultReplication; private final String taskId; private final FsPermission uMask; @@ -211,24 +212,7 @@ public class DfsClientConf { Write.MAX_PACKETS_IN_FLIGHT_KEY, 
Write.MAX_PACKETS_IN_FLIGHT_DEFAULT); - final boolean byteArrayManagerEnabled = conf.getBoolean( - Write.ByteArrayManager.ENABLED_KEY, - Write.ByteArrayManager.ENABLED_DEFAULT); - if (!byteArrayManagerEnabled) { - writeByteArrayManagerConf = null; - } else { - final int countThreshold = conf.getInt( - Write.ByteArrayManager.COUNT_THRESHOLD_KEY, - Write.ByteArrayManager.COUNT_THRESHOLD_DEFAULT); - final int countLimit = conf.getInt( - Write.ByteArrayManager.COUNT_LIMIT_KEY, - Write.ByteArrayManager.COUNT_LIMIT_DEFAULT); - final long countResetTimePeriodMs = conf.getLong( - Write.ByteArrayManager.COUNT_RESET_TIME_PERIOD_MS_KEY, - Write.ByteArrayManager.COUNT_RESET_TIME_PERIOD_MS_DEFAULT); - writeByteArrayManagerConf = new ByteArrayManager.Conf( - countThreshold, countLimit, countResetTimePeriodMs); - } + writeByteArrayManagerConf = loadWriteByteArrayManagerConf(conf); defaultBlockSize = conf.getLongBytes(DFS_BLOCK_SIZE_KEY, DFS_BLOCK_SIZE_DEFAULT); @@ -240,6 +224,10 @@ public class DfsClientConf { Write.EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT); prefetchSize = conf.getLong(Read.PREFETCH_SIZE_KEY, 10 * defaultBlockSize); + + uriCacheEnabled = conf.getBoolean(Read.URI_CACHE_KEY, + Read.URI_CACHE_DEFAULT); + numCachedConnRetry = conf.getInt(DFS_CLIENT_CACHED_CONN_RETRY_KEY, DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT); numBlockWriteRetry = conf.getInt( @@ -308,6 +296,27 @@ public class DfsClientConf { "can't be more then 5."); } + private ByteArrayManager.Conf loadWriteByteArrayManagerConf( + Configuration conf) { + final boolean byteArrayManagerEnabled = conf.getBoolean( + Write.ByteArrayManager.ENABLED_KEY, + Write.ByteArrayManager.ENABLED_DEFAULT); + if (!byteArrayManagerEnabled) { + return null; + } + final int countThreshold = conf.getInt( + Write.ByteArrayManager.COUNT_THRESHOLD_KEY, + Write.ByteArrayManager.COUNT_THRESHOLD_DEFAULT); + final int countLimit = conf.getInt( + Write.ByteArrayManager.COUNT_LIMIT_KEY, + Write.ByteArrayManager.COUNT_LIMIT_DEFAULT); + final 
long countResetTimePeriodMs = conf.getLong( + Write.ByteArrayManager.COUNT_RESET_TIME_PERIOD_MS_KEY, + Write.ByteArrayManager.COUNT_RESET_TIME_PERIOD_MS_DEFAULT); + return new ByteArrayManager.Conf( + countThreshold, countLimit, countResetTimePeriodMs); + } + @SuppressWarnings("unchecked") private List<Class<? extends ReplicaAccessorBuilder>> loadReplicaAccessorBuilderClasses(Configuration conf) { @@ -556,6 +565,13 @@ public class DfsClientConf { } /** + * @return the uriCacheEnable + */ + public boolean isUriCacheEnabled() { + return uriCacheEnabled; + } + + /** * @return the defaultReplication */ public short getDefaultReplication() { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index db152e6..4a0da7a 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -4186,6 +4186,15 @@ </property> <property> + <name>dfs.client.read.uri.cache.enabled</name> + <value>false</value> + <description> + If true, dfs client will use cache when creating URI based on host:port + to reduce the frequency of URI object creation. 
+ </description> +</property> + +<property> <name>dfs.client.read.short.circuit.replica.stale.threshold.ms</name> <value>1800000</value> <description> diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java index 63c6233..9d6e589 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java @@ -44,6 +44,7 @@ public class TestHdfsConfigFields extends TestConfigurationFieldsBase { HdfsClientConfigKeys.Failover.class, HdfsClientConfigKeys.StripedRead.class, DFSConfigKeys.class, HdfsClientConfigKeys.BlockWrite.class, + HdfsClientConfigKeys.Read.class, HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.class }; // Set error modes --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org