keith-turner commented on code in PR #4309:
URL: https://github.com/apache/accumulo/pull/4309#discussion_r1507749408
##########
core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportKey.java:
##########
@@ -81,7 +92,7 @@ public boolean equals(Object o) {
return false;
}
ThriftTransportKey ttk = (ThriftTransportKey) o;
- return server.equals(ttk.server) && timeout == ttk.timeout
+ return type.equals(ttk.type) && server.equals(ttk.server) && timeout == ttk.timeout
Review Comment:
Looked at ThriftClientTypes and did not see that it implements equals() or hashCode(). Seems like the intention is that there is only one instance of each type in the JVM, so it's OK to fall back to the identity-based Object hashCode(). This makes it tricky to reason about correctness, and I'm not sure of a way to improve this. If possible, it would be good to reduce or localize the number of assumptions being made around this. Thought through the following possible solutions, but they seemed like dead ends.
* Could we make ThriftClientTypes an enum? This does not seem like it would work with the type parameter.
* Could we make the ThriftClientTypes constructor private? This would make it easier to reason about single instances. The subclasses would probably need to be pulled in as inner classes to do this. Not sure if it's workable even then, though.
* Could we add equals() and hashCode() implementations that only look at the service name? This feels dubious because it excludes the factory.

This issue does not need to hold up this PR if the current code does only create a single instance per type in the JVM. Did not attempt to verify whether this is the case.
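To make the concern concrete, here is a minimal sketch using hypothetical stand-ins: `ClientType` (playing the role of ThriftClientTypes, with no equals()/hashCode() override) and `TransportKey` (playing the role of ThriftTransportKey, with the type included in equals()/hashCode() as in this PR). Neither is the real Accumulo class. It shows that a pool lookup only hits when the exact same type instance is used; a second, equivalent instance silently misses.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class TransportKeyIdentityDemo {

  // Hypothetical stand-in for ThriftClientTypes: it does not override
  // equals()/hashCode(), so both fall back to object identity.
  static final class ClientType {
    final String serviceName;

    ClientType(String serviceName) {
      this.serviceName = serviceName;
    }
  }

  // Hypothetical stand-in for ThriftTransportKey whose equals()/hashCode()
  // include the type, mirroring the change under review.
  static final class TransportKey {
    final ClientType type;
    final String server;
    final long timeout;

    TransportKey(ClientType type, String server, long timeout) {
      this.type = type;
      this.server = server;
      this.timeout = timeout;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof TransportKey)) {
        return false;
      }
      TransportKey k = (TransportKey) o;
      return type.equals(k.type) && server.equals(k.server) && timeout == k.timeout;
    }

    @Override
    public int hashCode() {
      return Objects.hash(type, server, timeout);
    }
  }

  public static void main(String[] args) {
    ClientType client = new ClientType("ClientService");
    // A second instance describing the same service.
    ClientType clientCopy = new ClientType("ClientService");

    Map<TransportKey, String> pool = new HashMap<>();
    pool.put(new TransportKey(client, "localhost:9999", 120_000), "cached transport");

    // Same ClientType instance: the lookup finds the cached entry.
    System.out.println(pool.get(new TransportKey(client, "localhost:9999", 120_000))); // prints "cached transport"
    // Equivalent but distinct instance: identity equality makes the lookup miss.
    System.out.println(pool.get(new TransportKey(clientCopy, "localhost:9999", 120_000))); // prints "null"
  }
}
```

If only one instance per type ever exists, only the first lookup pattern can occur; the sketch just illustrates what would break if that assumption were violated.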
##########
core/src/main/java/org/apache/accumulo/core/rpc/clients/TServerClient.java:
##########
@@ -57,44 +59,56 @@ default Pair<String,C> getTabletServerConnection(Logger LOG, ThriftClientTypes<C
ClientContext context, boolean preferCachedConnections, AtomicBoolean warned)
throws TTransportException {
checkArgument(context != null, "context is null");
- long rpcTimeout = context.getClientTimeoutInMillis();
- // create list of servers
- ArrayList<ThriftTransportKey> servers = new ArrayList<>();
- // add tservers
- ZooCache zc = context.getZooCache();
- for (String tserver : zc.getChildren(context.getZooKeeperRoot() + Constants.ZTSERVERS)) {
+ if (preferCachedConnections) {
+ Pair<String,TTransport> cachedTransport =
+ context.getTransportPool().getAnyCachedTransport(type);
+ if (cachedTransport != null) {
+ C client = ThriftUtil.createClient(type, cachedTransport.getSecond());
+ warned.set(false);
+ return new Pair<String,C>(cachedTransport.getFirst(), client);
+ }
+ }
+
+ final long rpcTimeout = context.getClientTimeoutInMillis();
+ final ZooCache zc = context.getZooCache();
+ final List<String> tservers = new ArrayList<>();
+
+ tservers.addAll(zc.getChildren(context.getZooKeeperRoot() + Constants.ZTSERVERS));
+
+ if (tservers.isEmpty()) {
+ if (warned.compareAndSet(false, true)) {
+ LOG.warn("There are no tablet servers: check that zookeeper and accumulo are running.");
+ }
+ throw new TTransportException("There are no servers for type: " + type);
+ }
+
+ // Try to connect to an online tserver
+ Collections.shuffle(tservers);
+ for (String tserver : tservers) {
var zLocPath =
ServiceLock.path(context.getZooKeeperRoot() + Constants.ZTSERVERS + "/" + tserver);
byte[] data = zc.getLockData(zLocPath);
if (data != null) {
String strData = new String(data, UTF_8);
if (!strData.equals("manager")) {
- servers.add(new ThriftTransportKey(
- new ServerServices(strData).getAddress(Service.TSERV_CLIENT), rpcTimeout, context));
- }
- }
- }
-
- boolean opened = false;
- try {
- Pair<String,TTransport> pair =
- context.getTransportPool().getAnyTransport(servers, preferCachedConnections);
- C client = ThriftUtil.createClient(type, pair.getSecond());
- opened = true;
- warned.set(false);
- return new Pair<>(pair.getFirst(), client);
- } finally {
- if (!opened) {
- if (warned.compareAndSet(false, true)) {
- if (servers.isEmpty()) {
- LOG.warn("There are no tablet servers: check that zookeeper and accumulo are running.");
- } else {
- LOG.warn("Failed to find an available server in the list of servers: {}", servers);
+ final HostAndPort tserverClientAddress =
+ new ServerServices(strData).getAddress(Service.TSERV_CLIENT);
+ try {
+ TTransport transport = context.getTransportPool().getTransport(type,
+ tserverClientAddress, rpcTimeout, context, preferCachedConnections);
+ C client = ThriftUtil.createClient(type, transport);
+ warned.set(false);
+ return new Pair<String,C>(tserverClientAddress.toString(), client);
+ } catch (TTransportException e) {
+ LOG.trace("Error creating transport to {}", tserverClientAddress);
+ continue;
}
}
}
}
+ LOG.warn("Failed to find an available server in the list of servers: {}", tservers);
Review Comment:
Maybe this should use the `warned` AtomicBoolean so this warning is only logged once, like the "no tablet servers" warning above. Could also include the type in the log message.
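A minimal sketch of what that could look like, assuming a hypothetical `Connector` interface in place of the transport pool, a `warnings` list in place of `LOG.warn`, and `IllegalStateException` in place of `TTransportException` (so it runs without Thrift). It keeps the PR's shuffle-and-try-each loop, logs the failure once per run of failures via `compareAndSet`, re-arms the flag on success, and names the type in both the warning and the exception.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

public class WarnOnceConnectDemo {

  // Hypothetical connection attempt: returns a client, or null if the server is unreachable.
  interface Connector {
    String connect(String server);
  }

  // Collected warnings stand in for LOG.warn so the behavior can be checked.
  static final List<String> warnings = new ArrayList<>();

  static String connectToAny(String type, List<String> servers, Connector connector,
      AtomicBoolean warned) {
    List<String> shuffled = new ArrayList<>(servers);
    Collections.shuffle(shuffled); // spread load across servers, as the PR does
    for (String server : shuffled) {
      String client = connector.connect(server);
      if (client != null) {
        warned.set(false); // a success re-arms the warning
        return client;
      }
    }
    // Only the first failure since the last success is logged.
    if (warned.compareAndSet(false, true)) {
      warnings.add("Failed to find an available server for type " + type + " in: " + servers);
    }
    // IllegalStateException stands in for TTransportException here.
    throw new IllegalStateException("Failed to connect to any server for type: " + type);
  }

  public static void main(String[] args) {
    AtomicBoolean warned = new AtomicBoolean(false);
    List<String> servers = List.of("tserver1:9997", "tserver2:9997");

    for (int i = 0; i < 3; i++) {
      try {
        connectToAny("CLIENT", servers, server -> null, warned);
      } catch (IllegalStateException e) {
        // expected: every server is down
      }
    }
    System.out.println(warnings.size()); // prints 1

    String client = connectToAny("CLIENT", servers, server -> "client@" + server, warned);
    System.out.println(client.startsWith("client@")); // prints true
    System.out.println(warned.get()); // prints false
  }
}
```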
##########
core/src/main/java/org/apache/accumulo/core/rpc/clients/TServerClient.java:
##########
@@ -57,44 +59,56 @@ default Pair<String,C> getTabletServerConnection(Logger LOG, ThriftClientTypes<C
ClientContext context, boolean preferCachedConnections, AtomicBoolean warned)
throws TTransportException {
checkArgument(context != null, "context is null");
- long rpcTimeout = context.getClientTimeoutInMillis();
- // create list of servers
- ArrayList<ThriftTransportKey> servers = new ArrayList<>();
- // add tservers
- ZooCache zc = context.getZooCache();
- for (String tserver : zc.getChildren(context.getZooKeeperRoot() + Constants.ZTSERVERS)) {
+ if (preferCachedConnections) {
+ Pair<String,TTransport> cachedTransport =
+ context.getTransportPool().getAnyCachedTransport(type);
+ if (cachedTransport != null) {
+ C client = ThriftUtil.createClient(type, cachedTransport.getSecond());
+ warned.set(false);
+ return new Pair<String,C>(cachedTransport.getFirst(), client);
+ }
+ }
+
+ final long rpcTimeout = context.getClientTimeoutInMillis();
+ final ZooCache zc = context.getZooCache();
+ final List<String> tservers = new ArrayList<>();
+
+ tservers.addAll(zc.getChildren(context.getZooKeeperRoot() + Constants.ZTSERVERS));
+
+ if (tservers.isEmpty()) {
+ if (warned.compareAndSet(false, true)) {
+ LOG.warn("There are no tablet servers: check that zookeeper and accumulo are running.");
Review Comment:
Is this code used for more than tservers now? Should the log message
replace `tablet servers` with the type?
##########
core/src/main/java/org/apache/accumulo/core/rpc/clients/TServerClient.java:
##########
@@ -57,44 +59,56 @@ default Pair<String,C> getTabletServerConnection(Logger LOG, ThriftClientTypes<C
ClientContext context, boolean preferCachedConnections, AtomicBoolean warned)
throws TTransportException {
checkArgument(context != null, "context is null");
- long rpcTimeout = context.getClientTimeoutInMillis();
- // create list of servers
- ArrayList<ThriftTransportKey> servers = new ArrayList<>();
- // add tservers
- ZooCache zc = context.getZooCache();
- for (String tserver : zc.getChildren(context.getZooKeeperRoot() + Constants.ZTSERVERS)) {
+ if (preferCachedConnections) {
+ Pair<String,TTransport> cachedTransport =
+ context.getTransportPool().getAnyCachedTransport(type);
+ if (cachedTransport != null) {
+ C client = ThriftUtil.createClient(type, cachedTransport.getSecond());
+ warned.set(false);
+ return new Pair<String,C>(cachedTransport.getFirst(), client);
+ }
+ }
+
+ final long rpcTimeout = context.getClientTimeoutInMillis();
+ final ZooCache zc = context.getZooCache();
+ final List<String> tservers = new ArrayList<>();
+
+ tservers.addAll(zc.getChildren(context.getZooKeeperRoot() + Constants.ZTSERVERS));
+
+ if (tservers.isEmpty()) {
+ if (warned.compareAndSet(false, true)) {
+ LOG.warn("There are no tablet servers: check that zookeeper and accumulo are running.");
+ }
+ throw new TTransportException("There are no servers for type: " + type);
+ }
+
+ // Try to connect to an online tserver
+ Collections.shuffle(tservers);
+ for (String tserver : tservers) {
var zLocPath =
ServiceLock.path(context.getZooKeeperRoot() + Constants.ZTSERVERS + "/" + tserver);
byte[] data = zc.getLockData(zLocPath);
if (data != null) {
String strData = new String(data, UTF_8);
if (!strData.equals("manager")) {
- servers.add(new ThriftTransportKey(
- new ServerServices(strData).getAddress(Service.TSERV_CLIENT), rpcTimeout, context));
- }
- }
- }
-
- boolean opened = false;
- try {
- Pair<String,TTransport> pair =
- context.getTransportPool().getAnyTransport(servers, preferCachedConnections);
- C client = ThriftUtil.createClient(type, pair.getSecond());
- opened = true;
- warned.set(false);
- return new Pair<>(pair.getFirst(), client);
- } finally {
- if (!opened) {
- if (warned.compareAndSet(false, true)) {
- if (servers.isEmpty()) {
- LOG.warn("There are no tablet servers: check that zookeeper and accumulo are running.");
- } else {
- LOG.warn("Failed to find an available server in the list of servers: {}", servers);
+ final HostAndPort tserverClientAddress =
+ new ServerServices(strData).getAddress(Service.TSERV_CLIENT);
+ try {
+ TTransport transport = context.getTransportPool().getTransport(type,
+ tserverClientAddress, rpcTimeout, context, preferCachedConnections);
+ C client = ThriftUtil.createClient(type, transport);
+ warned.set(false);
+ return new Pair<String,C>(tserverClientAddress.toString(), client);
+ } catch (TTransportException e) {
+ LOG.trace("Error creating transport to {}", tserverClientAddress);
+ continue;
}
}
}
}
+ LOG.warn("Failed to find an available server in the list of servers: {}", tservers);
+ throw new TTransportException("Failed to connect to any server");
Review Comment:
Could include the type in the message.
##########
core/src/test/java/org/apache/accumulo/core/clientImpl/ThriftTransportKeyTest.java:
##########
@@ -136,8 +138,8 @@ public void testSimpleEquivalence() {
replay(clientCtx);
- ThriftTransportKey ttk =
- new ThriftTransportKey(HostAndPort.fromParts("localhost", 9999), 120_000, clientCtx);
+ ThriftTransportKey ttk = new ThriftTransportKey(ThriftClientTypes.CLIENT,
+ HostAndPort.fromParts("localhost", 9999), 120_000, clientCtx);
assertEquals(ttk, ttk, "Normal ThriftTransportKey doesn't equal itself");
Review Comment:
This is a preexisting problem with this test, but it would be nice to make it follow the pattern in the other existing test.
```suggestion
assertEquals(ttk, ttk, "Normal ThriftTransportKey doesn't equal itself");
assertEquals(ttk.hashCode(), ttk.hashCode());
```
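Outside JUnit, the same pairing of checks (equality plus matching hashCode) can be sketched as a small helper. `assertEqualAndSameHash` and `Key` are hypothetical names for illustration only; `Key` merely imitates the shape of ThriftTransportKey with value-based equals()/hashCode().

```java
import java.util.Objects;

public class EqualsHashCodeCheck {

  // Hypothetical helper: checks that two objects expected to be equal are equal in both
  // directions and share a hashCode (the reflexive case passes the same object twice).
  static void assertEqualAndSameHash(Object a, Object b) {
    if (!a.equals(b) || !b.equals(a)) {
      throw new AssertionError("expected equal: " + a + " and " + b);
    }
    if (a.hashCode() != b.hashCode()) {
      throw new AssertionError("equal objects have different hashCodes: " + a + ", " + b);
    }
  }

  // Hypothetical value-based key, standing in for ThriftTransportKey.
  static final class Key {
    final String type;
    final String hostAndPort;
    final long timeout;

    Key(String type, String hostAndPort, long timeout) {
      this.type = type;
      this.hostAndPort = hostAndPort;
      this.timeout = timeout;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof Key)) {
        return false;
      }
      Key k = (Key) o;
      return type.equals(k.type) && hostAndPort.equals(k.hostAndPort) && timeout == k.timeout;
    }

    @Override
    public int hashCode() {
      return Objects.hash(type, hostAndPort, timeout);
    }

    @Override
    public String toString() {
      return type + "@" + hostAndPort + "/" + timeout;
    }
  }

  public static void main(String[] args) {
    Key ttk = new Key("CLIENT", "localhost:9999", 120_000);
    assertEqualAndSameHash(ttk, ttk); // reflexive check, as in the suggestion
    assertEqualAndSameHash(ttk, new Key("CLIENT", "localhost:9999", 120_000));
    System.out.println("equals/hashCode contract holds");
  }
}
```

Checking hashCode alongside equals matters because hash-based collections such as a transport pool keyed by these objects rely on both agreeing.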