This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch 1451-external-compactions-feature in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/1451-external-compactions-feature by this push: new 3b9b9fa Fixes #1989: Optimization in CompactionCoordinator startup to find tserver hosting tablet that is being compacted 3b9b9fa is described below commit 3b9b9fa3105632aec5332cafbf857b5fdba4153d Author: Dave Marion <dlmar...@apache.org> AuthorDate: Wed Mar 31 15:31:07 2021 +0000 Fixes #1989: Optimization in CompactionCoordinator startup to find tserver hosting tablet that is being compacted --- .../coordinator/CompactionCoordinator.java | 79 +++++++++++++++++----- .../org/apache/accumulo/compactor/Compactor.java | 4 +- 2 files changed, 64 insertions(+), 19 deletions(-) diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java index 7e1a42a..895d131 100644 --- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java +++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java @@ -47,6 +47,9 @@ import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent; import org.apache.accumulo.core.metadata.TServerInstance; import org.apache.accumulo.core.metadata.schema.ExternalCompactionId; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; +import org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType; import org.apache.accumulo.core.rpc.ThriftUtil; import org.apache.accumulo.core.securityImpl.thrift.TCredentials; import org.apache.accumulo.core.tabletserver.thrift.CompactionStats; @@ -213,27 +216,69 @@ public class CompactionCoordinator extends AbstractServer LOG.info("Found {} running external compactions", running.size()); running.forEach((hp, job) -> { // Find the 
tserver that has this compaction id - // CBUG: Is there a more efficient way of finding the tablet server? boolean matchFound = false; - for (TServerInstance tsi : tservers) { - TabletClientService.Client client = null; - try { - client = getTabletServerConnection(tsi); - boolean tserverMatch = client.isRunningExternalCompaction(TraceUtil.traceInfo(), - getContext().rpcCreds(), job.getExternalCompactionId(), job.getExtent()); - if (tserverMatch) { - RUNNING.put(ExternalCompactionId.of(job.getExternalCompactionId()), - new RunningCompaction(job, ExternalCompactionUtil.getHostPortString(hp), tsi)); - matchFound = true; + + // Attempt to find the TServer hosting the tablet based on the metadata table + // CBUG use #1974 for more efficient metadata reads + KeyExtent extent = KeyExtent.fromThrift(job.getExtent()); + TabletMetadata tabletMetadata = getContext().getAmple().readTablets().forTablet(extent) + .fetch(ColumnType.LOCATION, ColumnType.PREV_ROW).build().stream().findFirst() + .orElse(null); + + if (tabletMetadata != null && tabletMetadata.getExtent().equals(extent) + && tabletMetadata.getLocation() != null + && tabletMetadata.getLocation().getType() == LocationType.CURRENT) { + + TServerInstance tsi = tservers.stream() + .filter( + t -> t.getHostAndPort().equals(tabletMetadata.getLocation().getHostAndPort())) + .findFirst().orElse(null); + + if (null != tsi) { + TabletClientService.Client client = null; + try { + client = getTabletServerConnection(tsi); + boolean tserverMatch = client.isRunningExternalCompaction(TraceUtil.traceInfo(), + getContext().rpcCreds(), job.getExternalCompactionId(), job.getExtent()); + if (tserverMatch) { + RUNNING.put(ExternalCompactionId.of(job.getExternalCompactionId()), + new RunningCompaction(job, ExternalCompactionUtil.getHostPortString(hp), + tsi)); + matchFound = true; + } + } catch (TException e) { + LOG.warn("Failed to notify tserver {}", + tabletMetadata.getLocation().getHostAndPort(), e); + } finally { + 
ThriftUtil.returnClient(client); } - } catch (TException e) { - LOG.error( - "Error from tserver {} while trying to check if external compaction is running, trying next tserver", - ExternalCompactionUtil.getHostPortString(tsi.getHostAndPort()), e); - } finally { - ThriftUtil.returnClient(client); } } + + // As a fallback, try them all + if (!matchFound) { + for (TServerInstance tsi : tservers) { + TabletClientService.Client client = null; + try { + client = getTabletServerConnection(tsi); + boolean tserverMatch = client.isRunningExternalCompaction(TraceUtil.traceInfo(), + getContext().rpcCreds(), job.getExternalCompactionId(), job.getExtent()); + if (tserverMatch) { + RUNNING.put(ExternalCompactionId.of(job.getExternalCompactionId()), + new RunningCompaction(job, ExternalCompactionUtil.getHostPortString(hp), + tsi)); + matchFound = true; + } + } catch (TException e) { + LOG.error( + "Error from tserver {} while trying to check if external compaction is running, trying next tserver", + ExternalCompactionUtil.getHostPortString(tsi.getHostAndPort()), e); + } finally { + ThriftUtil.returnClient(client); + } + } + } + if (!matchFound) { // There is an external compaction running on a Compactor, but we can't resolve it to a // TServer? diff --git a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java index ed46edd..698e5f8 100644 --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java @@ -717,13 +717,13 @@ public class Compactor extends AbstractServer job = JOB_HOLDER.getJob(); } while (null == job) { - //CBUG: It's possible that the call from the coordinator could + // CBUG: It's possible that the call from the coordinator could // be stuck here waiting for a compaction to be reserved and stall the // DeadCompactionDetector from contacting other compactors. 
UtilWaitThread.sleep(50); synchronized (JOB_HOLDER) { job = JOB_HOLDER.getJob(); - } + } } return job; }