This is an automated email from the ASF dual-hosted git repository. dlmarion pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push: new af958faa0c Updates to ScanServerAttempts (#2912) af958faa0c is described below commit af958faa0c67d17a32f808507ca82728a3e592a3 Author: Christopher Tubbs <ctubb...@apache.org> AuthorDate: Tue Oct 18 14:08:53 2022 -0400 Updates to ScanServerAttempts (#2912) As a follow-up to #2880: * Remove unused and undocumented endTime in ScanAttempt API * Separate out more inner classes/interfaces into their own files * Simplify concurrency by synchronizing on the attempts and preparing the snapshot using immutable collections * IDE changes also converted some anonymous inner classes to lambdas --- .../ScanServerAttemptImpl.java} | 35 ++++--- .../ScanServerAttemptReporter.java} | 23 +---- .../core/clientImpl/ScanServerAttemptsImpl.java | 104 +++------------------ .../TabletServerBatchReaderIterator.java | 11 +-- .../accumulo/core/spi/scan/ScanServerAttempt.java | 2 - .../core/clientImpl/ScanAttemptsImplTest.java | 2 +- .../scan/ConfigurableScanServerSelectorTest.java | 11 +-- 7 files changed, 47 insertions(+), 141 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanServerAttempt.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ScanServerAttemptImpl.java similarity index 61% copy from core/src/main/java/org/apache/accumulo/core/spi/scan/ScanServerAttempt.java copy to core/src/main/java/org/apache/accumulo/core/clientImpl/ScanServerAttemptImpl.java index dde2ec4021..bda2f26880 100644 --- a/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanServerAttempt.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ScanServerAttemptImpl.java @@ -16,25 +16,30 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.accumulo.core.spi.scan; +package org.apache.accumulo.core.clientImpl; -/** - * This object is used to communicate what previous actions were attempted, when they were - * attempted, and the result of those attempts - * - * @since 2.1.0 - */ -public interface ScanServerAttempt { +import java.util.Objects; - // represents reasons that previous attempts to scan failed - enum Result { - BUSY, ERROR - } +import org.apache.accumulo.core.spi.scan.ScanServerAttempt; + +class ScanServerAttemptImpl implements ScanServerAttempt { - String getServer(); + private final String server; + private final Result result; - long getEndTime(); + ScanServerAttemptImpl(Result result, String server) { + this.result = result; + this.server = Objects.requireNonNull(server); + } + + @Override + public String getServer() { + return server; + } - ScanServerAttempt.Result getResult(); + @Override + public Result getResult() { + return result; + } } diff --git a/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanServerAttempt.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ScanServerAttemptReporter.java similarity index 65% copy from core/src/main/java/org/apache/accumulo/core/spi/scan/ScanServerAttempt.java copy to core/src/main/java/org/apache/accumulo/core/clientImpl/ScanServerAttemptReporter.java index dde2ec4021..37983d3688 100644 --- a/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanServerAttempt.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ScanServerAttemptReporter.java @@ -16,25 +16,10 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.accumulo.core.spi.scan; +package org.apache.accumulo.core.clientImpl; -/** - * This object is used to communicate what previous actions were attempted, when they were - * attempted, and the result of those attempts - * - * @since 2.1.0 - */ -public interface ScanServerAttempt { - - // represents reasons that previous attempts to scan failed - enum Result { - BUSY, ERROR - } - - String getServer(); - - long getEndTime(); - - ScanServerAttempt.Result getResult(); +import org.apache.accumulo.core.spi.scan.ScanServerAttempt; +interface ScanServerAttemptReporter { + void report(ScanServerAttempt.Result result); } diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ScanServerAttemptsImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ScanServerAttemptsImpl.java index fc0896aeb2..48ff72b532 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ScanServerAttemptsImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ScanServerAttemptsImpl.java @@ -18,12 +18,14 @@ */ package org.apache.accumulo.core.clientImpl; +import static java.util.stream.Collectors.toUnmodifiableMap; + +import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Objects; -import java.util.concurrent.ConcurrentHashMap; -import java.util.stream.Collectors; +import java.util.Map.Entry; import org.apache.accumulo.core.data.TabletId; import org.apache.accumulo.core.spi.scan.ScanServerAttempt; @@ -41,71 +43,14 @@ public class ScanServerAttemptsImpl { private static final Logger LOG = LoggerFactory.getLogger(ScanServerAttemptsImpl.class); - static class ScanServerAttemptImpl implements ScanServerAttempt { - - private final String server; - private final long time; - private final Result result; - private volatile long mutationCount = Long.MAX_VALUE; - - ScanServerAttemptImpl(Result result, String server, long time) { - this.result = result; - this.server = Objects.requireNonNull(server); - this.time = time; - } - - @Override - public String getServer() { - return server; - } - - @Override - public long getEndTime() { - return time; - } - - @Override - public Result getResult() { - return result; - } - - private void setMutationCount(long mc) { - this.mutationCount = mc; - } - - public long getMutationCount() { - return mutationCount; - } - } - - private final Map<TabletId,Collection<ScanServerAttemptImpl>> attempts = - new ConcurrentHashMap<>(); - private long mutationCounter = 0; + private final Map<TabletId,Collection<ScanServerAttemptImpl>> attempts = new HashMap<>(); - private void add(TabletId tablet, ScanServerAttempt.Result result, String server, long endTime) { - - ScanServerAttemptImpl sa = new ScanServerAttemptImpl(result, server, endTime); - - attempts.computeIfAbsent(tablet, k -> ConcurrentHashMap.newKeySet()).add(sa); - - synchronized (this) { - // now that the scan attempt obj is added to all concurrent data structs, make it visible - // need to atomically increment the counter AND set the counter on the object - sa.setMutationCount(mutationCounter++); - } - - } - - public interface ScanAttemptReporter { - void report(ScanServerAttempt.Result result); - } - - ScanAttemptReporter createReporter(String server, TabletId tablet) { - return new ScanAttemptReporter() { - @Override - public void report(ScanServerAttempt.Result result) { - LOG.trace("Received result: {}", result); - add(tablet, result, server, System.currentTimeMillis()); + ScanServerAttemptReporter createReporter(String server, TabletId tablet) { + return result -> { + LOG.trace("Received result: {}", result); + synchronized (attempts) { + attempts.computeIfAbsent(tablet, k -> new ArrayList<>()) + .add(new ScanServerAttemptImpl(result, server)); } }; } @@ -118,28 +63,9 @@ public class ScanServerAttemptsImpl { * that TabletId */ Map<TabletId,Collection<ScanServerAttemptImpl>> snapshot() { - - final long mutationCounterSnapshot; - synchronized (ScanServerAttemptsImpl.this) { - mutationCounterSnapshot = mutationCounter; + synchronized (attempts) { + return attempts.entrySet().stream() + .collect(toUnmodifiableMap(Entry::getKey, entry -> List.copyOf(entry.getValue()))); } - - Map<TabletId,Collection<ScanServerAttemptImpl>> result = new ConcurrentHashMap<>(); - - attempts.forEach((tabletId, scanAttempts) -> { - - // filter out ScanServerScanAttempt objects that were added after this call - List<ScanServerAttemptImpl> filteredScanAttempts = scanAttempts.stream() - .filter(scanAttempt -> scanAttempt.getMutationCount() < mutationCounterSnapshot) - .collect(Collectors.toList()); - - // only add an entry to the map if there are ScanServerScanAttempt objects for the current - // TabletId - if (!filteredScanAttempts.isEmpty()) - result.put(tabletId, filteredScanAttempts); - - }); - - return result; } } diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java index 63eb186a49..796a4aa886 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java @@ -344,13 +344,12 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value private List<Column> columns; private int semaphoreSize; private final long busyTimeout; - private final ScanServerAttemptsImpl.ScanAttemptReporter reporter; + private final ScanServerAttemptReporter reporter; private final Duration scanServerSelectorDelay; QueryTask(String tsLocation, Map<KeyExtent,List<Range>> tabletsRanges, Map<KeyExtent,List<Range>> failures, ResultReceiver receiver, List<Column> columns, - long busyTimeout, ScanServerAttemptsImpl.ScanAttemptReporter reporter, - Duration scanServerSelectorDelay) { + long busyTimeout, ScanServerAttemptReporter reporter, Duration scanServerSelectorDelay) { this.tsLocation = tsLocation; this.tabletsRanges = tabletsRanges; this.receiver = receiver; @@ -487,7 +486,7 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value long busyTimeout = 0; Duration scanServerSelectorDelay = null; - Map<String,ScanServerAttemptsImpl.ScanAttemptReporter> reporters = Map.of(); + Map<String,ScanServerAttemptReporter> reporters = Map.of(); if (options.getConsistencyLevel().equals(ConsistencyLevel.EVENTUAL)) { var scanServerData = rebinToScanServers(binnedRanges); @@ -580,7 +579,7 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value private static class ScanServerData { Map<String,Map<KeyExtent,List<Range>>> binnedRanges; ScanServerSelections actions; - Map<String,ScanServerAttemptsImpl.ScanAttemptReporter> reporters; + Map<String,ScanServerAttemptReporter> reporters; } private ScanServerData rebinToScanServers(Map<String,Map<KeyExtent,List<Range>>> binnedRanges) { @@ -624,7 +623,7 @@ public class TabletServerBatchReaderIterator implements Iterator<Entry<Key,Value Map<String,Map<KeyExtent,List<Range>>> binnedRanges2 = new HashMap<>(); - Map<String,ScanServerAttemptsImpl.ScanAttemptReporter> reporters = new HashMap<>(); + Map<String,ScanServerAttemptReporter> reporters = new HashMap<>(); for (TabletIdImpl tabletId : tabletIds) { KeyExtent extent = tabletId.toKeyExtent(); diff --git a/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanServerAttempt.java b/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanServerAttempt.java index dde2ec4021..b886e3b26d 100644 --- a/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanServerAttempt.java +++ b/core/src/main/java/org/apache/accumulo/core/spi/scan/ScanServerAttempt.java @@ -33,8 +33,6 @@ public interface ScanServerAttempt { String getServer(); - long getEndTime(); - ScanServerAttempt.Result getResult(); } diff --git a/core/src/test/java/org/apache/accumulo/core/clientImpl/ScanAttemptsImplTest.java b/core/src/test/java/org/apache/accumulo/core/clientImpl/ScanAttemptsImplTest.java index 829f2d3f5a..eee7d2ce77 100644 --- a/core/src/test/java/org/apache/accumulo/core/clientImpl/ScanAttemptsImplTest.java +++ b/core/src/test/java/org/apache/accumulo/core/clientImpl/ScanAttemptsImplTest.java @@ -34,7 +34,7 @@ import org.junit.jupiter.api.Test; public class ScanAttemptsImplTest { private Map<TabletId,Collection<String>> - simplify(Map<TabletId,Collection<ScanServerAttemptsImpl.ScanServerAttemptImpl>> map) { + simplify(Map<TabletId,Collection<ScanServerAttemptImpl>> map) { Map<TabletId,Collection<String>> ret = new HashMap<>(); map.forEach((tabletId, scanAttempts) -> { diff --git a/core/src/test/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelectorTest.java b/core/src/test/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelectorTest.java index dd22c2913a..d4448fb08d 100644 --- a/core/src/test/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelectorTest.java +++ b/core/src/test/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelectorTest.java @@ -131,12 +131,10 @@ public class ConfigurableScanServerSelectorTest { static class TestScanServerAttempt implements ScanServerAttempt { private final String server; - private final long endTime; private final Result result; - TestScanServerAttempt(String server, long endTime, Result result) { + TestScanServerAttempt(String server, Result result) { this.server = server; - this.endTime = endTime; this.result = result; } @@ -145,11 +143,6 @@ public class ConfigurableScanServerSelectorTest { return server; } - @Override - public long getEndTime() { - return endTime; - } - @Override public Result getResult() { return result; @@ -204,7 +197,7 @@ public class ConfigurableScanServerSelectorTest { var tabletId = nti("1", "m"); var tabletAttempts = Stream.iterate(1, i -> i <= busyAttempts, i -> i + 1) - .map(i -> (new TestScanServerAttempt("ss" + i + ":" + i, i, ScanServerAttempt.Result.BUSY))) + .map(i -> (new TestScanServerAttempt("ss" + i + ":" + i, ScanServerAttempt.Result.BUSY))) .collect(Collectors.toList()); Map<TabletId,Collection<? extends ScanServerAttempt>> attempts = new HashMap<>();