This is an automated email from the ASF dual-hosted git repository. vjasani pushed a commit to branch 5.2 in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/5.2 by this push: new f75f03e7cf PHOENIX-7245 NPE in Phoenix Coproc leading to Region Server crash (#1886) f75f03e7cf is described below commit f75f03e7cfedf8f34c84d1ff8f1d5bbb1ac9c3ea Author: kadirozde <37155482+kadiro...@users.noreply.github.com> AuthorDate: Wed May 1 13:02:25 2024 -0700 PHOENIX-7245 NPE in Phoenix Coproc leading to Region Server crash (#1886) --- .../phoenix/hbase/index/IndexRegionObserver.java | 66 ++++++++++++++-------- .../end2end/ConcurrentMutationsExtendedIT.java | 11 +++- 2 files changed, 50 insertions(+), 27 deletions(-) diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java index 567f49ea35..18385766a2 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java @@ -210,7 +210,7 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver { */ public static class BatchMutateContext { - private BatchMutatePhase currentPhase = BatchMutatePhase.PRE; + private volatile BatchMutatePhase currentPhase = BatchMutatePhase.PRE; // The max of reference counts on the pending rows of this batch at the time this batch arrives private int maxPendingRowCount = 0; private final int clientVersion; @@ -273,12 +273,24 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver { } public CountDownLatch getCountDownLatch() { - if (waitList == null) { - waitList = new ArrayList<>(); + synchronized (this) { + if (waitList == null) { + waitList = new ArrayList<>(); + } + CountDownLatch countDownLatch = new CountDownLatch(1); + waitList.add(countDownLatch); + return countDownLatch; + } + } + + public void countDownAllLatches() { + synchronized (this) { + if (waitList != null) { + for (CountDownLatch 
countDownLatch : waitList) { + countDownLatch.countDown(); + } + } } - CountDownLatch countDownLatch = new CountDownLatch(1); - waitList.add(countDownLatch); - return countDownLatch; } public int getMaxPendingRowCount() { @@ -1067,11 +1079,9 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver { private void waitForPreviousConcurrentBatch(TableName table, BatchMutateContext context) throws Throwable { - boolean done; - BatchMutatePhase phase; - done = true; + boolean done = true; for (BatchMutateContext lastContext : context.lastConcurrentBatchContext.values()) { - phase = lastContext.getCurrentPhase(); + BatchMutatePhase phase = lastContext.getCurrentPhase(); if (phase == BatchMutatePhase.PRE) { CountDownLatch countDownLatch = lastContext.getCountDownLatch(); @@ -1191,7 +1201,7 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver { // Release the locks before making RPC calls for index updates unlockRows(context); // Do the first phase index updates - doPre(c, context, miniBatchOp); + doPre(context); // Acquire the locks again before letting the region proceed with data table updates lockRows(context); if (context.lastConcurrentBatchContext != null) { @@ -1277,9 +1287,13 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver { } /** - * When this hook is called, all the rows in the batch context are locked. Because the rows - * are locked, we can safely make updates to the context object and perform the necessary - * cleanup. + * When this hook is called, all the rows in the batch context are locked if the batch of + * mutations is successful. Because the rows are locked, we can safely make updates to + * pending row states in memory and perform the necessary cleanup in that case. + * + * However, when the batch fails, then some of the rows may not be locked. 
In that case, + * we remove the pending row states from the concurrent hash map without updating them since + * pending row states become invalid when a batch fails. */ @Override public void postBatchMutateIndispensably(ObserverContext<RegionCoprocessorEnvironment> c, @@ -1297,11 +1311,7 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver { } else { context.currentPhase = BatchMutatePhase.FAILED; } - if (context.waitList != null) { - for (CountDownLatch countDownLatch : context.waitList) { - countDownLatch.countDown(); - } - } + context.countDownAllLatches(); removePendingRows(context); if (context.indexUpdates != null) { context.indexUpdates.clear(); @@ -1361,6 +1371,16 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver { } private void removePendingRows(BatchMutateContext context) { + if (context.currentPhase == BatchMutatePhase.FAILED) { + // This batch failed. All concurrent batches will fail too. So we can remove + // all rows of this batch from the memory as the in-memory row images are not valid + // anymore. Please note that when a batch fails, some of the rows may not have been + // locked and so it is not safe to update the pending row entries in that case.
+ for (ImmutableBytesPtr rowKey : context.rowsToLock) { + pendingRows.remove(rowKey); + } + return; + } for (RowLock rowLock : context.rowLocks) { ImmutableBytesPtr rowKey = rowLock.getRowKey(); PendingRow pendingRow = pendingRows.get(rowKey); @@ -1373,10 +1393,10 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver { } } - private void doPre(ObserverContext<RegionCoprocessorEnvironment> c, BatchMutateContext context, - MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { - long start = EnvironmentEdgeManager.currentTimeMillis(); + private void doPre(BatchMutateContext context) throws IOException { + long start = 0; try { + start = EnvironmentEdgeManager.currentTimeMillis(); if (failPreIndexUpdatesForTesting) { throw new DoNotRetryIOException("Simulating the first (i.e., pre) index table write failure"); } @@ -1394,8 +1414,6 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver { lockRows(context); rethrowIndexingException(e); } - throw new RuntimeException( - "Somehow didn't complete the index update, but didn't return succesfully either!"); } private void extractExpressionsAndColumns(DataInputStream input, diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java index f02018f5b9..4bf5ffacc1 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java @@ -89,6 +89,10 @@ public class ConcurrentMutationsExtendedIT extends ParallelStatsDisabledIT { props.put(QueryServices.GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB, Long.toString(0)); props.put(BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY, Integer.toString(MAX_LOOKBACK_AGE)); + // The following sets the row lock wait duration to 10 ms to test the code 
path handling + // row lock timeouts. When there are concurrent mutations, the wait time can be + // much longer than 10 ms. + props.put("hbase.rowlock.wait.duration", "10"); setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); } @Parameterized.Parameters( @@ -300,9 +304,9 @@ public class ConcurrentMutationsExtendedIT extends ParallelStatsDisabledIT { @Test public void testConcurrentUpserts() throws Exception { - int nThreads = 4; - final int batchSize = 200; - final int nRows = 51; + int nThreads = 10; + final int batchSize = 20; + final int nRows = 100; final int nIndexValues = 23; final String tableName = generateUniqueName(); final String indexName = generateUniqueName(); @@ -333,6 +337,7 @@ public class ConcurrentMutationsExtendedIT extends ParallelStatsDisabledIT { } conn.commit(); } catch (SQLException e) { + System.out.println(e); throw new RuntimeException(e); } finally { doneSignal.countDown();