keith-turner commented on code in PR #3091:
URL: https://github.com/apache/accumulo/pull/3091#discussion_r1031344021
##########
test/src/main/java/org/apache/accumulo/test/ScanServerIT.java:
##########
@@ -148,16 +212,56 @@ public void testBatchScan() throws Exception {
}
@Test
- public void testScanOfflineTable() throws Exception {
+ public void testBatchScanTableCreatedOffline() throws Exception {
+
+ try (AccumuloClient client =
Accumulo.newClient().from(getClientProps()).build()) {
+ String tableName = getUniqueNames(1)[0];
+ client.tableOperations().create(tableName, new
NewTableConfiguration().createOffline());
+
+ try (BatchScanner scanner = client.createBatchScanner(tableName,
Authorizations.EMPTY)) {
+ scanner.setRanges(Collections.singletonList(new Range()));
+ scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL);
+ } // when the scanner is closed, all open sessions should be closed
Review Comment:
```suggestion
assertEquals(0, Iterables.size(scanner), "Unexpected entries seen
when scanning empty table");
} // when the scanner is closed, all open sessions should be closed
assertFalse(client.tableOperations().isOnline(tableName));
```
##########
core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java:
##########
@@ -241,38 +247,59 @@ private void binRanges(TabletLocator tabletLocator,
List<Range> ranges,
int lastFailureSize = Integer.MAX_VALUE;
- while (true) {
+ if (options.getConsistencyLevel().equals(ConsistencyLevel.EVENTUAL) &&
!this.tableOnline) {
+ // The TabletLocator only caches the location of tablets for online
tables. The ScanServer
+ // can scan online and offline tables, but does not require the location
of the tablet. It
+ // only requires the KeyExtent. Find the first extent for the table that
matches the
+ // startRow provided in the scanState. If no startRow is provided, use
the first KeyExtent
+ // for the table
binnedRanges.clear();
- List<Range> failures = tabletLocator.binRanges(context, ranges,
binnedRanges);
-
- if (failures.isEmpty()) {
- break;
- } else {
- // tried to only do table state checks when failures.size() ==
ranges.size(), however this
- // did
- // not work because nothing ever invalidated entries in the
tabletLocator cache... so even
- // though
- // the table was deleted the tablet locator entries for the deleted
table were not
- // cleared... so
- // need to always do the check when failures occur
- if (failures.size() >= lastFailureSize) {
- context.requireNotDeleted(tableId);
- context.requireNotOffline(tableId, tableName);
+ Map<KeyExtent,List<Range>> map = new HashMap<>();
+ ranges.forEach(r -> {
+ TabletsMetadata tm =
context.getAmple().readTablets().forTable(this.tableId)
+ .overlapping(r.getStartKey() == null ? null :
r.getStartKey().getRow(),
+ r.isStartKeyInclusive(), r.getEndKey() == null ? null :
r.getEndKey().getRow())
+ .build();
Review Comment:
Always doing a metadata lookup per range for the batch scanner could be a
performance problem. It would be better to have some sort of caching. We can cache
previously seen extents as long as the cache is invalidated when the scan server
reports back that an extent does not exist.
There may be some existing bulk import code that we could reuse, for the
case when bulk import works with an offline table. I will look around for that.
##########
core/src/main/java/org/apache/accumulo/core/clientImpl/ThriftScanner.java:
##########
@@ -288,63 +290,88 @@ public static List<KeyValue> scan(ClientContext context,
ScanState scanState, lo
if ((System.currentTimeMillis() - startTime) / 1000.0 > timeOut)
throw new ScanTimedOutException();
- while (loc == null) {
- long currentTime = System.currentTimeMillis();
- if ((currentTime - startTime) / 1000.0 > timeOut)
- throw new ScanTimedOutException();
-
- Span child1 = TraceUtil.startSpan(ThriftScanner.class,
"scan::locateTablet");
- try (Scope locateSpan = child1.makeCurrent()) {
- loc = TabletLocator.getLocator(context,
scanState.tableId).locateTablet(context,
- scanState.startRow, scanState.skipStartRow, false);
-
- if (loc == null) {
- context.requireNotDeleted(scanState.tableId);
- context.requireNotOffline(scanState.tableId, null);
-
- error = "Failed to locate tablet for table : " +
scanState.tableId + " row : "
- + scanState.startRow;
+ boolean tableOnline =
+
context.tableOperations().isOnline(context.getTableName(scanState.tableId));
+
+ if (scanState.runOnScanServer && !tableOnline) {
+ // The TabletLocator only caches the location of tablets for online
tables. The ScanServer
+ // can scan online and offline tables, but does not require the
location of the tablet. It
+ // only requires the KeyExtent. Find the first extent for the table
that matches the
+ // startRow provided in the scanState. If no startRow is provided,
use the first KeyExtent
+ // for the table
+ Text endRow = scanState.range != null && scanState.range.getEndKey()
!= null
+ ? scanState.range.getEndKey().getRow() : null;
+ TabletsMetadata tm =
context.getAmple().readTablets().forTable(scanState.tableId)
Review Comment:
Would be better to cache extents.
##########
test/src/main/java/org/apache/accumulo/test/ScanServerIT.java:
##########
@@ -123,21 +127,81 @@ public void testScan() throws Exception {
}
}
+ @Test
+ public void testScanOfflineSingleTablet() throws Exception {
+
+ try (AccumuloClient client =
Accumulo.newClient().from(getClientProps()).build()) {
+ String tableName = getUniqueNames(1)[0];
+
+ final int ingestedEntryCount =
+ createTableAndIngest(client, tableName, null, 10, 10, "colf", false);
+ client.tableOperations().offline(tableName, true);
+ assertFalse(client.tableOperations().isOnline(tableName));
+
+ TabletLocator.getLocator((ClientContext) client,
+
TableId.of(client.tableOperations().tableIdMap().get(tableName))).invalidateCache();
+
+ try (Scanner scanner = client.createScanner(tableName,
Authorizations.EMPTY)) {
+ scanner.setRange(new Range());
+ scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL);
+ assertEquals(ingestedEntryCount, Iterables.size(scanner),
+ "The scan server scanner should have seen all ingested and flushed
entries");
+ } // when the scanner is closed, all open sessions should be closed
+ }
+ }
+
+ @Test
+ public void testScanOfflineTableManyTablets() throws Exception {
+
+ try (AccumuloClient client =
Accumulo.newClient().from(getClientProps()).build()) {
+ String tableName = getUniqueNames(1)[0];
+
+ final int ingestedEntryCount =
+ createTableAndIngest(client, tableName, null, 10, 10, "colf", true);
+ client.tableOperations().offline(tableName, true);
+ assertFalse(client.tableOperations().isOnline(tableName));
+
+ TabletLocator.getLocator((ClientContext) client,
+
TableId.of(client.tableOperations().tableIdMap().get(tableName))).invalidateCache();
+
+ try (Scanner scanner = client.createScanner(tableName,
Authorizations.EMPTY)) {
+ scanner.setRange(new Range());
+ scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL);
+ assertEquals(ingestedEntryCount, Iterables.size(scanner),
+ "The scan server scanner should have seen all ingested and flushed
entries");
+ } // when the scanner is closed, all open sessions should be closed
+ }
+ }
+
+ @Test
+ public void testScanTableCreatedOffline() throws Exception {
+
+ try (AccumuloClient client =
Accumulo.newClient().from(getClientProps()).build()) {
+ String tableName = getUniqueNames(1)[0];
+ client.tableOperations().create(tableName, new
NewTableConfiguration().createOffline());
+ try (Scanner scanner = client.createScanner(tableName,
Authorizations.EMPTY)) {
+ scanner.setRange(new Range());
+ scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL);
+ } // when the scanner is closed, all open sessions should be closed
Review Comment:
```suggestion
assertEquals(0, Iterables.size(scanner), "Unexpected entries seen
when scanning empty table");
} // when the scanner is closed, all open sessions should be closed
assertFalse(client.tableOperations().isOnline(tableName));
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]