This is an automated email from the ASF dual-hosted git repository.

vjasani pushed a commit to branch 5.2
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/5.2 by this push:
     new f75f03e7cf PHOENIX-7245 NPE in Phoenix Coproc leading to Region Server 
crash (#1886)
f75f03e7cf is described below

commit f75f03e7cfedf8f34c84d1ff8f1d5bbb1ac9c3ea
Author: kadirozde <37155482+kadiro...@users.noreply.github.com>
AuthorDate: Wed May 1 13:02:25 2024 -0700

    PHOENIX-7245 NPE in Phoenix Coproc leading to Region Server crash (#1886)
---
 .../phoenix/hbase/index/IndexRegionObserver.java   | 66 ++++++++++++++--------
 .../end2end/ConcurrentMutationsExtendedIT.java     | 11 +++-
 2 files changed, 50 insertions(+), 27 deletions(-)

diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index 567f49ea35..18385766a2 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -210,7 +210,7 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
    */
 
   public static class BatchMutateContext {
-      private BatchMutatePhase currentPhase = BatchMutatePhase.PRE;
+      private volatile BatchMutatePhase currentPhase = BatchMutatePhase.PRE;
       // The max of reference counts on the pending rows of this batch at the 
time this batch arrives
       private int maxPendingRowCount = 0;
       private final int clientVersion;
@@ -273,12 +273,24 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
       }
 
       public CountDownLatch getCountDownLatch() {
-          if (waitList == null) {
-              waitList = new ArrayList<>();
+          synchronized (this) {
+              if (waitList == null) {
+                  waitList = new ArrayList<>();
+              }
+              CountDownLatch countDownLatch = new CountDownLatch(1);
+              waitList.add(countDownLatch);
+              return countDownLatch;
+          }
+      }
+
+      public void countDownAllLatches() {
+          synchronized (this) {
+              if (waitList != null) {
+                  for (CountDownLatch countDownLatch : waitList) {
+                      countDownLatch.countDown();
+                  }
+              }
           }
-          CountDownLatch countDownLatch = new CountDownLatch(1);
-          waitList.add(countDownLatch);
-          return countDownLatch;
       }
 
       public int getMaxPendingRowCount() {
@@ -1067,11 +1079,9 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
 
     private void waitForPreviousConcurrentBatch(TableName table, 
BatchMutateContext context)
             throws Throwable {
-        boolean done;
-        BatchMutatePhase phase;
-        done = true;
+        boolean done = true;
         for (BatchMutateContext lastContext : 
context.lastConcurrentBatchContext.values()) {
-            phase = lastContext.getCurrentPhase();
+            BatchMutatePhase phase = lastContext.getCurrentPhase();
 
             if (phase == BatchMutatePhase.PRE) {
                 CountDownLatch countDownLatch = 
lastContext.getCountDownLatch();
@@ -1191,7 +1201,7 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
             // Release the locks before making RPC calls for index updates
             unlockRows(context);
             // Do the first phase index updates
-            doPre(c, context, miniBatchOp);
+            doPre(context);
             // Acquire the locks again before letting the region proceed with 
data table updates
             lockRows(context);
             if (context.lastConcurrentBatchContext != null) {
@@ -1277,9 +1287,13 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
     }
 
     /**
-     * When this hook is called, all the rows in the batch context are locked. 
Because the rows
-     * are locked, we can safely make updates to the context object and 
perform the necessary
-     * cleanup.
+     * When this hook is called, all the rows in the batch context are locked 
if the batch of
+     * mutations is successful. Because the rows are locked, we can safely 
make updates to
+     * pending row states in memory and perform the necessary cleanup in that 
case.
+     *
+     * However, when the batch fails, then some of the rows may not be locked. 
In that case,
+     * we remove the pending row states from the concurrent hash map without 
updating them since
+     * pending row states become invalid when a batch fails.
      */
   @Override
   public void 
postBatchMutateIndispensably(ObserverContext<RegionCoprocessorEnvironment> c,
@@ -1297,11 +1311,7 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
           } else {
               context.currentPhase = BatchMutatePhase.FAILED;
           }
-          if (context.waitList != null) {
-              for (CountDownLatch countDownLatch : context.waitList) {
-                  countDownLatch.countDown();
-              }
-          }
+          context.countDownAllLatches();
           removePendingRows(context);
           if (context.indexUpdates != null) {
               context.indexUpdates.clear();
@@ -1361,6 +1371,16 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
   }
 
   private void removePendingRows(BatchMutateContext context) {
+      if (context.currentPhase == BatchMutatePhase.FAILED) {
+          // This batch failed. All concurrent batches will fail too. So we 
can remove
+          // all rows of this batch from the memory as the in-memory row 
images are not valid
+          // anymore. Please note that when a batch fails, some of the rows 
may not have been
+          // locked and so it is not safe to update the pending row entries in 
that case.
+          for (ImmutableBytesPtr rowKey : context.rowsToLock) {
+              pendingRows.remove(rowKey);
+          }
+          return;
+      }
       for (RowLock rowLock : context.rowLocks) {
           ImmutableBytesPtr rowKey = rowLock.getRowKey();
           PendingRow pendingRow = pendingRows.get(rowKey);
@@ -1373,10 +1393,10 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
       }
   }
 
-  private void doPre(ObserverContext<RegionCoprocessorEnvironment> c, 
BatchMutateContext context,
-                     MiniBatchOperationInProgress<Mutation> miniBatchOp) 
throws IOException {
-      long start = EnvironmentEdgeManager.currentTimeMillis();
+  private void doPre(BatchMutateContext context) throws IOException {
+      long start = 0;
       try {
+          start = EnvironmentEdgeManager.currentTimeMillis();
           if (failPreIndexUpdatesForTesting) {
               throw new DoNotRetryIOException("Simulating the first (i.e., 
pre) index table write failure");
           }
@@ -1394,8 +1414,6 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
           lockRows(context);
           rethrowIndexingException(e);
       }
-      throw new RuntimeException(
-              "Somehow didn't complete the index update, but didn't return 
succesfully either!");
   }
 
   private void extractExpressionsAndColumns(DataInputStream input,
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
index f02018f5b9..4bf5ffacc1 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
@@ -89,6 +89,10 @@ public class ConcurrentMutationsExtendedIT extends 
ParallelStatsDisabledIT {
         
props.put(QueryServices.GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB, 
Long.toString(0));
         
props.put(BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY,
             Integer.toString(MAX_LOOKBACK_AGE));
+        // The following sets the row lock wait duration to 10 ms to test the 
code path handling
+        // row lock timeouts. When there are concurrent mutations, the wait 
time can be
+        // much longer than 10 ms.
+        props.put("hbase.rowlock.wait.duration", "10");
         setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
     }
     @Parameterized.Parameters(
@@ -300,9 +304,9 @@ public class ConcurrentMutationsExtendedIT extends 
ParallelStatsDisabledIT {
 
     @Test
     public void testConcurrentUpserts() throws Exception {
-        int nThreads = 4;
-        final int batchSize = 200;
-        final int nRows = 51;
+        int nThreads = 10;
+        final int batchSize = 20;
+        final int nRows = 100;
         final int nIndexValues = 23;
         final String tableName = generateUniqueName();
         final String indexName = generateUniqueName();
@@ -333,6 +337,7 @@ public class ConcurrentMutationsExtendedIT extends 
ParallelStatsDisabledIT {
                         }
                         conn.commit();
                     } catch (SQLException e) {
+                        System.out.println(e);
                         throw new RuntimeException(e);
                     } finally {
                         doneSignal.countDown();

Reply via email to