This is an automated email from the ASF dual-hosted git repository. kadir pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push: new 259b560548 PHOENIX-7328 Fix flapping ConcurrentMutationsExtendedIT#testConcurren… (#1903) 259b560548 is described below commit 259b560548ed6f7a5e04909f2af9f39a3cf2ee58 Author: Kadir Ozdemir <37155482+kadiro...@users.noreply.github.com> AuthorDate: Fri Jun 14 11:04:28 2024 +0300 PHOENIX-7328 Fix flapping ConcurrentMutationsExtendedIT#testConcurren… (#1903) --- .../phoenix/hbase/index/IndexRegionObserver.java | 200 ++++++++++++--------- .../end2end/ConcurrentMutationsExtendedIT.java | 16 +- 2 files changed, 120 insertions(+), 96 deletions(-) diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java index a6f682da22..a4547b60c9 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java @@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.regionserver.BloomType; import org.apache.phoenix.execute.MutationState; import org.apache.phoenix.index.PhoenixIndexBuilderHelper; +import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions; import org.apache.phoenix.thirdparty.com.google.common.collect.ArrayListMultimap; import org.apache.phoenix.thirdparty.com.google.common.collect.ListMultimap; import org.apache.phoenix.thirdparty.com.google.common.collect.Lists; @@ -150,36 +151,51 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver { private static final OperationStatus NOWRITE = new OperationStatus(OperationStatusCode.SUCCESS); public static final String PHOENIX_APPEND_METADATA_TO_WAL = "phoenix.append.metadata.to.wal"; public static final boolean DEFAULT_PHOENIX_APPEND_METADATA_TO_WAL = false; + /** + * Class to represent pending data table rows + * */ + private class PendingRow { + private int count; + private boolean usable; + private ImmutableBytesPtr rowKey; + private BatchMutateContext lastContext; + + PendingRow(ImmutableBytesPtr rowKey, BatchMutateContext context) { + count = 1; + usable = true; + lastContext = context; + this.rowKey = rowKey; + } - /** - * Class to represent pending data table rows - */ - private static class PendingRow { - private int count; - private BatchMutateContext lastContext; - - PendingRow(BatchMutateContext context) { - count = 1; - lastContext = context; - } - - public void add(BatchMutateContext context) { - count++; - lastContext = context; - } + public boolean add(BatchMutateContext context) { + synchronized (this) { + if (usable) { + count++; + lastContext = context; + return true; + } + } + return false; + } - public void remove() { - count--; - } + public void remove() { + synchronized (this) { + count--; + if (count == 0) { + pendingRows.remove(rowKey); + usable = false; + } + } + } - public int getCount() { + public int getCount() { return count; } - public BatchMutateContext getLastContext() { + public BatchMutateContext getLastContext() { return lastContext; } - } + } private static boolean ignoreIndexRebuildForTesting = false; private static boolean failPreIndexUpdatesForTesting = false; @@ -276,6 +292,9 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver { public CountDownLatch getCountDownLatch() { synchronized (this) { + if (currentPhase != BatchMutatePhase.PRE) { + return null; + } if (waitList == null) { waitList = new ArrayList<>(); } @@ -574,19 +593,6 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver { context.rowLocks.clear(); } - private void populatePendingRows(BatchMutateContext context) { - for (RowLock rowLock : context.rowLocks) { - ImmutableBytesPtr rowKey = rowLock.getRowKey(); - PendingRow pendingRow = pendingRows.get(rowKey); - if (pendingRow == null) { - pendingRows.put(rowKey, new PendingRow(context)); - } else { - // m is a mutation on a row that has already a pending mutation in progress from another batch - pendingRow.add(context); - } - } - } - private Collection<? extends Mutation> groupMutations(MiniBatchOperationInProgress<Mutation> miniBatchOp, BatchMutateContext context) throws IOException { context.multiMutationMap = new HashMap<>(); @@ -837,28 +843,49 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver { return false; } /** - * Retrieve the last committed data row state. + * Retrieve the data row state either from memory or disk. The rows are locked by the caller. */ private void getCurrentRowStates(ObserverContext<RegionCoprocessorEnvironment> c, BatchMutateContext context) throws IOException { Set<KeyRange> keys = new HashSet<KeyRange>(context.rowsToLock.size()); for (ImmutableBytesPtr rowKeyPtr : context.rowsToLock) { - PendingRow pendingRow = pendingRows.get(rowKeyPtr); - if (pendingRow != null && pendingRow.getLastContext().getCurrentPhase() == BatchMutatePhase.PRE) { - if (context.lastConcurrentBatchContext == null) { - context.lastConcurrentBatchContext = new HashMap<>(); - } - context.lastConcurrentBatchContext.put(rowKeyPtr, pendingRow.getLastContext()); - if (context.maxPendingRowCount < pendingRow.getCount()) { - context.maxPendingRowCount = pendingRow.getCount(); - } - Put put = pendingRow.getLastContext().getNextDataRowState(rowKeyPtr); - if (put != null) { - context.dataRowStates.put(rowKeyPtr, new Pair<Put, Put>(put, new Put(put))); - } - } - else { + PendingRow pendingRow = new PendingRow(rowKeyPtr, context); + // Add the data table rows in the mini batch to the per region collection of pending + // rows. This will be used to detect concurrent updates + PendingRow existingPendingRow = pendingRows.putIfAbsent(rowKeyPtr, pendingRow); + if (existingPendingRow == null) { + // There was no pending row for this row key. We need to retrieve this row from disk keys.add(PVarbinary.INSTANCE.getKeyRange(rowKeyPtr.get(), SortOrder.ASC)); + } else { + // There is a pending row for this row key. We need to retrieve the row from memory + BatchMutateContext lastContext = existingPendingRow.getLastContext(); + if (existingPendingRow.add(context)) { + BatchMutatePhase phase = lastContext.getCurrentPhase(); + Preconditions.checkArgument(phase != BatchMutatePhase.POST, + "the phase of the last batch cannot be POST"); + if (phase == BatchMutatePhase.PRE) { + if (context.lastConcurrentBatchContext == null) { + context.lastConcurrentBatchContext = new HashMap<>(); + } + context.lastConcurrentBatchContext.put(rowKeyPtr, lastContext); + if (context.maxPendingRowCount < existingPendingRow.getCount()) { + context.maxPendingRowCount = existingPendingRow.getCount(); + } + Put put = lastContext.getNextDataRowState(rowKeyPtr); + if (put != null) { + context.dataRowStates.put(rowKeyPtr, new Pair<>(put, new Put(put))); + } + } else { + // The last batch for this row key failed. We cannot use the memory state. + // So we need to retrieve this row from disk + keys.add(PVarbinary.INSTANCE.getKeyRange(rowKeyPtr.get(), SortOrder.ASC)); + } + } else { + // The existing pending row is removed from the map. That means there is no + // pending row for this row key anymore. We need to add the new one to the map + pendingRows.put(rowKeyPtr, pendingRow); + keys.add(PVarbinary.INSTANCE.getKeyRange(rowKeyPtr.get(), SortOrder.ASC)); + } } } if (keys.isEmpty()) { @@ -1119,41 +1146,51 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver { } } + /** + * Wait for the previous batches to complete. If any of the previous batch fails then this + * batch will fail too and needs to be retried. The rows are locked by the caller. + * @param table + * @param context + * @throws Throwable + */ private void waitForPreviousConcurrentBatch(TableName table, BatchMutateContext context) throws Throwable { - boolean done = true; for (BatchMutateContext lastContext : context.lastConcurrentBatchContext.values()) { BatchMutatePhase phase = lastContext.getCurrentPhase(); - - if (phase == BatchMutatePhase.PRE) { + if (phase == BatchMutatePhase.FAILED) { + context.currentPhase = BatchMutatePhase.FAILED; + break; + } else if (phase == BatchMutatePhase.PRE) { CountDownLatch countDownLatch = lastContext.getCountDownLatch(); + if (countDownLatch == null) { + // phase changed from PRE to either FAILED or POST + if (phase == BatchMutatePhase.FAILED) { + context.currentPhase = BatchMutatePhase.FAILED; + break; + } + continue; + } // Release the locks so that the previous concurrent mutation can go into the post phase unlockRows(context); // Wait for at most one concurrentMutationWaitDuration for each level in the dependency tree of batches. // lastContext.getMaxPendingRowCount() is the depth of the subtree rooted at the batch pointed by lastContext if (!countDownLatch.await((lastContext.getMaxPendingRowCount() + 1) * concurrentMutationWaitDuration, TimeUnit.MILLISECONDS)) { + context.currentPhase = BatchMutatePhase.FAILED; LOG.debug(String.format("latch timeout context %s last %s", context, lastContext)); - done = false; + break; } - // Acquire the locks again before letting the region proceed with data table updates - lockRows(context); - if (!done) { - // previous concurrent batch did not complete so we have to retry this batch + if (lastContext.getCurrentPhase() == BatchMutatePhase.FAILED) { + context.currentPhase = BatchMutatePhase.FAILED; break; - } else { - // read the phase again to determine the status of previous batch - phase = lastContext.getCurrentPhase(); - LOG.debug(String.format("context %s last %s exit phase %s", context, lastContext, phase)); } - } - - if (phase == BatchMutatePhase.FAILED) { - done = false; - break; + // Acquire the locks again before letting the region proceed with data table updates + lockRows(context); + LOG.debug(String.format("context %s last %s exit phase %s", context, lastContext, + lastContext.getCurrentPhase())); } } - if (!done) { + if (context.currentPhase == BatchMutatePhase.FAILED) { // This batch needs to be retried since one of the previous concurrent batches has not completed yet. // Throwing an IOException will result in retries of this batch. Removal of reference counts and // locks for the rows of this batch will be done in postBatchMutateIndispensably() @@ -1225,9 +1262,6 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver { if (context.hasGlobalIndex || context.hasUncoveredIndex || context.hasTransform) { // Prepare next data rows states for pending mutations (for global indexes) prepareDataRowStates(c, miniBatchOp, context, now); - // Add the table rows in the mini batch to the collection of pending rows. This will be used to detect - // concurrent updates - populatePendingRows(context); // early exit if it turns out we don't have any edits long start = EnvironmentEdgeManager.currentTimeMillis(); preparePreIndexMutations(context, now, indexMetaData); @@ -1282,6 +1316,10 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver { RowLock rowLock = rowLockIterator.next(); ImmutableBytesPtr rowKey = rowLock.getRowKey(); if (row.equals(rowKey)) { + PendingRow pendingRow = pendingRows.get(rowKey); + if (pendingRow != null) { + pendingRow.remove(); + } rowLock.release(); rowLockIterator.remove(); context.rowsToLock.remove(row); @@ -1379,7 +1417,6 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver { doIndexWritesWithExceptions(context, true); metricSource.updatePostIndexUpdateTime(dataTableName, EnvironmentEdgeManager.currentTimeMillis() - start); - return; } catch (Throwable e) { metricSource.updatePostIndexUpdateFailureTime(dataTableName, EnvironmentEdgeManager.currentTimeMillis() - start); @@ -1413,24 +1450,10 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver { } private void removePendingRows(BatchMutateContext context) { - if (context.currentPhase == BatchMutatePhase.FAILED) { - // This batch failed. All concurrent batches will fail too. So we can remove - // all rows of this batch from the memory as the in-memory row images are not valid - // anymore. Please note that when a batch fails, some of the rows may not have been - // locked and so it is not safe to update the pending row entries in that case. - for (ImmutableBytesPtr rowKey : context.rowsToLock) { - pendingRows.remove(rowKey); - } - return; - } - for (RowLock rowLock : context.rowLocks) { - ImmutableBytesPtr rowKey = rowLock.getRowKey(); + for (ImmutableBytesPtr rowKey : context.rowsToLock) { PendingRow pendingRow = pendingRows.get(rowKey); if (pendingRow != null) { pendingRow.remove(); - if (pendingRow.getCount() == 0) { - pendingRows.remove(rowKey); - } } } } @@ -1445,7 +1468,6 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver { doIndexWritesWithExceptions(context, false); metricSource.updatePreIndexUpdateTime(dataTableName, EnvironmentEdgeManager.currentTimeMillis() - start); - return; } catch (Throwable e) { metricSource.updatePreIndexUpdateFailureTime(dataTableName, EnvironmentEdgeManager.currentTimeMillis() - start); diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java index 95639d284a..278ccb05b5 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java @@ -87,14 +87,17 @@ public class ConcurrentMutationsExtendedIT extends ParallelStatsDisabledIT { } @BeforeClass public static synchronized void doSetup() throws Exception { - Map<String, String> props = Maps.newHashMapWithExpectedSize(1); + Map<String, String> props = Maps.newHashMapWithExpectedSize(4); props.put(QueryServices.GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB, Long.toString(0)); props.put(BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, Integer.toString(MAX_LOOKBACK_AGE)); - // The following sets the row lock wait duration to 10 ms to test the code path handling + // The following sets the row lock wait duration to 100 ms to test the code path handling // row lock timeouts. When there are concurrent mutations, the wait time can be - // much longer than 10 ms. - props.put("hbase.rowlock.wait.duration", "10"); + // much longer than 100 ms + props.put("hbase.rowlock.wait.duration", "100"); + // The following sets the wait duration for the previous concurrent batch to 10 ms to test + // the code path handling timeouts + props.put("phoenix.index.concurrent.wait.duration.ms", "10"); setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); } @Parameterized.Parameters( @@ -308,8 +311,8 @@ public class ConcurrentMutationsExtendedIT extends ParallelStatsDisabledIT { @Test public void testConcurrentUpserts() throws Exception { int nThreads = 10; - final int batchSize = 20; - final int nRows = 100; + final int batchSize = 100; + final int nRows = 499; final int nIndexValues = 23; final String tableName = generateUniqueName(); final String indexName = generateUniqueName(); @@ -341,7 +344,6 @@ public class ConcurrentMutationsExtendedIT extends ParallelStatsDisabledIT { conn.commit(); } catch (SQLException e) { System.out.println(e); - throw new RuntimeException(e); } finally { doneSignal.countDown(); }