This is an automated email from the ASF dual-hosted git repository. kadir pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push: new 501ae7ee3c PHOENIX-7379 Improve handling of concurrent index mutations with the … (#1951) 501ae7ee3c is described below commit 501ae7ee3c66e4c6dc08374938e6595e355197dc Author: Kadir Ozdemir <37155482+kadiro...@users.noreply.github.com> AuthorDate: Wed Aug 14 23:23:37 2024 -0700 PHOENIX-7379 Improve handling of concurrent index mutations with the … (#1951) --- .../phoenix/hbase/index/IndexRegionObserver.java | 87 ++++++++++++++++------ .../end2end/ConcurrentMutationsExtendedIT.java | 19 +++-- 2 files changed, 77 insertions(+), 29 deletions(-) diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java index 317841fb13..b2e3b5ff9d 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java @@ -58,7 +58,6 @@ import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HConstants.OperationStatusCode; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; @@ -366,7 +365,8 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver { private boolean shouldWALAppend = DEFAULT_PHOENIX_APPEND_METADATA_TO_WAL; private boolean isNamespaceEnabled = false; private boolean useBloomFilter = false; - + private long lastTimestamp = 0; + private List<Set<ImmutableBytesPtr>> batchesWithLastTimestamp = new ArrayList<>(); private static final int DEFAULT_ROWLOCK_WAIT_DURATION = 30000; private static final int DEFAULT_CONCURRENT_MUTATION_WAIT_DURATION_IN_MS = 100; @@ -1044,7 +1044,7 
@@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver { * either set to "verified" or the row is deleted. */ private void preparePreIndexMutations(BatchMutateContext context, - long now, + long batchTimestamp, PhoenixIndexMetaData indexMetaData) throws Throwable { List<IndexMaintainer> maintainers = indexMetaData.getIndexMaintainers(); // get the current span, or just use a null-span to avoid a bunch of if statements @@ -1056,7 +1056,7 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver { current.addTimelineAnnotation("Built index updates, doing preStep"); // The rest of this method is for handling global index updates context.indexUpdates = ArrayListMultimap.<HTableInterfaceReference, Pair<Mutation, byte[]>>create(); - prepareIndexMutations(context, maintainers, now); + prepareIndexMutations(context, maintainers, batchTimestamp); context.preIndexUpdates = ArrayListMultimap.<HTableInterfaceReference, Mutation>create(); int updateCount = 0; @@ -1076,7 +1076,7 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver { // Set the status of the index row to "unverified" Put unverifiedPut = new Put(m.getRow()); unverifiedPut.addColumn( - emptyCF, emptyCQ, now, QueryConstants.UNVERIFIED_BYTES); + emptyCF, emptyCQ, batchTimestamp, QueryConstants.UNVERIFIED_BYTES); // This will be done before the data table row is updated (i.e., in the first write phase) context.preIndexUpdates.put(hTableInterfaceReference, unverifiedPut); } @@ -1100,7 +1100,7 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver { } private void preparePostIndexMutations(BatchMutateContext context, - long now, + long batchTimestamp, PhoenixIndexMetaData indexMetaData) { context.postIndexUpdates = ArrayListMultimap.<HTableInterfaceReference, Mutation>create(); List<IndexMaintainer> maintainers = indexMetaData.getIndexMaintainers(); @@ -1116,7 +1116,8 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver { if (!indexMaintainer.isUncovered()) { Put verifiedPut = new Put(m.getRow()); // Set the status of the index row to "verified" - verifiedPut.addColumn(emptyCF, emptyCQ, now, QueryConstants.VERIFIED_BYTES); + verifiedPut.addColumn(emptyCF, emptyCQ, batchTimestamp, + QueryConstants.VERIFIED_BYTES); context.postIndexUpdates.put(hTableInterfaceReference, verifiedPut); } } else { @@ -1211,6 +1212,53 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver { } } + private boolean shouldSleep(BatchMutateContext context) { + for (ImmutableBytesPtr ptr : context.rowsToLock) { + for (Set set : batchesWithLastTimestamp) { + if (set.contains(ptr)) { + return true; + } + } + } + return false; + } + private long getBatchTimestamp(BatchMutateContext context, TableName table) + throws InterruptedException { + synchronized (this) { + long ts = EnvironmentEdgeManager.currentTimeMillis(); + if (ts != lastTimestamp) { + // The timestamp for this batch will be different from the last batch processed. + lastTimestamp = ts; + batchesWithLastTimestamp.clear(); + batchesWithLastTimestamp.add(context.rowsToLock); + return ts; + } else { + if (!shouldSleep(context)) { + // There is no need to sleep as the last batches with the same timestamp + // do not have a common row with this batch + batchesWithLastTimestamp.add(context.rowsToLock); + return ts; + } + } + } + // Sleep for one millisecond. The sleep is necessary to get different timestamps + // for concurrent batches that share common rows. + Thread.sleep(1); + LOG.debug("slept 1ms for " + table.getNameAsString()); + synchronized (this) { + long ts = EnvironmentEdgeManager.currentTimeMillis(); + if (ts != lastTimestamp) { + // The timestamp for this batch will be different from the last batch processed.
+ lastTimestamp = ts; + batchesWithLastTimestamp.clear(); + } + // We do not have to check again if we need to sleep again since we got the next + // timestamp while holding the row locks. This means there cannot be a new + // mutation with the same row attempting to get the same timestamp + batchesWithLastTimestamp.add(context.rowsToLock); + return ts; + } + } public void preBatchMutateWithExceptions(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws Throwable { PhoenixIndexMetaData indexMetaData = getPhoenixIndexMetaData(c, miniBatchOp); @@ -1226,7 +1274,8 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver { ServerIndexUtil.setDeleteAttributes(miniBatchOp); } - // Exclusively lock all rows to do consistent writes over multiple tables + // Exclusively lock all rows to do consistent writes over multiple tables + // (i.e., the data and its index tables) populateRowsToLock(miniBatchOp, context); // early exit if it turns out we don't have any update for indexes if (context.rowsToLock.isEmpty()) { @@ -1265,27 +1314,19 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver { } } - long now = EnvironmentEdgeManager.currentTimeMillis(); - // Update the timestamps of the data table mutations to prevent overlapping timestamps (which prevents index - // inconsistencies as this case isn't handled correctly currently). - setTimestamps(miniBatchOp, builder, now); - TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable(); + long batchTimestamp = getBatchTimestamp(context, table); + // Update the timestamps of the data table mutations to prevent overlapping timestamps + // (which prevents index inconsistencies as this case is not handled).
+ setTimestamps(miniBatchOp, builder, batchTimestamp); if (context.hasGlobalIndex || context.hasUncoveredIndex || context.hasTransform) { // Prepare next data rows states for pending mutations (for global indexes) - prepareDataRowStates(c, miniBatchOp, context, now); + prepareDataRowStates(c, miniBatchOp, context, batchTimestamp); // early exit if it turns out we don't have any edits long start = EnvironmentEdgeManager.currentTimeMillis(); - preparePreIndexMutations(context, now, indexMetaData); + preparePreIndexMutations(context, batchTimestamp, indexMetaData); metricSource.updateIndexPrepareTime(dataTableName, EnvironmentEdgeManager.currentTimeMillis() - start); - // Sleep for one millisecond if we have prepared the index updates in less than 1 ms. The sleep is necessary to - // get different timestamps for concurrent batches that share common rows. It is very rare that the index updates - // can be prepared in less than one millisecond - if (!context.rowLocks.isEmpty() && now == EnvironmentEdgeManager.currentTimeMillis()) { - Thread.sleep(1); - LOG.debug("slept 1ms for " + table.getNameAsString()); - } // Release the locks before making RPC calls for index updates unlockRows(context); // Do the first phase index updates @@ -1295,7 +1336,7 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver { if (context.lastConcurrentBatchContext != null) { waitForPreviousConcurrentBatch(table, context); } - preparePostIndexMutations(context, now, indexMetaData); + preparePostIndexMutations(context, batchTimestamp, indexMetaData); } if (context.hasLocalIndex) { // Group all the updates for a single row into a single update to be processed (for local indexes) diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java index 278ccb05b5..12dd00fd15 100644 --- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java @@ -19,7 +19,6 @@ package org.apache.phoenix.end2end; import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants; import org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository; -import org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository; import org.apache.phoenix.thirdparty.com.google.common.collect.Maps; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseIOException; @@ -42,6 +41,8 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.sql.Connection; @@ -68,12 +69,15 @@ import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEF import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_UNKNOWN_INDEX_ROW_COUNT; import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_UNVERIFIED_INDEX_ROW_COUNT; import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.REBUILT_INDEX_ROW_COUNT; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; @Category(NeedsOwnMiniClusterTest.class) @RunWith(Parameterized.class) public class ConcurrentMutationsExtendedIT extends ParallelStatsDisabledIT { - + private static final Logger LOGGER = + LoggerFactory.getLogger(ConcurrentMutationsExtendedIT.class); private final boolean uncovered; private static final Random RAND = new Random(5); private static final String MVCC_LOCK_TEST_TABLE_PREFIX = "MVCCLOCKTEST_"; @@ -111,7 +115,7 @@ public class 
ConcurrentMutationsExtendedIT extends ParallelStatsDisabledIT { // This checks the state of every raw index row without rebuilding any row IndexTool indexTool = IndexToolIT.runIndexTool(false, "", tableName, indexName, null, 0, IndexTool.IndexVerifyType.ONLY); - System.out.println(indexTool.getJob().getCounters()); + LOGGER.info(indexTool.getJob().getCounters().toString()); TestUtil.dumpTable(conn, TableName.valueOf(IndexVerificationOutputRepository.OUTPUT_TABLE_NAME)); assertEquals(0, indexTool.getJob().getCounters().findCounter(REBUILT_INDEX_ROW_COUNT).getValue()); assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).getValue()); @@ -126,7 +130,7 @@ public class ConcurrentMutationsExtendedIT extends ParallelStatsDisabledIT { // We want to check the index rows again as they may be modified by the read repair indexTool = IndexToolIT.runIndexTool(false, "", tableName, indexName, null, 0, IndexTool.IndexVerifyType.ONLY); - System.out.println(indexTool.getJob().getCounters()); + LOGGER.info(indexTool.getJob().getCounters().toString()); assertEquals(0, indexTool.getJob().getCounters().findCounter(REBUILT_INDEX_ROW_COUNT).getValue()); assertEquals(0, indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).getValue()); @@ -324,6 +328,7 @@ public class ConcurrentMutationsExtendedIT extends ParallelStatsDisabledIT { + tableName + "(v1)" + (uncovered ? 
"" : "INCLUDE(v2, v3)")); final CountDownLatch doneSignal = new CountDownLatch(nThreads); Runnable[] runnables = new Runnable[nThreads]; + long startTime = EnvironmentEdgeManager.currentTimeMillis(); for (int i = 0; i < nThreads; i++) { runnables[i] = new Runnable() { @@ -343,7 +348,7 @@ public class ConcurrentMutationsExtendedIT extends ParallelStatsDisabledIT { } conn.commit(); } catch (SQLException e) { - System.out.println(e); + LOGGER.warn("Exception during upsert : " + e); } finally { doneSignal.countDown(); } @@ -357,6 +362,8 @@ public class ConcurrentMutationsExtendedIT extends ParallelStatsDisabledIT { } assertTrue("Ran out of time", doneSignal.await(120, TimeUnit.SECONDS)); + LOGGER.info("Total upsert time in ms : " + + (EnvironmentEdgeManager.currentTimeMillis() - startTime)); long actualRowCount = verifyIndexTable(tableName, indexName, conn); assertEquals(nRows, actualRowCount); }