This is an automated email from the ASF dual-hosted git repository. kadir pushed a commit to branch 4.x-HBase-1.4 in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x-HBase-1.4 by this push: new c14b656 PHOENIX-5562 Simplify detection of concurrent updates on data tables with indexes c14b656 is described below commit c14b656e183a450f034b9037e25a93e349674ffc Author: Kadir <kozde...@salesforce.com> AuthorDate: Wed Nov 6 22:04:20 2019 -0800 PHOENIX-5562 Simplify detection of concurrent updates on data tables with indexes --- .../phoenix/hbase/index/IndexRegionObserver.java | 51 ++++++++-------------- 1 file changed, 17 insertions(+), 34 deletions(-) diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java index e8d9a05..b058b33 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java @@ -104,19 +104,12 @@ public class IndexRegionObserver extends BaseRegionObserver { * Class to represent pending data table rows */ private static class PendingRow { - private long latestTimestamp; - private long count; + private boolean concurrent = false; + private long count = 1; - PendingRow(long latestTimestamp) { - count = 1; - this.latestTimestamp = latestTimestamp; - } - - public void add(long timestamp) { + public void add() { count++; - if (latestTimestamp < timestamp) { - latestTimestamp = timestamp; - } + concurrent = true; } public void remove() { @@ -127,8 +120,8 @@ public class IndexRegionObserver extends BaseRegionObserver { return count; } - public long getLatestTimestamp() { - return latestTimestamp; + public boolean isConcurrent() { + return concurrent; } } @@ -159,10 +152,6 @@ public class IndexRegionObserver extends BaseRegionObserver { // The collection of candidate index mutations that will be applied after the data table mutations private Collection<Pair<Pair<Mutation, byte[]>, byte[]>> intermediatePostIndexUpdates; private List<RowLock> rowLocks = Lists.newArrayListWithExpectedSize(QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE); - // The set of row keys for the data table rows of this batch such that for each of these rows there exists another - // batch with a timestamp earlier than the timestamp of this batch and the earlier batch has a mutation on the - // row (i.e., concurrent updates). - private HashSet<ImmutableBytesPtr> pendingRows = new HashSet<>(); private HashSet<ImmutableBytesPtr> rowsToLock = new HashSet<>(); long dataWriteStartTime; @@ -401,16 +390,15 @@ public class IndexRegionObserver extends BaseRegionObserver { } } - private void populatePendingRows(BatchMutateContext context, long now) { + private void populatePendingRows(BatchMutateContext context) { for (RowLock rowLock : context.rowLocks) { ImmutableBytesPtr rowKey = rowLock.getRowKey(); PendingRow pendingRow = pendingRows.get(rowKey); if (pendingRow == null) { - pendingRows.put(rowKey, new PendingRow(now)); + pendingRows.put(rowKey, new PendingRow()); } else { // m is a mutation on a row that has already a pending mutation in progress from another batch - pendingRow.add(now); - context.pendingRows.add(rowKey); + pendingRow.add(); } } } @@ -579,17 +567,12 @@ public class IndexRegionObserver extends BaseRegionObserver { Put unverifiedPut = new Put(m.getRow()); unverifiedPut.addColumn(emptyCF, emptyCQ, now - 1, UNVERIFIED_BYTES); context.preIndexUpdates.add(new Pair <Mutation, byte[]>(unverifiedPut, next.getFirst().getSecond())); - // Ignore post index updates (i.e., the third write phase updates) for this row if it is - // going through concurrent updates - ImmutableBytesPtr rowKey = new ImmutableBytesPtr(next.getSecond()); - if (!context.pendingRows.contains(rowKey)) { - if (m instanceof Put) { - // Remove the empty column prepared by Index codec as we need to change its value - removeEmptyColumn(m, emptyCF, emptyCQ); - ((Put) m).addColumn(emptyCF, emptyCQ, now, VERIFIED_BYTES); - } - context.intermediatePostIndexUpdates.add(next); + if (m instanceof Put) { + // Remove the empty column prepared by Index codec as we need to change its value + removeEmptyColumn(m, emptyCF, emptyCQ); + ((Put) m).addColumn(emptyCF, emptyCQ, now, VERIFIED_BYTES); } + context.intermediatePostIndexUpdates.add(next); } } } @@ -639,7 +622,7 @@ public class IndexRegionObserver extends BaseRegionObserver { // Add the table rows in the mini batch to the collection of pending rows. This will be used to detect // concurrent updates if (replayWrite == null) { - populatePendingRows(context, now); + populatePendingRows(context); } // First group all the updates for a single row into a single update to be processed Collection<? extends Mutation> mutations = groupMutations(miniBatchOp, now, replayWrite); @@ -682,9 +665,9 @@ public class IndexRegionObserver extends BaseRegionObserver { Pair<Pair<Mutation, byte[]>, byte[]> update = iterator.next(); ImmutableBytesPtr rowKey = new ImmutableBytesPtr(update.getSecond()); PendingRow pendingRow = pendingRows.get(rowKey); - // Has any concurrent mutation arrived for the same row? if so, skip post index updates + // Are there concurrent updates on the data table row? if so, skip post index updates // and let read repair resolve conflicts - if (pendingRow.getLatestTimestamp() > now) { + if (pendingRow.isConcurrent()) { iterator.remove(); } }