This is an automated email from the ASF dual-hosted git repository.

liuml07 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 56ebabd  HADOOP-17222. Create socket address leveraging URI cache (#2241)
56ebabd is described below

commit 56ebabd426757dd95c778535548abb8c01fbc1fb
Author: 1996fanrui <1996fan...@gmail.com>
AuthorDate: Fri Sep 11 13:30:52 2020 +0800

    HADOOP-17222. Create socket address leveraging URI cache (#2241)
    
    Contributed by fanrui.
    
    Signed-off-by: Mingliang Liu <lium...@apache.org>
    Signed-off-by: He Xiaoqiao <hexiaoq...@apache.org>
---
 .../main/java/org/apache/hadoop/net/NetUtils.java  | 78 ++++++++++++++++++----
 .../java/org/apache/hadoop/net/TestNetUtils.java   | 45 ++++++++++++-
 .../apache/hadoop/security/TestSecurityUtil.java   | 10 +++
 .../org/apache/hadoop/hdfs/DFSInputStream.java     |  4 +-
 .../hadoop/hdfs/client/HdfsClientConfigKeys.java   |  3 +
 .../hadoop/hdfs/client/impl/DfsClientConf.java     | 52 ++++++++++-----
 .../src/main/resources/hdfs-default.xml            |  9 +++
 .../apache/hadoop/tools/TestHdfsConfigFields.java  |  1 +
 8 files changed, 169 insertions(+), 33 deletions(-)
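
For readers skimming the diff: the patch adds an overload of
NetUtils.createSocketAddr that can serve URI parsing results from a
small Guava cache. A minimal sketch of calling the new overload
(values echo the new TestNetUtils case below; the config-name argument
is only used in error messages):

    import java.net.InetSocketAddress;
    import org.apache.hadoop.net.NetUtils;

    // The fourth argument opts in to the URI cache, so repeated calls
    // with the same "host:port" string skip re-parsing the URI.
    InetSocketAddress addr = NetUtils.createSocketAddr(
        "127.0.0.1:12345", 1000, "myconfig", true);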

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetUtils.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetUtils.java
index c5a5b11..004fa1c 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetUtils.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetUtils.java
@@ -39,12 +39,16 @@ import java.net.ConnectException;
 import java.nio.channels.SocketChannel;
 import java.nio.channels.UnresolvedAddressException;
 import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 
 import javax.net.SocketFactory;
 
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+
 import org.apache.commons.net.util.SubnetUtils;
 import org.apache.commons.net.util.SubnetUtils.SubnetInfo;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -177,11 +181,33 @@ public class NetUtils {
    *                    include a port number
    * @param configName the name of the configuration from which
    *                   <code>target</code> was loaded. This is used in the
-   *                   exception message in the case that parsing fails. 
+   *                   exception message in the case that parsing fails.
    */
   public static InetSocketAddress createSocketAddr(String target,
                                                    int defaultPort,
                                                    String configName) {
+    return createSocketAddr(target, defaultPort, configName, false);
+  }
+
+  /**
+   * Create an InetSocketAddress from the given target string and
+   * default port. If the string cannot be parsed correctly, the
+   * <code>configName</code> parameter is used as part of the
+   * exception message, allowing the user to better diagnose
+   * the misconfiguration.
+   *
+   * @param target a string of either "host" or "host:port"
+   * @param defaultPort the default port if <code>target</code> does not
+   *                    include a port number
+   * @param configName the name of the configuration from which
+   *                   <code>target</code> was loaded. This is used in the
+   *                   exception message in the case that parsing fails.
+   * @param useCacheIfPresent whether to use a cached URI, if one is present,
+   *                          instead of creating a new URI
+   */
+  public static InetSocketAddress createSocketAddr(String target,
+                                                   int defaultPort,
+                                                   String configName,
+                                                   boolean useCacheIfPresent) {
     String helpText = "";
     if (configName != null) {
       helpText = " (configuration property '" + configName + "')";
@@ -191,15 +217,8 @@ public class NetUtils {
           helpText);
     }
     target = target.trim();
-    boolean hasScheme = target.contains("://");    
-    URI uri = null;
-    try {
-      uri = hasScheme ? URI.create(target) : URI.create("dummyscheme://"+target);
-    } catch (IllegalArgumentException e) {
-      throw new IllegalArgumentException(
-          "Does not contain a valid host:port authority: " + target + helpText
-      );
-    }
+    boolean hasScheme = target.contains("://");
+    URI uri = createURI(target, hasScheme, helpText, useCacheIfPresent);
 
     String host = uri.getHost();
     int port = uri.getPort();
@@ -207,10 +226,9 @@ public class NetUtils {
       port = defaultPort;
     }
     String path = uri.getPath();
-    
+
     if ((host == null) || (port < 0) ||
-        (!hasScheme && path != null && !path.isEmpty()))
-    {
+        (!hasScheme && path != null && !path.isEmpty())) {
       throw new IllegalArgumentException(
           "Does not contain a valid host:port authority: " + target + helpText
       );
@@ -218,6 +236,40 @@ public class NetUtils {
     return createSocketAddrForHost(host, port);
   }
 
+  private static final long URI_CACHE_SIZE_DEFAULT = 1000;
+  private static final long URI_CACHE_EXPIRE_TIME_DEFAULT = 12;
+  private static final Cache<String, URI> URI_CACHE = CacheBuilder.newBuilder()
+      .maximumSize(URI_CACHE_SIZE_DEFAULT)
+      .expireAfterWrite(URI_CACHE_EXPIRE_TIME_DEFAULT, TimeUnit.HOURS)
+      .build();
+
+  private static URI createURI(String target,
+                               boolean hasScheme,
+                               String helpText,
+                               boolean useCacheIfPresent) {
+    URI uri;
+    if (useCacheIfPresent) {
+      uri = URI_CACHE.getIfPresent(target);
+      if (uri != null) {
+        return uri;
+      }
+    }
+
+    try {
+      uri = hasScheme ? URI.create(target) :
+              URI.create("dummyscheme://" + target);
+    } catch (IllegalArgumentException e) {
+      throw new IllegalArgumentException(
+          "Does not contain a valid host:port authority: " + target + helpText
+      );
+    }
+
+    if (useCacheIfPresent) {
+      URI_CACHE.put(target, uri);
+    }
+    return uri;
+  }
+
   /**
    * Create a socket address with the given host and port.  The hostname
    * might be replaced with another host that was set via
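
The caching above is plain Guava: a bounded cache with write-expiry,
consulted before parsing and populated after a miss. A self-contained
sketch of the same pattern (the bounds and dummy scheme mirror the
patch; the class name here is invented):

    import java.net.URI;
    import java.util.concurrent.TimeUnit;
    import com.google.common.cache.Cache;
    import com.google.common.cache.CacheBuilder;

    public class UriCacheSketch {
      // Same bounds as the patch: at most 1000 entries, each expiring
      // 12 hours after it was written.
      private static final Cache<String, URI> URI_CACHE =
          CacheBuilder.newBuilder()
              .maximumSize(1000)
              .expireAfterWrite(12, TimeUnit.HOURS)
              .build();

      static URI parseWithCache(String target) {
        URI uri = URI_CACHE.getIfPresent(target); // cache hit: no parse
        if (uri == null) {
          // cache miss: parse once, then remember the result
          uri = URI.create("dummyscheme://" + target);
          URI_CACHE.put(target, uri);
        }
        return uri;
      }
    }
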
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestNetUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestNetUtils.java
index 7628493..cfffd85 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestNetUtils.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestNetUtils.java
@@ -352,7 +352,7 @@ public class TestNetUtils {
     assertEquals(1000, addr.getPort());
 
     try {
-      addr = NetUtils.createSocketAddr(
+      NetUtils.createSocketAddr(
           "127.0.0.1:blahblah", 1000, "myconfig");
       fail("Should have failed to parse bad port");
     } catch (IllegalArgumentException iae) {
@@ -360,6 +360,49 @@ public class TestNetUtils {
     }
   }
 
+  @Test
+  public void testCreateSocketAddressWithURICache() throws Throwable {
+    InetSocketAddress addr = NetUtils.createSocketAddr(
+        "127.0.0.1:12345", 1000, "myconfig", true);
+    assertEquals("127.0.0.1", addr.getAddress().getHostAddress());
+    assertEquals(12345, addr.getPort());
+
+    addr = NetUtils.createSocketAddr(
+        "127.0.0.1:12345", 1000, "myconfig", true);
+    assertEquals("127.0.0.1", addr.getAddress().getHostAddress());
+    assertEquals(12345, addr.getPort());
+
+    // ----------------------------------------------------
+
+    addr = NetUtils.createSocketAddr(
+        "127.0.0.1", 1000, "myconfig", true);
+    assertEquals("127.0.0.1", addr.getAddress().getHostAddress());
+    assertEquals(1000, addr.getPort());
+
+    addr = NetUtils.createSocketAddr(
+        "127.0.0.1", 1000, "myconfig", true);
+    assertEquals("127.0.0.1", addr.getAddress().getHostAddress());
+    assertEquals(1000, addr.getPort());
+
+    // ----------------------------------------------------
+
+    try {
+      NetUtils.createSocketAddr(
+          "127.0.0.1:blahblah", 1000, "myconfig", true);
+      fail("Should have failed to parse bad port");
+    } catch (IllegalArgumentException iae) {
+      assertInException(iae, "myconfig");
+    }
+
+    try {
+      NetUtils.createSocketAddr(
+          "127.0.0.1:blahblah", 1000, "myconfig", true);
+      fail("Should have failed to parse bad port");
+    } catch (IllegalArgumentException iae) {
+      assertInException(iae, "myconfig");
+    }
+  }
+
   private void assertRemoteDetailsIncluded(IOException wrapped)
       throws Throwable {
     assertInException(wrapped, "desthost");
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestSecurityUtil.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestSecurityUtil.java
index 016c589..2e1ca66 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestSecurityUtil.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestSecurityUtil.java
@@ -370,6 +370,16 @@ public class TestSecurityUtil {
     verifyServiceAddr(staticHost, "255.255.255.255");
   }
 
+  @Test
+  public void testSocketAddrWithChangeIP() {
+    String staticHost = "host4";
+    NetUtils.addStaticResolution(staticHost, "255.255.255.255");
+    verifyServiceAddr(staticHost, "255.255.255.255");
+
+    NetUtils.addStaticResolution(staticHost, "255.255.255.254");
+    verifyServiceAddr(staticHost, "255.255.255.254");
+  }
+
   // this is a bizarre case, but it's if a test tries to remap an ip address
   @Test
   public void testSocketAddrWithIPToStaticIP() {
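
The new TestSecurityUtil case, added alongside the cache, pins down a
useful property: the cache stores unresolved URIs keyed by the target
string, and host resolution happens afterwards, so a host whose IP
changes is still re-resolved. A short sketch of the mechanism the test
relies on (addStaticResolution is an existing NetUtils test hook; a
later mapping for the same host overrides the earlier one):

    // First resolution of host4.
    NetUtils.addStaticResolution("host4", "255.255.255.255");
    // Remap host4; later socket addresses should reflect the new IP,
    // since only the URI string->URI mapping is ever cached.
    NetUtils.addStaticResolution("host4", "255.255.255.254");
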
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index 0676cf9..402c382 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -1086,7 +1086,9 @@ public class DFSInputStream extends FSInputStream
     final String dnAddr =
         chosenNode.getXferAddr(dfsClient.getConf().isConnectToDnViaHostname());
     DFSClient.LOG.debug("Connecting to datanode {}", dnAddr);
-    InetSocketAddress targetAddr = NetUtils.createSocketAddr(dnAddr);
+    boolean uriCacheEnabled = dfsClient.getConf().isUriCacheEnabled();
+    InetSocketAddress targetAddr = NetUtils.createSocketAddr(dnAddr,
+        -1, null, uriCacheEnabled);
     return new DNAddrPair(chosenNode, targetAddr, storageType, block);
   }
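
On the HDFS client side the cache is opt-in and read once into
DfsClientConf. A hedged sketch of enabling it programmatically (the
key string matches the one this patch adds to hdfs-default.xml; the
surrounding client setup is omitted):

    import org.apache.hadoop.conf.Configuration;

    Configuration conf = new Configuration();
    // Key added by this patch; defaults to false.
    conf.setBoolean("dfs.client.read.uri.cache.enabled", true);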
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
index 8561cab..0e4cebf 100755
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
@@ -417,6 +417,9 @@ public interface HdfsClientConfigKeys {
 
     String  PREFETCH_SIZE_KEY = PREFIX + "prefetch.size";
 
+    String URI_CACHE_KEY = PREFIX + "uri.cache.enabled";
+    boolean URI_CACHE_DEFAULT = false;
+
     interface ShortCircuit {
       String PREFIX = Read.PREFIX + "shortcircuit.";
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
index e41b608..07fe80c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
@@ -129,6 +129,7 @@ public class DfsClientConf {
   private final int blockWriteLocateFollowingMaxDelayMs;
   private final long defaultBlockSize;
   private final long prefetchSize;
+  private final boolean uriCacheEnabled;
   private final short defaultReplication;
   private final String taskId;
   private final FsPermission uMask;
@@ -211,24 +212,7 @@ public class DfsClientConf {
         Write.MAX_PACKETS_IN_FLIGHT_KEY,
         Write.MAX_PACKETS_IN_FLIGHT_DEFAULT);
 
-    final boolean byteArrayManagerEnabled = conf.getBoolean(
-        Write.ByteArrayManager.ENABLED_KEY,
-        Write.ByteArrayManager.ENABLED_DEFAULT);
-    if (!byteArrayManagerEnabled) {
-      writeByteArrayManagerConf = null;
-    } else {
-      final int countThreshold = conf.getInt(
-          Write.ByteArrayManager.COUNT_THRESHOLD_KEY,
-          Write.ByteArrayManager.COUNT_THRESHOLD_DEFAULT);
-      final int countLimit = conf.getInt(
-          Write.ByteArrayManager.COUNT_LIMIT_KEY,
-          Write.ByteArrayManager.COUNT_LIMIT_DEFAULT);
-      final long countResetTimePeriodMs = conf.getLong(
-          Write.ByteArrayManager.COUNT_RESET_TIME_PERIOD_MS_KEY,
-          Write.ByteArrayManager.COUNT_RESET_TIME_PERIOD_MS_DEFAULT);
-      writeByteArrayManagerConf = new ByteArrayManager.Conf(
-          countThreshold, countLimit, countResetTimePeriodMs);
-    }
+    writeByteArrayManagerConf = loadWriteByteArrayManagerConf(conf);
 
     defaultBlockSize = conf.getLongBytes(DFS_BLOCK_SIZE_KEY,
         DFS_BLOCK_SIZE_DEFAULT);
@@ -240,6 +224,10 @@ public class DfsClientConf {
         Write.EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT);
     prefetchSize = conf.getLong(Read.PREFETCH_SIZE_KEY,
         10 * defaultBlockSize);
+
+    uriCacheEnabled = conf.getBoolean(Read.URI_CACHE_KEY,
+        Read.URI_CACHE_DEFAULT);
+
     numCachedConnRetry = conf.getInt(DFS_CLIENT_CACHED_CONN_RETRY_KEY,
         DFS_CLIENT_CACHED_CONN_RETRY_DEFAULT);
     numBlockWriteRetry = conf.getInt(
@@ -308,6 +296,27 @@ public class DfsClientConf {
                     "can't be more then 5.");
   }
 
+  private ByteArrayManager.Conf loadWriteByteArrayManagerConf(
+      Configuration conf) {
+    final boolean byteArrayManagerEnabled = conf.getBoolean(
+        Write.ByteArrayManager.ENABLED_KEY,
+        Write.ByteArrayManager.ENABLED_DEFAULT);
+    if (!byteArrayManagerEnabled) {
+      return null;
+    }
+    final int countThreshold = conf.getInt(
+        Write.ByteArrayManager.COUNT_THRESHOLD_KEY,
+        Write.ByteArrayManager.COUNT_THRESHOLD_DEFAULT);
+    final int countLimit = conf.getInt(
+        Write.ByteArrayManager.COUNT_LIMIT_KEY,
+        Write.ByteArrayManager.COUNT_LIMIT_DEFAULT);
+    final long countResetTimePeriodMs = conf.getLong(
+        Write.ByteArrayManager.COUNT_RESET_TIME_PERIOD_MS_KEY,
+        Write.ByteArrayManager.COUNT_RESET_TIME_PERIOD_MS_DEFAULT);
+    return new ByteArrayManager.Conf(
+        countThreshold, countLimit, countResetTimePeriodMs);
+  }
+
   @SuppressWarnings("unchecked")
   private List<Class<? extends ReplicaAccessorBuilder>>
       loadReplicaAccessorBuilderClasses(Configuration conf) {
@@ -556,6 +565,13 @@ public class DfsClientConf {
   }
 
   /**
+   * @return the uriCacheEnabled
+   */
+  public boolean isUriCacheEnabled() {
+    return uriCacheEnabled;
+  }
+
+  /**
    * @return the defaultReplication
    */
   public short getDefaultReplication() {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index db152e6..4a0da7a 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -4186,6 +4186,15 @@
 </property>
 
 <property>
+  <name>dfs.client.read.uri.cache.enabled</name>
+  <value>false</value>
+  <description>
+    If true, the DFS client will use a cache when creating URIs from
+    host:port strings, to reduce the frequency of URI object creation.
+  </description>
+</property>
+
+<property>
   <name>dfs.client.read.short.circuit.replica.stale.threshold.ms</name>
   <value>1800000</value>
   <description>
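
Operators would enable the same switch in hdfs-site.xml rather than
editing hdfs-default.xml; a minimal stanza (only the value differs
from the shipped default):

    <property>
      <name>dfs.client.read.uri.cache.enabled</name>
      <value>true</value>
    </property>
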
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java
index 63c6233..9d6e589 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java
@@ -44,6 +44,7 @@ public class TestHdfsConfigFields extends TestConfigurationFieldsBase {
         HdfsClientConfigKeys.Failover.class,
         HdfsClientConfigKeys.StripedRead.class, DFSConfigKeys.class,
         HdfsClientConfigKeys.BlockWrite.class,
+        HdfsClientConfigKeys.Read.class,
         HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.class };
 
     // Set error modes

