keith-turner commented on code in PR #4309:
URL: https://github.com/apache/accumulo/pull/4309#discussion_r1505011844
##########
core/src/main/java/org/apache/accumulo/core/rpc/clients/TServerClient.java:
##########
@@ -57,44 +60,54 @@ default Pair<String,C> getTabletServerConnection(Logger
LOG, ThriftClientTypes<C
ClientContext context, boolean preferCachedConnections, AtomicBoolean
warned)
throws TTransportException {
checkArgument(context != null, "context is null");
- long rpcTimeout = context.getClientTimeoutInMillis();
- // create list of servers
- ArrayList<ThriftTransportKey> servers = new ArrayList<>();
+ final long rpcTimeout = context.getClientTimeoutInMillis();
- // add tservers
- ZooCache zc = context.getZooCache();
- for (String tserver : zc.getChildren(context.getZooKeeperRoot() +
Constants.ZTSERVERS)) {
- var zLocPath =
- ServiceLock.path(context.getZooKeeperRoot() + Constants.ZTSERVERS +
"/" + tserver);
- byte[] data = zc.getLockData(zLocPath);
- if (data != null) {
- String strData = new String(data, UTF_8);
- if (!strData.equals("manager")) {
- servers.add(new ThriftTransportKey(
- new ServerServices(strData).getAddress(Service.TSERV_CLIENT),
rpcTimeout, context));
+ final ZooCache zc = context.getZooCache();
+ final List<String> tservers = new ArrayList<>();
+ final AtomicBoolean warnedAboutTServersBeingDown = new
AtomicBoolean(false);
+
+ for (int retries = 0; retries < 10; retries++) {
Review Comment:
The existing code did not retry, and from poking around it seems that the code
calling this method already retries at higher levels. It would probably be
better not to retry here and to defer to the calling code. That said, I don't
fully understand the overall behavior of the existing code when the set of
tservers in ZK is empty, and I am still trying to work that out; I'm not exactly
sure how the existing code would retry. It seems like throwing
TTransportException, as this PR now does, will help the higher-level code that
retries, but I'm not sure how it worked before these changes.
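
To make the "fail once here, retry in the caller" idea concrete, here is a
minimal sketch. It is not Accumulo code: `NoServersException` is a hypothetical
stand-in for `TTransportException` (unchecked only to keep the sketch short),
and `pickServer`, `pickWithRetry`, and the attempt count are assumptions made
for illustration.

```java
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-in for TTransportException so the sketch has no Thrift dependency.
class NoServersException extends RuntimeException {
  NoServersException(String msg) {
    super(msg);
  }
}

class CallerRetrySketch {
  // Single attempt: fail fast instead of looping when no tservers are listed.
  static String pickServer(Supplier<List<String>> tserverLister) {
    List<String> tservers = tserverLister.get();
    if (tservers.isEmpty()) {
      throw new NoServersException("no tablet servers found");
    }
    return tservers.get(0);
  }

  // The higher-level caller owns the retry policy (maxAttempts is arbitrary here).
  static String pickWithRetry(Supplier<List<String>> tserverLister, int maxAttempts) {
    NoServersException last = null;
    for (int attempt = 0; attempt < maxAttempts; attempt++) {
      try {
        return pickServer(tserverLister);
      } catch (NoServersException e) {
        last = e; // remember the failure and let the loop try again
      }
    }
    throw last != null ? last : new NoServersException("no attempts made");
  }
}
```

With this shape the lower-level method stays simple, and each caller can choose
its own attempt count and backoff.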
##########
core/src/main/java/org/apache/accumulo/core/rpc/clients/TServerClient.java:
##########
@@ -57,44 +60,54 @@ default Pair<String,C> getTabletServerConnection(Logger
LOG, ThriftClientTypes<C
ClientContext context, boolean preferCachedConnections, AtomicBoolean
warned)
throws TTransportException {
checkArgument(context != null, "context is null");
- long rpcTimeout = context.getClientTimeoutInMillis();
- // create list of servers
- ArrayList<ThriftTransportKey> servers = new ArrayList<>();
+ final long rpcTimeout = context.getClientTimeoutInMillis();
- // add tservers
- ZooCache zc = context.getZooCache();
- for (String tserver : zc.getChildren(context.getZooKeeperRoot() +
Constants.ZTSERVERS)) {
- var zLocPath =
- ServiceLock.path(context.getZooKeeperRoot() + Constants.ZTSERVERS +
"/" + tserver);
- byte[] data = zc.getLockData(zLocPath);
- if (data != null) {
- String strData = new String(data, UTF_8);
- if (!strData.equals("manager")) {
- servers.add(new ThriftTransportKey(
- new ServerServices(strData).getAddress(Service.TSERV_CLIENT),
rpcTimeout, context));
+ final ZooCache zc = context.getZooCache();
+ final List<String> tservers = new ArrayList<>();
+ final AtomicBoolean warnedAboutTServersBeingDown = new
AtomicBoolean(false);
+
+ for (int retries = 0; retries < 10; retries++) {
+ // Cluster may not be up, wait for tservers to come online
+ while (true) {
+ tservers.addAll(zc.getChildren(context.getZooKeeperRoot() +
Constants.ZTSERVERS));
+
+ if (!tservers.isEmpty()) {
+ break;
}
+
+ if (tservers.isEmpty() && !warnedAboutTServersBeingDown.get()) {
+ LOG.warn("There are no tablet servers: check that zookeeper and
accumulo are running.");
+ warnedAboutTServersBeingDown.set(true);
+ }
+ UtilWaitThread.sleep(100);
}
- }
- boolean opened = false;
- try {
- Pair<String,TTransport> pair =
- context.getTransportPool().getAnyTransport(servers,
preferCachedConnections);
- C client = ThriftUtil.createClient(type, pair.getSecond());
- opened = true;
- warned.set(false);
- return new Pair<>(pair.getFirst(), client);
- } finally {
- if (!opened) {
- if (warned.compareAndSet(false, true)) {
- if (servers.isEmpty()) {
- LOG.warn("There are no tablet servers: check that zookeeper and
accumulo are running.");
- } else {
- LOG.warn("Failed to find an available server in the list of
servers: {}", servers);
+ // Try to connect to an online tserver
+ Collections.shuffle(tservers);
+ for (String tserver : tservers) {
+ var zLocPath =
+ ServiceLock.path(context.getZooKeeperRoot() + Constants.ZTSERVERS
+ "/" + tserver);
+ byte[] data = zc.getLockData(zLocPath);
+ if (data != null) {
+ String strData = new String(data, UTF_8);
+ if (!strData.equals("manager")) {
Review Comment:
This is a pre-existing problem unrelated to this PR. Is there a constant we
could use here instead of the literal `"manager"`? I took a quick look around
and did not find anything. I can open an issue about this if needed.
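
As a rough illustration of what such a constant could look like (the name
`MANAGER_LOCK_DATA` and the helper `isTServerLockData` are hypothetical, since
no existing constant was found):

```java
class LockDataSketch {
  // Hypothetical constant; the review found no existing one in the code base.
  static final String MANAGER_LOCK_DATA = "manager";

  // Mirrors the check in the diff: lock data equal to the manager marker is skipped.
  static boolean isTServerLockData(String strData) {
    return !MANAGER_LOCK_DATA.equals(strData);
  }
}
```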
##########
core/src/main/java/org/apache/accumulo/core/rpc/clients/TServerClient.java:
##########
@@ -57,44 +60,54 @@ default Pair<String,C> getTabletServerConnection(Logger
LOG, ThriftClientTypes<C
ClientContext context, boolean preferCachedConnections, AtomicBoolean
warned)
throws TTransportException {
checkArgument(context != null, "context is null");
- long rpcTimeout = context.getClientTimeoutInMillis();
- // create list of servers
- ArrayList<ThriftTransportKey> servers = new ArrayList<>();
+ final long rpcTimeout = context.getClientTimeoutInMillis();
- // add tservers
- ZooCache zc = context.getZooCache();
- for (String tserver : zc.getChildren(context.getZooKeeperRoot() +
Constants.ZTSERVERS)) {
- var zLocPath =
- ServiceLock.path(context.getZooKeeperRoot() + Constants.ZTSERVERS +
"/" + tserver);
- byte[] data = zc.getLockData(zLocPath);
- if (data != null) {
- String strData = new String(data, UTF_8);
- if (!strData.equals("manager")) {
- servers.add(new ThriftTransportKey(
- new ServerServices(strData).getAddress(Service.TSERV_CLIENT),
rpcTimeout, context));
+ final ZooCache zc = context.getZooCache();
+ final List<String> tservers = new ArrayList<>();
+ final AtomicBoolean warnedAboutTServersBeingDown = new
AtomicBoolean(false);
Review Comment:
There is already a method argument named `warned`; it should probably be used
instead. I think that variable is shared among many threads and keeps them from
each logging the same warning separately.
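
A small sketch of why a shared flag matters, assuming the flag is one
`AtomicBoolean` handed to every caller. The class and method names are made up
for illustration, and a counter stands in for `LOG.warn`:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class WarnOnceSketch {
  // Counts emitted warnings; stands in for LOG.warn(...) in this sketch.
  static final AtomicInteger warningsLogged = new AtomicInteger();

  // Only the caller that flips the flag from false to true logs.
  static void warnIfFirst(AtomicBoolean warned, String message) {
    if (warned.compareAndSet(false, true)) {
      warningsLogged.incrementAndGet();
      System.err.println(message);
    }
  }

  // Runs warnIfFirst from several threads sharing one flag and returns the warning count.
  static int demo(int threads) {
    warningsLogged.set(0);
    AtomicBoolean warned = new AtomicBoolean(false);
    Thread[] workers = new Thread[threads];
    for (int i = 0; i < threads; i++) {
      workers[i] = new Thread(() -> warnIfFirst(warned, "There are no tablet servers"));
      workers[i].start();
    }
    for (Thread t : workers) {
      try {
        t.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
      }
    }
    return warningsLogged.get();
  }
}
```

A flag created locally inside the method would be fresh on every call, so each
thread would see `false` and log its own copy of the warning.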
##########
core/src/main/java/org/apache/accumulo/core/rpc/clients/TServerClient.java:
##########
@@ -57,44 +60,54 @@ default Pair<String,C> getTabletServerConnection(Logger
LOG, ThriftClientTypes<C
ClientContext context, boolean preferCachedConnections, AtomicBoolean
warned)
throws TTransportException {
checkArgument(context != null, "context is null");
- long rpcTimeout = context.getClientTimeoutInMillis();
- // create list of servers
- ArrayList<ThriftTransportKey> servers = new ArrayList<>();
+ final long rpcTimeout = context.getClientTimeoutInMillis();
Review Comment:
The `preferCachedConnections` argument is no longer being used. Maybe we
could do something like the following:
```suggestion
if (preferCachedConnections) {
  Optional<Pair<String,TTransport>> cachedTransport =
      context.getTransportPool().getAnyCachedTransport();
  if (cachedTransport.isPresent()) {
    // return the cached transport
  }
}
final long rpcTimeout = context.getClientTimeoutInMillis();
```
Where `getAnyCachedTransport()` would be a new method on `ThriftTransportPool`
that reuses [this code](https://github.com/apache/accumulo/blob/c72bf8c00cb196d048d1aa35077327c55d6e58e2/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftTransportPool.java#L137-L152),
modified so that it does not do the set intersection. The new method could
find and reserve an unreserved cached transport for a random server if one is
available.
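
To show the shape of that idea without depending on the real pool internals,
here is a self-contained sketch. `CachedConn` and `TransportCacheSketch` are
hypothetical types invented for illustration; the actual `ThriftTransportPool`
data structures and locking may differ.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical cached connection with a simple reservation flag.
class CachedConn {
  final String server;
  boolean reserved;

  CachedConn(String server) {
    this.server = server;
  }
}

class TransportCacheSketch {
  // Server address -> connections cached for that server.
  private final Map<String, List<CachedConn>> cache = new HashMap<>();

  synchronized void add(CachedConn conn) {
    cache.computeIfAbsent(conn.server, k -> new ArrayList<>()).add(conn);
  }

  // Visit servers in random order and reserve the first unreserved connection found.
  synchronized Optional<CachedConn> getAnyCachedTransport() {
    List<String> servers = new ArrayList<>(cache.keySet());
    Collections.shuffle(servers);
    for (String server : servers) {
      for (CachedConn conn : cache.get(server)) {
        if (!conn.reserved) {
          conn.reserved = true;
          return Optional.of(conn);
        }
      }
    }
    return Optional.empty();
  }
}
```

Because no list of candidate servers is passed in, there is nothing to
intersect with; an empty result simply means the caller falls through to the
ZooKeeper-based lookup.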
##########
core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java:
##########
@@ -249,6 +251,27 @@ public ClientContext(SingletonReservation reservation,
ClientInfo info,
clientThreadPools = ThreadPools.getClientThreadPools(ueh);
}
}
+ // Kick off a task to try and populate the ZooCache with TabletServer
+ // information. It may not be complete as the cluster may be starting.
+ // It's a best effort
+ clientThreadPools
Review Comment:
I don't think we should add this, as it will add load on ZK that may not be
worth the effort, depending on what the Accumulo client does and how long it
lives. For example, lots of Accumulo clients created by MapReduce or Spark
jobs will probably not benefit from populating this cache, but when they all
start they will generate a lot of unneeded ZK activity. The other changes in
this PR to TServerClient could really improve that scenario: they are really
nice from the perspective of reducing ZK load when lots of Accumulo client
processes start around the same time.