This is an automated email from the ASF dual-hosted git repository.

kadir pushed a commit to branch 5.1
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/5.1 by this push:
     new 49bfb36755 PHOENIX-7245 NPE in Phoenix Coproc leading to Region Server crash (#1887)
49bfb36755 is described below

commit 49bfb36755ace643fbc6a42415a6fe64e3984719
Author: kadirozde <37155482+kadiro...@users.noreply.github.com>
AuthorDate: Mon May 6 09:16:18 2024 -0700

    PHOENIX-7245 NPE in Phoenix Coproc leading to Region Server crash (#1887)
---
 .../end2end/ConcurrentMutationsExtendedIT.java    | 13 ++--
 .../phoenix/hbase/index/IndexRegionObserver.java  | 82 ++++++++++++++--------
 2 files changed, 60 insertions(+), 35 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
index 48768ead05..2d7eb3767a 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
@@ -76,10 +76,14 @@ public class ConcurrentMutationsExtendedIT extends ParallelStatsDisabledIT {
 
     @BeforeClass
     public static synchronized void doSetup() throws Exception {
-        Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+        Map<String, String> props = Maps.newHashMapWithExpectedSize(3);
         props.put(QueryServices.GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB,
             Long.toString(0));
         props.put(CompatBaseScannerRegionObserver.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY,
             Integer.toString(MAX_LOOKBACK_AGE));
+        // The following sets the row lock wait duration to 10 ms to test the code path handling
+        // row lock timeouts. When there are concurrent mutations, the wait time can be
+        // much longer than 10 ms.
+        props.put("hbase.rowlock.wait.duration", "10");
         setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
     }
 
@@ -280,9 +284,9 @@ public class ConcurrentMutationsExtendedIT extends ParallelStatsDisabledIT {
 
     @Test
     public void testConcurrentUpserts() throws Exception {
-        int nThreads = 4;
-        final int batchSize = 200;
-        final int nRows = 51;
+        int nThreads = 10;
+        final int batchSize = 20;
+        final int nRows = 100;
         final int nIndexValues = 23;
         final String tableName = generateUniqueName();
         final String indexName = generateUniqueName();
@@ -312,6 +316,7 @@ public class ConcurrentMutationsExtendedIT extends ParallelStatsDisabledIT {
                     }
                     conn.commit();
                 } catch (SQLException e) {
+                    System.out.println(e);
                     throw new RuntimeException(e);
                 } finally {
                     doneSignal.countDown();
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index f1f03d12af..d6666d191f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -189,7 +189,7 @@ public class IndexRegionObserver extends CompatIndexRegionObserver implements Re
      */
 
     public static class BatchMutateContext {
-        private BatchMutatePhase currentPhase = BatchMutatePhase.PRE;
+        private volatile BatchMutatePhase currentPhase = BatchMutatePhase.PRE;
         // The max of reference counts on the pending rows of this batch at the time this batch arrives
         private int maxPendingRowCount = 0;
         private final int clientVersion;
@@ -246,12 +246,24 @@ public class IndexRegionObserver extends CompatIndexRegionObserver implements Re
         }
 
         public CountDownLatch getCountDownLatch() {
-            if (waitList == null) {
-                waitList = new ArrayList<>();
+            synchronized (this) {
+                if (waitList == null) {
+                    waitList = new ArrayList<>();
+                }
+                CountDownLatch countDownLatch = new CountDownLatch(1);
+                waitList.add(countDownLatch);
+                return countDownLatch;
+            }
+        }
+
+        public void countDownAllLatches() {
+            synchronized (this) {
+                if (waitList != null) {
+                    for (CountDownLatch countDownLatch : waitList) {
+                        countDownLatch.countDown();
+                    }
+                }
             }
-            CountDownLatch countDownLatch = new CountDownLatch(1);
-            waitList.add(countDownLatch);
-            return countDownLatch;
         }
 
         public int getMaxPendingRowCount() {
@@ -898,8 +910,6 @@ public class IndexRegionObserver extends CompatIndexRegionObserver implements Re
                 }
             }
         }
-        removePendingRows(context);
-        context.indexUpdates.clear();
     }
 
     private static boolean hasGlobalIndex(PhoenixIndexMetaData indexMetaData) {
@@ -922,11 +932,9 @@ public class IndexRegionObserver extends CompatIndexRegionObserver implements Re
 
     private void waitForPreviousConcurrentBatch(TableName table, BatchMutateContext context)
             throws Throwable {
-        boolean done;
-        BatchMutatePhase phase;
-        done = true;
+        boolean done = true;
         for (BatchMutateContext lastContext : context.lastConcurrentBatchContext.values()) {
-            phase = lastContext.getCurrentPhase();
+            BatchMutatePhase phase = lastContext.getCurrentPhase();
             if (phase == BatchMutatePhase.FAILED) {
                 done = false;
                 break;
@@ -948,14 +956,8 @@ public class IndexRegionObserver extends CompatIndexRegionObserver implements Re
         }
         if (!done) {
            // This batch needs to be retried since one of the previous concurrent batches has not completed yet.
-           // Throwing an IOException will result in retries of this batch. Before throwing exception,
-           // we need to remove reference counts and locks for the rows of this batch
-           removePendingRows(context);
-           context.indexUpdates.clear();
-           for (RowLock rowLock : context.rowLocks) {
-               rowLock.release();
-           }
-           context.rowLocks.clear();
+           // Throwing an IOException will result in retries of this batch. Removal of reference counts and
+           // locks for the rows of this batch will be done in postBatchMutateIndispensably()
            throw new IOException("One of the previous concurrent mutations has not completed. "
                    + "The batch needs to be retried " + table.getNameAsString());
        }
@@ -1048,6 +1050,15 @@ public class IndexRegionObserver extends CompatIndexRegionObserver implements Re
         }
     }
 
+    /**
+     * When this hook is called, all the rows in the batch context are locked if the batch of
+     * mutations is successful. Because the rows are locked, we can safely make updates to
+     * pending row states in memory and perform the necessary cleanup in that case.
+     *
+     * However, when the batch fails, then some of the rows may not be locked. In that case,
+     * we remove the pending row states from the concurrent hash map without updating them since
+     * pending rows states become invalid when a batch fails.
+     */
     @Override
     public void postBatchMutateIndispensably(ObserverContext<RegionCoprocessorEnvironment> c,
         MiniBatchOperationInProgress<Mutation> miniBatchOp, final boolean success) throws IOException {
@@ -1064,10 +1075,10 @@ public class IndexRegionObserver extends CompatIndexRegionObserver implements Re
             } else {
                 context.currentPhase = BatchMutatePhase.FAILED;
             }
-            if (context.waitList != null) {
-                for (CountDownLatch countDownLatch : context.waitList) {
-                    countDownLatch.countDown();
-                }
+            context.countDownAllLatches();
+            removePendingRows(context);
+            if (context.indexUpdates != null) {
+                context.indexUpdates.clear();
             }
             unlockRows(context);
             this.builder.batchCompleted(miniBatchOp);
@@ -1124,6 +1135,16 @@ public class IndexRegionObserver extends CompatIndexRegionObserver implements Re
     }
 
     private void removePendingRows(BatchMutateContext context) {
+        if (context.currentPhase == BatchMutatePhase.FAILED) {
+            // This batch failed. All concurrent batches will fail too. So we can remove
+            // all rows of this batch from the memory as the in-memory row images are not valid
+            // anymore. Please note that when a batch fails, some of the rows may not have been
+            // locked and so it is not safe to update the pending row entries in that case.
+            for (ImmutableBytesPtr rowKey : context.rowsToLock) {
+                pendingRows.remove(rowKey);
+            }
+            return;
+        }
         for (RowLock rowLock : context.rowLocks) {
             ImmutableBytesPtr rowKey = rowLock.getRowKey();
             PendingRow pendingRow = pendingRows.get(rowKey);
@@ -1138,8 +1159,9 @@ public class IndexRegionObserver extends CompatIndexRegionObserver implements Re
     }
     private void doPre(ObserverContext<RegionCoprocessorEnvironment> c, BatchMutateContext context,
                        MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
-        long start = EnvironmentEdgeManager.currentTimeMillis();
+        long start = 0;
         try {
+            start = EnvironmentEdgeManager.currentTimeMillis();
             if (failPreIndexUpdatesForTesting) {
                 throw new DoNotRetryIOException("Simulating the first (i.e., pre) index table write failure");
             }
@@ -1151,14 +1173,12 @@ public class IndexRegionObserver extends CompatIndexRegionObserver implements Re
             metricSource.updatePreIndexUpdateFailureTime(dataTableName,
                 EnvironmentEdgeManager.currentTimeMillis() - start);
             metricSource.incrementPreIndexUpdateFailures(dataTableName);
-            // Remove all locks as they are already unlocked. There is no need to unlock them again later when
-            // postBatchMutateIndispensably() is called
-            removePendingRows(context);
-            context.rowLocks.clear();
+            // Re-acquire all locks since we released them before making index updates
+            // Removal of reference counts and locks for the rows of this batch will be
+            // done in postBatchMutateIndispensably()
+            lockRows(context);
             rethrowIndexingException(e);
         }
-        throw new RuntimeException(
-                "Somehow didn't complete the index update, but didn't return succesfully either!");
     }
 
     /**
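
For anyone reading the patch without the surrounding class, the heart of the fix is the hand-off
between a finishing batch and the later concurrent batches waiting on it: waiters register a
CountDownLatch under the context's monitor, and the finishing batch publishes its final phase and
releases every registered latch from a single place in postBatchMutateIndispensably(). The sketch
below is a minimal, self-contained illustration of that pattern only; the class, method, and enum
names here (BatchContextSketch, finish(), DONE) are invented for the example and are not Phoenix
API. The real coprocessor does this inside BatchMutateContext exactly as the diff shows.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class BatchContextSketch {
    public enum Phase { PRE, DONE, FAILED }

    // volatile so a waiter sees the final phase without holding the monitor
    private volatile Phase phase = Phase.PRE;
    // lazily created and guarded by "this", as in the patch
    private List<CountDownLatch> waitList;

    public Phase getPhase() {
        return phase;
    }

    // Called by a later concurrent batch that must wait for this one to finish.
    public CountDownLatch getCountDownLatch() {
        synchronized (this) {
            if (waitList == null) {
                waitList = new ArrayList<>();
            }
            CountDownLatch latch = new CountDownLatch(1);
            waitList.add(latch);
            return latch;
        }
    }

    // Called exactly once when this batch completes, successfully or not.
    public void finish(boolean success) {
        phase = success ? Phase.DONE : Phase.FAILED;
        synchronized (this) {
            if (waitList != null) {
                for (CountDownLatch latch : waitList) {
                    latch.countDown();
                }
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BatchContextSketch previous = new BatchContextSketch();
        Thread waiter = new Thread(() -> {
            try {
                // Bounded wait; on timeout the real code throws and lets the batch retry.
                if (!previous.getCountDownLatch().await(100, TimeUnit.MILLISECONDS)) {
                    System.out.println("timed out waiting; would retry the batch");
                } else if (previous.getPhase() == Phase.FAILED) {
                    System.out.println("previous batch failed; this batch would fail/retry too");
                } else {
                    System.out.println("previous batch done; safe to proceed");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        previous.finish(true);
        waiter.join();
    }
}

The volatile phase plus the synchronized wait list is what lets the post hook both publish the
outcome and wake waiters without racing the lazy creation of the list.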
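
The other half of the fix is where the reference-counted pendingRows map gets cleaned up. The
javadoc added above postBatchMutateIndispensably() states the rule: updating pending row state is
only safe while the rows are locked, so for a failed batch the entries are dropped outright rather
than decremented. Below is a small, hypothetical sketch of that discipline; PendingRowsSketch and
its method names are illustrative stand-ins, not the actual Phoenix classes, and string row keys
stand in for ImmutableBytesPtr.

import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class PendingRowsSketch {
    static final class PendingRow {
        final AtomicInteger refCount = new AtomicInteger(1);
    }

    private final ConcurrentHashMap<String, PendingRow> pendingRows = new ConcurrentHashMap<>();

    // Register the rows of an incoming batch (done while holding row locks in the real coprocessor).
    public void add(List<String> rowKeys) {
        for (String key : rowKeys) {
            pendingRows.compute(key, (k, row) -> {
                if (row == null) {
                    return new PendingRow();
                }
                row.refCount.incrementAndGet();
                return row;
            });
        }
    }

    // Cleanup when the batch completes. On failure some rows may no longer be locked, so the
    // entries are removed outright instead of decremented, mirroring the FAILED branch added
    // to removePendingRows() in the patch.
    public void remove(List<String> rowKeys, boolean failed) {
        if (failed) {
            for (String key : rowKeys) {
                pendingRows.remove(key);
            }
            return;
        }
        for (String key : rowKeys) {
            pendingRows.computeIfPresent(key,
                (k, row) -> row.refCount.decrementAndGet() == 0 ? null : row);
        }
    }
}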