This is an automated email from the ASF dual-hosted git repository.

kadir pushed a commit to branch 5.1
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/5.1 by this push:
     new 49bfb36755 PHOENIX-7245 NPE in Phoenix Coproc leading to Region Server crash (#1887)
49bfb36755 is described below

commit 49bfb36755ace643fbc6a42415a6fe64e3984719
Author: kadirozde <37155482+kadiro...@users.noreply.github.com>
AuthorDate: Mon May 6 09:16:18 2024 -0700

    PHOENIX-7245 NPE in Phoenix Coproc leading to Region Server crash (#1887)
---
 .../end2end/ConcurrentMutationsExtendedIT.java     | 13 ++--
 .../phoenix/hbase/index/IndexRegionObserver.java   | 82 ++++++++++++++--------
 2 files changed, 60 insertions(+), 35 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
index 48768ead05..2d7eb3767a 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
@@ -76,10 +76,14 @@ public class ConcurrentMutationsExtendedIT extends 
ParallelStatsDisabledIT {
 
     @BeforeClass
     public static synchronized void doSetup() throws Exception {
-        Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+        Map<String, String> props = Maps.newHashMapWithExpectedSize(3);
         
props.put(QueryServices.GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB, 
Long.toString(0));
         
props.put(CompatBaseScannerRegionObserver.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY,
             Integer.toString(MAX_LOOKBACK_AGE));
+        // The following sets the row lock wait duration to 10 ms to test the 
code path handling
+        // row lock timeouts. When there are concurrent mutations, the wait 
time can be
+        // much longer than 10 ms.
+        props.put("hbase.rowlock.wait.duration", "10");
         setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
     }
 
@@ -280,9 +284,9 @@ public class ConcurrentMutationsExtendedIT extends 
ParallelStatsDisabledIT {
 
     @Test
     public void testConcurrentUpserts() throws Exception {
-        int nThreads = 4;
-        final int batchSize = 200;
-        final int nRows = 51;
+        int nThreads = 10;
+        final int batchSize = 20;
+        final int nRows = 100;
         final int nIndexValues = 23;
         final String tableName = generateUniqueName();
         final String indexName = generateUniqueName();
@@ -312,6 +316,7 @@ public class ConcurrentMutationsExtendedIT extends 
ParallelStatsDisabledIT {
                         }
                         conn.commit();
                     } catch (SQLException e) {
+                        System.out.println(e);
                         throw new RuntimeException(e);
                     } finally {
                         doneSignal.countDown();
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index f1f03d12af..d6666d191f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -189,7 +189,7 @@ public class IndexRegionObserver extends 
CompatIndexRegionObserver implements Re
    */
 
   public static class BatchMutateContext {
-      private BatchMutatePhase currentPhase = BatchMutatePhase.PRE;
+      private volatile BatchMutatePhase currentPhase = BatchMutatePhase.PRE;
       // The max of reference counts on the pending rows of this batch at the 
time this batch arrives
       private int maxPendingRowCount = 0;
       private final int clientVersion;
@@ -246,12 +246,24 @@ public class IndexRegionObserver extends 
CompatIndexRegionObserver implements Re
       }
 
       public CountDownLatch getCountDownLatch() {
-          if (waitList == null) {
-              waitList = new ArrayList<>();
+          synchronized (this) {
+              if (waitList == null) {
+                  waitList = new ArrayList<>();
+              }
+              CountDownLatch countDownLatch = new CountDownLatch(1);
+              waitList.add(countDownLatch);
+              return countDownLatch;
+          }
+      }
+
+      public void countDownAllLatches() {
+          synchronized (this) {
+              if (waitList != null) {
+                  for (CountDownLatch countDownLatch : waitList) {
+                      countDownLatch.countDown();
+                  }
+              }
           }
-          CountDownLatch countDownLatch = new CountDownLatch(1);
-          waitList.add(countDownLatch);
-          return countDownLatch;
       }
 
       public int getMaxPendingRowCount() {
@@ -898,8 +910,6 @@ public class IndexRegionObserver extends 
CompatIndexRegionObserver implements Re
                 }
             }
         }
-        removePendingRows(context);
-        context.indexUpdates.clear();
     }
 
     private static boolean hasGlobalIndex(PhoenixIndexMetaData indexMetaData) {
@@ -922,11 +932,9 @@ public class IndexRegionObserver extends 
CompatIndexRegionObserver implements Re
 
     private void waitForPreviousConcurrentBatch(TableName table, 
BatchMutateContext context)
             throws Throwable {
-        boolean done;
-        BatchMutatePhase phase;
-        done = true;
+        boolean done = true;
         for (BatchMutateContext lastContext : 
context.lastConcurrentBatchContext.values()) {
-            phase = lastContext.getCurrentPhase();
+            BatchMutatePhase phase = lastContext.getCurrentPhase();
             if (phase == BatchMutatePhase.FAILED) {
                 done = false;
                 break;
@@ -948,14 +956,8 @@ public class IndexRegionObserver extends 
CompatIndexRegionObserver implements Re
         }
         if (!done) {
             // This batch needs to be retried since one of the previous 
concurrent batches has not completed yet.
-            // Throwing an IOException will result in retries of this batch. 
Before throwing exception,
-            // we need to remove reference counts and locks for the rows of 
this batch
-            removePendingRows(context);
-            context.indexUpdates.clear();
-            for (RowLock rowLock : context.rowLocks) {
-                rowLock.release();
-            }
-            context.rowLocks.clear();
+            // Throwing an IOException will result in retries of this batch. 
Removal of reference counts and
+            // locks for the rows of this batch will be done in 
postBatchMutateIndispensably()
             throw new IOException("One of the previous concurrent mutations 
has not completed. " +
                     "The batch needs to be retried " + 
table.getNameAsString());
         }
@@ -1048,6 +1050,15 @@ public class IndexRegionObserver extends 
CompatIndexRegionObserver implements Re
         }
     }
 
+    /**
+     * When this hook is called, all the rows in the batch context are locked 
if the batch of
+     * mutations is successful. Because the rows are locked, we can safely 
make updates to
+     * pending row states in memory and perform the necessary cleanup in that 
case.
+     *
+     * However, when the batch fails, then some of the rows may not be locked. 
In that case,
+     * we remove the pending row states from the concurrent hash map without 
updating them since
+     * pending rows states become invalid when a batch fails.
+     */
   @Override
   public void 
postBatchMutateIndispensably(ObserverContext<RegionCoprocessorEnvironment> c,
       MiniBatchOperationInProgress<Mutation> miniBatchOp, final boolean 
success) throws IOException {
@@ -1064,10 +1075,10 @@ public class IndexRegionObserver extends 
CompatIndexRegionObserver implements Re
           } else {
               context.currentPhase = BatchMutatePhase.FAILED;
           }
-          if (context.waitList != null) {
-              for (CountDownLatch countDownLatch : context.waitList) {
-                  countDownLatch.countDown();
-              }
+          context.countDownAllLatches();
+          removePendingRows(context);
+          if (context.indexUpdates != null) {
+              context.indexUpdates.clear();
           }
           unlockRows(context);
           this.builder.batchCompleted(miniBatchOp);
@@ -1124,6 +1135,16 @@ public class IndexRegionObserver extends 
CompatIndexRegionObserver implements Re
   }
 
   private void removePendingRows(BatchMutateContext context) {
+      if (context.currentPhase == BatchMutatePhase.FAILED) {
+          // This batch failed. All concurrent batches will fail too. So we 
can remove
+          // all rows of this batch from the memory as the in-memory row 
images are not valid
+          // anymore. Please note that when a batch fails, some of the rows 
may not have been
+          // locked and so it is not safe to update the pending row entries in 
that case.
+          for (ImmutableBytesPtr rowKey : context.rowsToLock) {
+              pendingRows.remove(rowKey);
+          }
+          return;
+      }
       for (RowLock rowLock : context.rowLocks) {
           ImmutableBytesPtr rowKey = rowLock.getRowKey();
           PendingRow pendingRow = pendingRows.get(rowKey);
@@ -1138,8 +1159,9 @@ public class IndexRegionObserver extends 
CompatIndexRegionObserver implements Re
 
   private void doPre(ObserverContext<RegionCoprocessorEnvironment> c, 
BatchMutateContext context,
                      MiniBatchOperationInProgress<Mutation> miniBatchOp) 
throws IOException {
-      long start = EnvironmentEdgeManager.currentTimeMillis();
+      long start = 0;
       try {
+          start = EnvironmentEdgeManager.currentTimeMillis();
           if (failPreIndexUpdatesForTesting) {
               throw new DoNotRetryIOException("Simulating the first (i.e., 
pre) index table write failure");
           }
@@ -1151,14 +1173,12 @@ public class IndexRegionObserver extends 
CompatIndexRegionObserver implements Re
           metricSource.updatePreIndexUpdateFailureTime(dataTableName,
               EnvironmentEdgeManager.currentTimeMillis() - start);
           metricSource.incrementPreIndexUpdateFailures(dataTableName);
-          // Remove all locks as they are already unlocked. There is no need 
to unlock them again later when
-          // postBatchMutateIndispensably() is called
-          removePendingRows(context);
-          context.rowLocks.clear();
+          // Re-acquire all locks since we released them before making index 
updates
+          // Removal of reference counts and locks for the rows of this batch 
will be
+          // done in postBatchMutateIndispensably()
+          lockRows(context);
           rethrowIndexingException(e);
       }
-      throw new RuntimeException(
-              "Somehow didn't complete the index update, but didn't return 
succesfully either!");
   }
 
   /**

Reply via email to