This is an automated email from the ASF dual-hosted git repository. ctubbsii pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 758191f62e1ced59ba3a7d0b8b7d4bf3e1f130b9 Merge: 39fbd1b63e cd81bc7922 Author: Christopher Tubbs <ctubb...@apache.org> AuthorDate: Thu Feb 2 10:04:26 2023 -0500 Merge branch '1.10' into 2.1 .../org/apache/accumulo/core/clientImpl/ThriftScanner.java | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --cc core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java index f168c7d761,0000000000..26ddf5abc2 mode 100644,000000..100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java @@@ -1,756 -1,0 +1,767 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.clientImpl; + +import static java.util.concurrent.TimeUnit.SECONDS; + +import java.io.IOException; +import java.security.SecureRandom; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.EnumMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.SortedMap; +import java.util.SortedSet; +import java.util.stream.Collectors; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.SampleNotPresentException; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.sample.SamplerConfiguration; +import org.apache.accumulo.core.clientImpl.TabletLocator.TabletLocation; +import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Column; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.KeyValue; +import org.apache.accumulo.core.data.PartialKey; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.data.TabletId; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.dataImpl.TabletIdImpl; +import org.apache.accumulo.core.dataImpl.thrift.InitialScan; +import org.apache.accumulo.core.dataImpl.thrift.IterInfo; +import org.apache.accumulo.core.dataImpl.thrift.ScanResult; +import org.apache.accumulo.core.dataImpl.thrift.TKeyValue; +import org.apache.accumulo.core.rpc.ThriftUtil; +import org.apache.accumulo.core.rpc.clients.ThriftClientTypes; +import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.spi.scan.ScanServerAttempt; +import org.apache.accumulo.core.spi.scan.ScanServerSelections; +import org.apache.accumulo.core.spi.scan.ScanServerSelector; +import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException; +import org.apache.accumulo.core.tabletserver.thrift.NotServingTabletException; +import org.apache.accumulo.core.tabletserver.thrift.ScanServerBusyException; +import org.apache.accumulo.core.tabletserver.thrift.TSampleNotPresentException; +import org.apache.accumulo.core.tabletserver.thrift.TabletScanClientService; +import org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException; +import org.apache.accumulo.core.trace.TraceUtil; +import org.apache.accumulo.core.trace.thrift.TInfo; +import org.apache.accumulo.core.util.HostAndPort; +import org.apache.accumulo.core.util.OpTimer; +import org.apache.hadoop.io.Text; +import org.apache.thrift.TApplicationException; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.context.Scope; + +public class ThriftScanner { + private static final Logger log = LoggerFactory.getLogger(ThriftScanner.class); + ++ // This set is initially empty when the client starts. The first time this ++ // client contacts a server it will wait for any writes that are in progress. ++ // This is to account for the case where a client may have sent writes ++ // to accumulo and dies while waiting for a confirmation from ++ // accumulo. The client process restarts and tries to read ++ // data from accumulo making the assumption that it will get ++ // any writes previously made, however if the server side thread ++ // processing the write from the dead client is still in progress, ++ // the restarted client may not see the write unless we wait here. ++ // this behavior is very important when the client is reading the ++ // metadata + public static final Map<TabletType,Set<String>> serversWaitedForWrites = + new EnumMap<>(TabletType.class); + private static final SecureRandom random = new SecureRandom(); + + static { + for (TabletType ttype : TabletType.values()) { + serversWaitedForWrites.put(ttype, Collections.synchronizedSet(new HashSet<>())); + } + } + + public static boolean getBatchFromServer(ClientContext context, Range range, KeyExtent extent, + String server, SortedMap<Key,Value> results, SortedSet<Column> fetchedColumns, + List<IterInfo> serverSideIteratorList, + Map<String,Map<String,String>> serverSideIteratorOptions, int size, + Authorizations authorizations, long batchTimeOut, String classLoaderContext) + throws AccumuloException, AccumuloSecurityException { + if (server == null) { + throw new AccumuloException(new IOException()); + } + + final HostAndPort parsedServer = HostAndPort.fromString(server); + try { + TInfo tinfo = TraceUtil.traceInfo(); + TabletScanClientService.Client client = + ThriftUtil.getClient(ThriftClientTypes.TABLET_SCAN, parsedServer, context); + try { + // not reading whole rows (or stopping on row boundaries) so there is no need to enable + // isolation below + ScanState scanState = new ScanState(context, extent.tableId(), authorizations, range, + fetchedColumns, size, serverSideIteratorList, serverSideIteratorOptions, false, + Constants.SCANNER_DEFAULT_READAHEAD_THRESHOLD, null, batchTimeOut, classLoaderContext, + null, false); + + TabletType ttype = TabletType.type(extent); + boolean waitForWrites = !serversWaitedForWrites.get(ttype).contains(server); + InitialScan isr = client.startScan(tinfo, scanState.context.rpcCreds(), extent.toThrift(), + scanState.range.toThrift(), + scanState.columns.stream().map(Column::toThrift).collect(Collectors.toList()), + scanState.size, scanState.serverSideIteratorList, scanState.serverSideIteratorOptions, + scanState.authorizations.getAuthorizationsBB(), waitForWrites, scanState.isolated, + scanState.readaheadThreshold, null, scanState.batchTimeOut, classLoaderContext, + scanState.executionHints, 0L); + if (waitForWrites) { + serversWaitedForWrites.get(ttype).add(server); + } + + Key.decompress(isr.result.results); + + for (TKeyValue kv : isr.result.results) { + results.put(new Key(kv.key), new Value(kv.value)); + } + + client.closeScan(tinfo, isr.scanID); + + return isr.result.more; + } finally { + ThriftUtil.returnClient(client, context); + } + } catch (TApplicationException tae) { + throw new AccumuloServerException(server, tae); + } catch (TooManyFilesException e) { + log.debug("Tablet ({}) has too many files {} : {}", extent, server, e.getMessage()); + } catch (ThriftSecurityException e) { + log.warn("Security Violation in scan request to {}: {}", server, e.getMessage()); + throw new AccumuloSecurityException(e.user, e.code, e); + } catch (TException e) { + log.debug("Error getting transport to {}: {}", server, e.getMessage()); + } + + throw new AccumuloException("getBatchFromServer: failed"); + } + + public static class ScanState { + + boolean isolated; + TableId tableId; + Text startRow; + boolean skipStartRow; + long readaheadThreshold; + long batchTimeOut; + boolean runOnScanServer; + + Range range; + + int size; + + ClientContext context; + Authorizations authorizations; + List<Column> columns; + + TabletLocation prevLoc; + Long scanID; + + String classLoaderContext; + + boolean finished = false; + + List<IterInfo> serverSideIteratorList; + + Map<String,Map<String,String>> serverSideIteratorOptions; + + SamplerConfiguration samplerConfig; + Map<String,String> executionHints; + + ScanServerAttemptsImpl scanAttempts; + + Duration busyTimeout; + + TabletLocation getErrorLocation() { + return prevLoc; + } + + public ScanState(ClientContext context, TableId tableId, Authorizations authorizations, + Range range, SortedSet<Column> fetchedColumns, int size, + List<IterInfo> serverSideIteratorList, + Map<String,Map<String,String>> serverSideIteratorOptions, boolean isolated, + long readaheadThreshold, SamplerConfiguration samplerConfig, long batchTimeOut, + String classLoaderContext, Map<String,String> executionHints, boolean useScanServer) { + this.context = context; + this.authorizations = authorizations; + this.classLoaderContext = classLoaderContext; + + columns = new ArrayList<>(fetchedColumns.size()); + for (Column column : fetchedColumns) { + columns.add(column); + } + + this.tableId = tableId; + this.range = range; + + Key startKey = range.getStartKey(); + if (startKey == null) { + startKey = new Key(); + } + this.startRow = startKey.getRow(); + + this.skipStartRow = false; + + this.size = size; + + this.serverSideIteratorList = serverSideIteratorList; + this.serverSideIteratorOptions = serverSideIteratorOptions; + + this.isolated = isolated; + this.readaheadThreshold = readaheadThreshold; + + this.samplerConfig = samplerConfig; + + this.batchTimeOut = batchTimeOut; + + if (executionHints == null || executionHints.isEmpty()) { + this.executionHints = null; // avoid thrift serialization for empty map + } else { + this.executionHints = executionHints; + } + + this.runOnScanServer = useScanServer; + + if (useScanServer) { + scanAttempts = new ScanServerAttemptsImpl(); + } + } + } + + public static class ScanTimedOutException extends IOException { + + private static final long serialVersionUID = 1L; + + } + + static long pause(long millis, long maxSleep, boolean runOnScanServer) + throws InterruptedException { + if (!runOnScanServer) { + // the client side scan server plugin controls sleep time... this sleep is for regular scans + // where the scan server plugin does not have control + Thread.sleep(millis); + } + // wait 2 * last time, with +-10% random jitter + return (long) (Math.min(millis * 2, maxSleep) * (.9 + random.nextDouble() / 5)); + } + + public static List<KeyValue> scan(ClientContext context, ScanState scanState, long timeOut) + throws ScanTimedOutException, AccumuloException, AccumuloSecurityException, + TableNotFoundException { + TabletLocation loc = null; + long startTime = System.currentTimeMillis(); + String lastError = null; + String error = null; + int tooManyFilesCount = 0; + long sleepMillis = 100; + final long maxSleepTime = + context.getConfiguration().getTimeInMillis(Property.GENERAL_MAX_SCANNER_RETRY_PERIOD); + + List<KeyValue> results = null; + + Span parent = TraceUtil.startSpan(ThriftScanner.class, "scan"); + try (Scope scope = parent.makeCurrent()) { + while (results == null && !scanState.finished) { + if (Thread.currentThread().isInterrupted()) { + throw new AccumuloException("Thread interrupted"); + } + + if ((System.currentTimeMillis() - startTime) / 1000.0 > timeOut) { + throw new ScanTimedOutException(); + } + + while (loc == null) { + long currentTime = System.currentTimeMillis(); + if ((currentTime - startTime) / 1000.0 > timeOut) { + throw new ScanTimedOutException(); + } + + Span child1 = TraceUtil.startSpan(ThriftScanner.class, "scan::locateTablet"); + try (Scope locateSpan = child1.makeCurrent()) { + loc = TabletLocator.getLocator(context, scanState.tableId).locateTablet(context, + scanState.startRow, scanState.skipStartRow, false); + + if (loc == null) { + context.requireNotDeleted(scanState.tableId); + context.requireNotOffline(scanState.tableId, null); + + error = "Failed to locate tablet for table : " + scanState.tableId + " row : " + + scanState.startRow; + if (!error.equals(lastError)) { + log.debug("{}", error); + } else if (log.isTraceEnabled()) { + log.trace("{}", error); + } + lastError = error; + sleepMillis = pause(sleepMillis, maxSleepTime, scanState.runOnScanServer); + } else { + // when a tablet splits we do want to continue scanning the low child + // of the split if we are already passed it + Range dataRange = loc.tablet_extent.toDataRange(); + + if (scanState.range.getStartKey() != null + && dataRange.afterEndKey(scanState.range.getStartKey())) { + // go to the next tablet + scanState.startRow = loc.tablet_extent.endRow(); + scanState.skipStartRow = true; + loc = null; + } else if (scanState.range.getEndKey() != null + && dataRange.beforeStartKey(scanState.range.getEndKey())) { + // should not happen + throw new RuntimeException("Unexpected tablet, extent : " + loc.tablet_extent + + " range : " + scanState.range + " startRow : " + scanState.startRow); + } + } + } catch (AccumuloServerException e) { + TraceUtil.setException(child1, e, true); + log.debug("Scan failed, server side exception : {}", e.getMessage()); + throw e; + } catch (AccumuloException e) { + error = "exception from tablet loc " + e.getMessage(); + if (!error.equals(lastError)) { + log.debug("{}", error); + } else if (log.isTraceEnabled()) { + log.trace("{}", error); + } + + TraceUtil.setException(child1, e, false); + + lastError = error; + sleepMillis = pause(sleepMillis, maxSleepTime, scanState.runOnScanServer); + } finally { + child1.end(); + } + } + + Span child2 = TraceUtil.startSpan(ThriftScanner.class, "scan::location", + Map.of("tserver", loc.tablet_location)); + try (Scope scanLocation = child2.makeCurrent()) { + results = scan(loc, scanState, context); + } catch (AccumuloSecurityException e) { + context.clearTableListCache(); + context.requireNotDeleted(scanState.tableId); + e.setTableInfo(context.getPrintableTableInfoFromId(scanState.tableId)); + TraceUtil.setException(child2, e, true); + throw e; + } catch (TApplicationException tae) { + TraceUtil.setException(child2, tae, true); + throw new AccumuloServerException(scanState.getErrorLocation().tablet_location, tae); + } catch (TSampleNotPresentException tsnpe) { + String message = "Table " + context.getPrintableTableInfoFromId(scanState.tableId) + + " does not have sampling configured or built"; + TraceUtil.setException(child2, tsnpe, true); + throw new SampleNotPresentException(message, tsnpe); + } catch (NotServingTabletException e) { + error = "Scan failed, not serving tablet " + scanState.getErrorLocation(); + if (!error.equals(lastError)) { + log.debug("{}", error); + } else if (log.isTraceEnabled()) { + log.trace("{}", error); + } + lastError = error; + + TabletLocator.getLocator(context, scanState.tableId).invalidateCache(loc.tablet_extent); + loc = null; + + // no need to try the current scan id somewhere else + scanState.scanID = null; + + if (scanState.isolated) { + TraceUtil.setException(child2, e, true); + throw new IsolationException(); + } + + TraceUtil.setException(child2, e, false); + sleepMillis = pause(sleepMillis, maxSleepTime, scanState.runOnScanServer); + } catch (ScanServerBusyException e) { + error = "Scan failed, scan server was busy " + scanState.getErrorLocation(); + if (!error.equals(lastError)) { + log.debug("{}", error); + } else if (log.isTraceEnabled()) { + log.trace("{}", error); + } + lastError = error; + + if (scanState.isolated) { + TraceUtil.setException(child2, e, true); + throw new IsolationException(); + } + + TraceUtil.setException(child2, e, false); + scanState.scanID = null; + } catch (NoSuchScanIDException e) { + error = "Scan failed, no such scan id " + scanState.scanID + " " + + scanState.getErrorLocation(); + if (!error.equals(lastError)) { + log.debug("{}", error); + } else if (log.isTraceEnabled()) { + log.trace("{}", error); + } + lastError = error; + + if (scanState.isolated) { + TraceUtil.setException(child2, e, true); + throw new IsolationException(); + } + + TraceUtil.setException(child2, e, false); + scanState.scanID = null; + } catch (TooManyFilesException e) { + error = "Tablet has too many files " + scanState.getErrorLocation() + " retrying..."; + if (error.equals(lastError)) { + tooManyFilesCount++; + if (tooManyFilesCount == 300) { + log.warn("{}", error); + } else if (log.isTraceEnabled()) { + log.trace("{}", error); + } + } else { + log.debug("{}", error); + tooManyFilesCount = 0; + } + lastError = error; + + // not sure what state the scan session on the server side is + // in after this occurs, so lets be cautious and start a new + // scan session + scanState.scanID = null; + + if (scanState.isolated) { + TraceUtil.setException(child2, e, true); + throw new IsolationException(); + } + + TraceUtil.setException(child2, e, false); + sleepMillis = pause(sleepMillis, maxSleepTime, scanState.runOnScanServer); + } catch (TException e) { + TabletLocator.getLocator(context, scanState.tableId).invalidateCache(context, + loc.tablet_location); + error = "Scan failed, thrift error " + e.getClass().getName() + " " + e.getMessage() + + " " + scanState.getErrorLocation(); + if (!error.equals(lastError)) { + log.debug("{}", error); + } else if (log.isTraceEnabled()) { + log.trace("{}", error); + } + lastError = error; + loc = null; + + // do not want to continue using the same scan id, if a timeout occurred could cause a + // batch to be skipped + // because a thread on the server side may still be processing the timed out continue scan + scanState.scanID = null; + + if (scanState.isolated) { + TraceUtil.setException(child2, e, true); + throw new IsolationException(); + } + + TraceUtil.setException(child2, e, false); + sleepMillis = pause(sleepMillis, maxSleepTime, scanState.runOnScanServer); + } finally { + child2.end(); + } + } + + if (results != null && results.isEmpty() && scanState.finished) { + results = null; + } + + return results; + } catch (InterruptedException ex) { + TraceUtil.setException(parent, ex, true); + throw new AccumuloException(ex); + } finally { + parent.end(); + } + } + + private static List<KeyValue> scan(TabletLocation loc, ScanState scanState, ClientContext context) + throws AccumuloSecurityException, NotServingTabletException, TException, + NoSuchScanIDException, TooManyFilesException, TSampleNotPresentException { + if (scanState.finished) { + return null; + } + + if (scanState.runOnScanServer) { + + TabletLocation newLoc; + + var tabletId = new TabletIdImpl(loc.tablet_extent); + + if (scanState.scanID != null && scanState.prevLoc != null + && scanState.prevLoc.tablet_session.equals("scan_server") + && scanState.prevLoc.tablet_extent.equals(loc.tablet_extent)) { + // this is the case of continuing a scan on a scan server for the same tablet, so lets not + // call the scan server selector and just go back to the previous scan server + newLoc = scanState.prevLoc; + log.trace( + "For tablet {} continuing scan on scan server {} without consulting scan server selector, using busyTimeout {}", + loc.tablet_extent, newLoc.tablet_location, scanState.busyTimeout); + } else { + // obtain a snapshot once and only expose this snapshot to the plugin for consistency + var attempts = scanState.scanAttempts.snapshot(); + + var params = new ScanServerSelector.SelectorParameters() { + + @Override + public List<TabletId> getTablets() { + return List.of(tabletId); + } + + @Override + public Collection<? extends ScanServerAttempt> getAttempts(TabletId tabletId) { + return attempts.getOrDefault(tabletId, Set.of()); + } + + @Override + public Map<String,String> getHints() { + if (scanState.executionHints == null) { + return Map.of(); + } + return scanState.executionHints; + } + }; + + ScanServerSelections actions = context.getScanServerSelector().selectServers(params); + + Duration delay = null; + + String scanServer = actions.getScanServer(tabletId); + if (scanServer != null) { + newLoc = new TabletLocation(loc.tablet_extent, scanServer, "scan_server"); + delay = actions.getDelay(); + scanState.busyTimeout = actions.getBusyTimeout(); + log.trace( + "For tablet {} scan server selector chose scan_server:{} delay:{} busyTimeout:{}", + loc.tablet_extent, scanServer, delay, scanState.busyTimeout); + } else { + newLoc = loc; + delay = actions.getDelay(); + scanState.busyTimeout = Duration.ZERO; + log.trace("For tablet {} scan server selector chose tablet_server", loc.tablet_extent); + } + + if (!delay.isZero()) { + try { + Thread.sleep(delay.toMillis()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + } + + var reporter = scanState.scanAttempts.createReporter(newLoc.tablet_location, tabletId); + + try { + return scanRpc(newLoc, scanState, context, scanState.busyTimeout.toMillis()); + } catch (ScanServerBusyException ssbe) { + reporter.report(ScanServerAttempt.Result.BUSY); + throw ssbe; + } catch (Exception e) { + reporter.report(ScanServerAttempt.Result.ERROR); + throw e; + } + } else { + return scanRpc(loc, scanState, context, 0L); + } + } + + private static List<KeyValue> scanRpc(TabletLocation loc, ScanState scanState, + ClientContext context, long busyTimeout) throws AccumuloSecurityException, + NotServingTabletException, TException, NoSuchScanIDException, TooManyFilesException, + TSampleNotPresentException, ScanServerBusyException { + + OpTimer timer = null; + + final TInfo tinfo = TraceUtil.traceInfo(); + + final HostAndPort parsedLocation = HostAndPort.fromString(loc.tablet_location); + TabletScanClientService.Client client = + ThriftUtil.getClient(ThriftClientTypes.TABLET_SCAN, parsedLocation, context); + + String old = Thread.currentThread().getName(); + try { + ScanResult sr; + + if (scanState.prevLoc != null && !scanState.prevLoc.equals(loc)) { + scanState.scanID = null; + } + + scanState.prevLoc = loc; + + if (scanState.scanID == null) { + Thread.currentThread().setName("Starting scan tserver=" + loc.tablet_location + " tableId=" + + loc.tablet_extent.tableId()); + + if (log.isTraceEnabled()) { + String msg = "Starting scan tserver=" + loc.tablet_location + " tablet=" + + loc.tablet_extent + " range=" + scanState.range + " ssil=" + + scanState.serverSideIteratorList + " ssio=" + scanState.serverSideIteratorOptions + + " context=" + scanState.classLoaderContext; + log.trace("tid={} {}", Thread.currentThread().getId(), msg); + timer = new OpTimer().start(); + } + + TabletType ttype = TabletType.type(loc.tablet_extent); + boolean waitForWrites = !serversWaitedForWrites.get(ttype).contains(loc.tablet_location); + + InitialScan is = client.startScan(tinfo, scanState.context.rpcCreds(), + loc.tablet_extent.toThrift(), scanState.range.toThrift(), + scanState.columns.stream().map(Column::toThrift).collect(Collectors.toList()), + scanState.size, scanState.serverSideIteratorList, scanState.serverSideIteratorOptions, + scanState.authorizations.getAuthorizationsBB(), waitForWrites, scanState.isolated, + scanState.readaheadThreshold, + SamplerConfigurationImpl.toThrift(scanState.samplerConfig), scanState.batchTimeOut, + scanState.classLoaderContext, scanState.executionHints, busyTimeout); + if (waitForWrites) { + serversWaitedForWrites.get(ttype).add(loc.tablet_location); + } + + sr = is.result; + + if (sr.more) { + scanState.scanID = is.scanID; + } else { + client.closeScan(tinfo, is.scanID); + } + + } else { + // log.debug("Calling continue scan : "+scanState.range+" loc = "+loc); + String msg = + "Continuing scan tserver=" + loc.tablet_location + " scanid=" + scanState.scanID; + Thread.currentThread().setName(msg); + + if (log.isTraceEnabled()) { + log.trace("tid={} {}", Thread.currentThread().getId(), msg); + timer = new OpTimer().start(); + } + + sr = client.continueScan(tinfo, scanState.scanID, busyTimeout); + if (!sr.more) { + client.closeScan(tinfo, scanState.scanID); + scanState.scanID = null; + } + } + + if (sr.more) { + if (timer != null) { + timer.stop(); + log.trace("tid={} Finished scan in {} #results={} scanid={}", + Thread.currentThread().getId(), String.format("%.3f secs", timer.scale(SECONDS)), + sr.results.size(), scanState.scanID); + } + } else { + // log.debug("No more : tab end row = "+loc.tablet_extent.getEndRow()+" range = + // "+scanState.range); + if (loc.tablet_extent.endRow() == null) { + scanState.finished = true; + + if (timer != null) { + timer.stop(); + log.trace("tid={} Completely finished scan in {} #results={}", + Thread.currentThread().getId(), String.format("%.3f secs", timer.scale(SECONDS)), + sr.results.size()); + } + + } else if (scanState.range.getEndKey() == null || !scanState.range + .afterEndKey(new Key(loc.tablet_extent.endRow()).followingKey(PartialKey.ROW))) { + scanState.startRow = loc.tablet_extent.endRow(); + scanState.skipStartRow = true; + + if (timer != null) { + timer.stop(); + log.trace("tid={} Finished scanning tablet in {} #results={}", + Thread.currentThread().getId(), String.format("%.3f secs", timer.scale(SECONDS)), + sr.results.size()); + } + } else { + scanState.finished = true; + if (timer != null) { + timer.stop(); + log.trace("tid={} Completely finished in {} #results={}", + Thread.currentThread().getId(), String.format("%.3f secs", timer.scale(SECONDS)), + sr.results.size()); + } + } + } + + Key.decompress(sr.results); + + if (!sr.results.isEmpty() && !scanState.finished) { + scanState.range = new Range(new Key(sr.results.get(sr.results.size() - 1).key), false, + scanState.range.getEndKey(), scanState.range.isEndKeyInclusive()); + } + + List<KeyValue> results = new ArrayList<>(sr.results.size()); + for (TKeyValue tkv : sr.results) { + results.add(new KeyValue(new Key(tkv.key), tkv.value)); + } + + return results; + + } catch (ThriftSecurityException e) { + throw new AccumuloSecurityException(e.user, e.code, e); + } finally { + ThriftUtil.returnClient(client, context); + Thread.currentThread().setName(old); + } + } + + static void close(ScanState scanState) { + if (!scanState.finished && scanState.scanID != null && scanState.prevLoc != null) { + TInfo tinfo = TraceUtil.traceInfo(); + + log.debug("Closing active scan {} {}", scanState.prevLoc, scanState.scanID); + HostAndPort parsedLocation = HostAndPort.fromString(scanState.prevLoc.tablet_location); + TabletScanClientService.Client client = null; + try { + client = + ThriftUtil.getClient(ThriftClientTypes.TABLET_SCAN, parsedLocation, scanState.context); + client.closeScan(tinfo, scanState.scanID); + } catch (TException e) { + // ignore this is a best effort + log.debug("Failed to close active scan " + scanState.prevLoc + " " + scanState.scanID, e); + } finally { + if (client != null) { + ThriftUtil.returnClient(client, scanState.context); + } + } + } + } +}