keith-turner commented on code in PR #4309:
URL: https://github.com/apache/accumulo/pull/4309#discussion_r1507749408


##########
core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportKey.java:
##########
@@ -81,7 +92,7 @@ public boolean equals(Object o) {
       return false;
     }
     ThriftTransportKey ttk = (ThriftTransportKey) o;
-    return server.equals(ttk.server) && timeout == ttk.timeout
+    return type.equals(ttk.type) && server.equals(ttk.server) && timeout == ttk.timeout

Review Comment:
   I looked at ThriftClientTypes and did not see that it implements equals() or
   hashCode(). The intention seems to be that there is only one instance of each
   type in the JVM, so it's ok to fall back to the identity-based Object equals()
   and hashCode(). This makes it tricky to reason about correctness, and I'm not
   sure of a way to improve it. If possible, it would be good to reduce or
   localize the number of assumptions being made around this. I thought through
   the following possible solutions, but they seemed like dead ends.
   
    * Could we make ThriftClientTypes an enum? This does not seem like it would
      work with the type parameter.
    * Could we make the ThriftClientTypes constructor private? This would make it
      easier to reason about single instances. The subclasses would probably need
      to be pulled in as inner classes to do this. Not sure it's workable even
      then, though.
    * Could we add equals() and hashCode() impls that only look at the service
      name? This feels dubious because it excludes the factory.
   
   This issue does not need to hold up this PR if the current code only creates
   a single instance per type in the JVM. I did not attempt to verify whether
   this is the case.
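
To illustrate the concern, here is a minimal sketch with simplified stand-ins. `ClientType` and `TransportKey` are hypothetical classes invented for this example, not the real ThriftClientTypes or ThriftTransportKey. It assumes the type class inherits the identity-based equals() and hashCode() from Object, as described above.

```java
import java.util.Objects;

// Hypothetical stand-in for a type class that does NOT override
// equals()/hashCode(), so Object identity is used.
class ClientType {
  private final String serviceName;

  ClientType(String serviceName) {
    this.serviceName = serviceName;
  }

  String getServiceName() {
    return serviceName;
  }
}

// Hypothetical stand-in for a key whose equality includes the type.
final class TransportKey {
  private final ClientType type;
  private final String server;
  private final long timeout;

  TransportKey(ClientType type, String server, long timeout) {
    this.type = type;
    this.server = server;
    this.timeout = timeout;
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof TransportKey)) {
      return false;
    }
    TransportKey other = (TransportKey) o;
    // type.equals() is Object.equals() here, i.e. a reference comparison
    return type.equals(other.type) && server.equals(other.server) && timeout == other.timeout;
  }

  @Override
  public int hashCode() {
    // type.hashCode() is the identity hash code here
    return Objects.hash(type, server, timeout);
  }
}

class IdentityEqualityDemo {
  // Single shared instance, mirroring the "one instance per JVM" assumption
  static final ClientType CLIENT = new ClientType("client");

  public static void main(String[] args) {
    TransportKey a = new TransportKey(CLIENT, "localhost:9999", 120_000L);
    TransportKey b = new TransportKey(CLIENT, "localhost:9999", 120_000L);
    TransportKey c = new TransportKey(new ClientType("client"), "localhost:9999", 120_000L);
    System.out.println(a.equals(b)); // same type instance
    System.out.println(a.equals(c)); // second instance with the same service name
  }
}
```

In this sketch the keys only compare equal while every caller shares the same type instance. A second instance with the same service name yields a key that is not equal, which is the assumption that is hard to verify from the key class alone.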



##########
core/src/main/java/org/apache/accumulo/core/rpc/clients/TServerClient.java:
##########
@@ -57,44 +59,56 @@ default Pair<String,C> getTabletServerConnection(Logger LOG, ThriftClientTypes<C
       ClientContext context, boolean preferCachedConnections, AtomicBoolean warned)
       throws TTransportException {
     checkArgument(context != null, "context is null");
-    long rpcTimeout = context.getClientTimeoutInMillis();
-    // create list of servers
-    ArrayList<ThriftTransportKey> servers = new ArrayList<>();
 
-    // add tservers
-    ZooCache zc = context.getZooCache();
-    for (String tserver : zc.getChildren(context.getZooKeeperRoot() + Constants.ZTSERVERS)) {
+    if (preferCachedConnections) {
+      Pair<String,TTransport> cachedTransport =
+          context.getTransportPool().getAnyCachedTransport(type);
+      if (cachedTransport != null) {
+        C client = ThriftUtil.createClient(type, cachedTransport.getSecond());
+        warned.set(false);
+        return new Pair<String,C>(cachedTransport.getFirst(), client);
+      }
+    }
+
+    final long rpcTimeout = context.getClientTimeoutInMillis();
+    final ZooCache zc = context.getZooCache();
+    final List<String> tservers = new ArrayList<>();
+
+    tservers.addAll(zc.getChildren(context.getZooKeeperRoot() + Constants.ZTSERVERS));
+
+    if (tservers.isEmpty()) {
+      if (warned.compareAndSet(false, true)) {
+        LOG.warn("There are no tablet servers: check that zookeeper and accumulo are running.");
+      }
+      throw new TTransportException("There are no servers for type: " + type);
+    }
+
+    // Try to connect to an online tserver
+    Collections.shuffle(tservers);
+    for (String tserver : tservers) {
       var zLocPath =
           ServiceLock.path(context.getZooKeeperRoot() + Constants.ZTSERVERS + "/" + tserver);
       byte[] data = zc.getLockData(zLocPath);
       if (data != null) {
         String strData = new String(data, UTF_8);
         if (!strData.equals("manager")) {
-          servers.add(new ThriftTransportKey(
-              new ServerServices(strData).getAddress(Service.TSERV_CLIENT), rpcTimeout, context));
-        }
-      }
-    }
-
-    boolean opened = false;
-    try {
-      Pair<String,TTransport> pair =
-          context.getTransportPool().getAnyTransport(servers, preferCachedConnections);
-      C client = ThriftUtil.createClient(type, pair.getSecond());
-      opened = true;
-      warned.set(false);
-      return new Pair<>(pair.getFirst(), client);
-    } finally {
-      if (!opened) {
-        if (warned.compareAndSet(false, true)) {
-          if (servers.isEmpty()) {
-            LOG.warn("There are no tablet servers: check that zookeeper and accumulo are running.");
-          } else {
-            LOG.warn("Failed to find an available server in the list of servers: {}", servers);
+          final HostAndPort tserverClientAddress =
+              new ServerServices(strData).getAddress(Service.TSERV_CLIENT);
+          try {
+            TTransport transport = context.getTransportPool().getTransport(type,
+                tserverClientAddress, rpcTimeout, context, preferCachedConnections);
+            C client = ThriftUtil.createClient(type, transport);
+            warned.set(false);
+            return new Pair<String,C>(tserverClientAddress.toString(), client);
+          } catch (TTransportException e) {
+            LOG.trace("Error creating transport to {}", tserverClientAddress);
+            continue;
           }
         }
       }
     }
+    LOG.warn("Failed to find an available server in the list of servers: {}", tservers);

Review Comment:
   Maybe this should use the `warned` AtomicBoolean, as the other warning in
   this method does. It could also include the type in the log message.
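
A rough sketch of what this could look like, assuming the same compareAndSet pattern used for the "no tablet servers" warning. The `WarnOnce` class, the `warnNoServer` helper, the `Consumer<String>` standing in for the logger, and the example type name are all hypothetical and only for illustration.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

class WarnOnce {

  // Emits the warning only when the flag flips from false to true, so repeated
  // failures do not flood the log. Returns true if the warning was emitted.
  static boolean warnNoServer(AtomicBoolean warned, Consumer<String> log, String type,
      List<String> servers) {
    if (warned.compareAndSet(false, true)) {
      log.accept("Failed to find an available server of type " + type
          + " in the list of servers: " + servers);
      return true;
    }
    return false;
  }

  public static void main(String[] args) {
    AtomicBoolean warned = new AtomicBoolean(false);
    List<String> servers = List.of("host1:9997", "host2:9997");

    warnNoServer(warned, System.out::println, "TABLET_SERVER", servers); // logged
    warnNoServer(warned, System.out::println, "TABLET_SERVER", servers); // suppressed
    warned.set(false); // a successful connection resets the flag
    warnNoServer(warned, System.out::println, "TABLET_SERVER", servers); // logged again
  }
}
```

The flag is reset on a successful connection, as the existing `warned.set(false)` calls do, so the warning can show up again after the next failure.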



##########
core/src/main/java/org/apache/accumulo/core/rpc/clients/TServerClient.java:
##########
@@ -57,44 +59,56 @@ default Pair<String,C> getTabletServerConnection(Logger LOG, ThriftClientTypes<C
       ClientContext context, boolean preferCachedConnections, AtomicBoolean warned)
       throws TTransportException {
     checkArgument(context != null, "context is null");
-    long rpcTimeout = context.getClientTimeoutInMillis();
-    // create list of servers
-    ArrayList<ThriftTransportKey> servers = new ArrayList<>();
 
-    // add tservers
-    ZooCache zc = context.getZooCache();
-    for (String tserver : zc.getChildren(context.getZooKeeperRoot() + Constants.ZTSERVERS)) {
+    if (preferCachedConnections) {
+      Pair<String,TTransport> cachedTransport =
+          context.getTransportPool().getAnyCachedTransport(type);
+      if (cachedTransport != null) {
+        C client = ThriftUtil.createClient(type, cachedTransport.getSecond());
+        warned.set(false);
+        return new Pair<String,C>(cachedTransport.getFirst(), client);
+      }
+    }
+
+    final long rpcTimeout = context.getClientTimeoutInMillis();
+    final ZooCache zc = context.getZooCache();
+    final List<String> tservers = new ArrayList<>();
+
+    tservers.addAll(zc.getChildren(context.getZooKeeperRoot() + Constants.ZTSERVERS));
+
+    if (tservers.isEmpty()) {
+      if (warned.compareAndSet(false, true)) {
+        LOG.warn("There are no tablet servers: check that zookeeper and accumulo are running.");

Review Comment:
   Is this code used for more than tservers now?  Should the log message 
replace `tablet servers` with the type?



##########
core/src/main/java/org/apache/accumulo/core/rpc/clients/TServerClient.java:
##########
@@ -57,44 +59,56 @@ default Pair<String,C> getTabletServerConnection(Logger LOG, ThriftClientTypes<C
       ClientContext context, boolean preferCachedConnections, AtomicBoolean warned)
       throws TTransportException {
     checkArgument(context != null, "context is null");
-    long rpcTimeout = context.getClientTimeoutInMillis();
-    // create list of servers
-    ArrayList<ThriftTransportKey> servers = new ArrayList<>();
 
-    // add tservers
-    ZooCache zc = context.getZooCache();
-    for (String tserver : zc.getChildren(context.getZooKeeperRoot() + Constants.ZTSERVERS)) {
+    if (preferCachedConnections) {
+      Pair<String,TTransport> cachedTransport =
+          context.getTransportPool().getAnyCachedTransport(type);
+      if (cachedTransport != null) {
+        C client = ThriftUtil.createClient(type, cachedTransport.getSecond());
+        warned.set(false);
+        return new Pair<String,C>(cachedTransport.getFirst(), client);
+      }
+    }
+
+    final long rpcTimeout = context.getClientTimeoutInMillis();
+    final ZooCache zc = context.getZooCache();
+    final List<String> tservers = new ArrayList<>();
+
+    tservers.addAll(zc.getChildren(context.getZooKeeperRoot() + Constants.ZTSERVERS));
+
+    if (tservers.isEmpty()) {
+      if (warned.compareAndSet(false, true)) {
+        LOG.warn("There are no tablet servers: check that zookeeper and accumulo are running.");
+      }
+      throw new TTransportException("There are no servers for type: " + type);
+    }
+
+    // Try to connect to an online tserver
+    Collections.shuffle(tservers);
+    for (String tserver : tservers) {
       var zLocPath =
           ServiceLock.path(context.getZooKeeperRoot() + Constants.ZTSERVERS + "/" + tserver);
       byte[] data = zc.getLockData(zLocPath);
       if (data != null) {
         String strData = new String(data, UTF_8);
         if (!strData.equals("manager")) {
-          servers.add(new ThriftTransportKey(
-              new ServerServices(strData).getAddress(Service.TSERV_CLIENT), rpcTimeout, context));
-        }
-      }
-    }
-
-    boolean opened = false;
-    try {
-      Pair<String,TTransport> pair =
-          context.getTransportPool().getAnyTransport(servers, preferCachedConnections);
-      C client = ThriftUtil.createClient(type, pair.getSecond());
-      opened = true;
-      warned.set(false);
-      return new Pair<>(pair.getFirst(), client);
-    } finally {
-      if (!opened) {
-        if (warned.compareAndSet(false, true)) {
-          if (servers.isEmpty()) {
-            LOG.warn("There are no tablet servers: check that zookeeper and accumulo are running.");
-          } else {
-            LOG.warn("Failed to find an available server in the list of servers: {}", servers);
+          final HostAndPort tserverClientAddress =
+              new ServerServices(strData).getAddress(Service.TSERV_CLIENT);
+          try {
+            TTransport transport = context.getTransportPool().getTransport(type,
+                tserverClientAddress, rpcTimeout, context, preferCachedConnections);
+            C client = ThriftUtil.createClient(type, transport);
+            warned.set(false);
+            return new Pair<String,C>(tserverClientAddress.toString(), client);
+          } catch (TTransportException e) {
+            LOG.trace("Error creating transport to {}", tserverClientAddress);
+            continue;
           }
         }
       }
     }
+    LOG.warn("Failed to find an available server in the list of servers: {}", tservers);
+    throw new TTransportException("Failed to connect to any server");

Review Comment:
   Could include the type in the exception message.



##########
core/src/test/java/org/apache/accumulo/core/clientImpl/ThriftTransportKeyTest.java:
##########
@@ -136,8 +138,8 @@ public void testSimpleEquivalence() {
 
     replay(clientCtx);
 
-    ThriftTransportKey ttk =
-        new ThriftTransportKey(HostAndPort.fromParts("localhost", 9999), 120_000, clientCtx);
+    ThriftTransportKey ttk = new ThriftTransportKey(ThriftClientTypes.CLIENT,
+        HostAndPort.fromParts("localhost", 9999), 120_000, clientCtx);
 
     assertEquals(ttk, ttk, "Normal ThriftTransportKey doesn't equal itself");

Review Comment:
   This is a preexisting problem with this test; it would be nice to make it
   follow the pattern in the other existing test.
   
   ```suggestion
       assertEquals(ttk, ttk, "Normal ThriftTransportKey doesn't equal itself");
       assertEquals(ttk.hashCode(), ttk.hashCode());
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.