This is an automated email from the ASF dual-hosted git repository.

kadir pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new 259b560548 PHOENIX-7328 Fix flapping ConcurrentMutationsExtendedIT#testConcurren… (#1903)
259b560548 is described below

commit 259b560548ed6f7a5e04909f2af9f39a3cf2ee58
Author: Kadir Ozdemir <37155482+kadiro...@users.noreply.github.com>
AuthorDate: Fri Jun 14 11:04:28 2024 +0300

    PHOENIX-7328 Fix flapping ConcurrentMutationsExtendedIT#testConcurren… (#1903)
---
 .../phoenix/hbase/index/IndexRegionObserver.java   | 200 ++++++++++++---------
 .../end2end/ConcurrentMutationsExtendedIT.java     |  16 +-
 2 files changed, 120 insertions(+), 96 deletions(-)

diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index a6f682da22..a4547b60c9 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.index.PhoenixIndexBuilderHelper;
+import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
 import org.apache.phoenix.thirdparty.com.google.common.collect.ArrayListMultimap;
 import org.apache.phoenix.thirdparty.com.google.common.collect.ListMultimap;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
@@ -150,36 +151,51 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
     private static final OperationStatus NOWRITE = new OperationStatus(OperationStatusCode.SUCCESS);
     public static final String PHOENIX_APPEND_METADATA_TO_WAL = "phoenix.append.metadata.to.wal";
     public static final boolean DEFAULT_PHOENIX_APPEND_METADATA_TO_WAL = false;
+    /**
+     * Class to represent pending data table rows
+     */
+    private class PendingRow {
+        private int count;
+        private boolean usable;
+        private ImmutableBytesPtr rowKey;
+        private BatchMutateContext lastContext;
+
+        PendingRow(ImmutableBytesPtr rowKey, BatchMutateContext context) {
+            count = 1;
+            usable = true;
+            lastContext = context;
+            this.rowKey = rowKey;
+        }
 
-  /**
-   * Class to represent pending data table rows
-   */
-  private static class PendingRow {
-      private int count;
-      private BatchMutateContext lastContext;
-
-      PendingRow(BatchMutateContext context) {
-          count = 1;
-          lastContext = context;
-      }
-
-      public void add(BatchMutateContext context) {
-          count++;
-          lastContext = context;
-      }
+        public boolean add(BatchMutateContext context) {
+            synchronized (this) {
+                if (usable) {
+                    count++;
+                    lastContext = context;
+                    return true;
+                }
+            }
+            return false;
+        }
 
-      public void remove() {
-          count--;
-      }
+        public void remove() {
+            synchronized (this) {
+                count--;
+                if (count == 0) {
+                    pendingRows.remove(rowKey);
+                    usable = false;
+                }
+            }
+        }
 
-      public int getCount() {
+        public int getCount() {
           return count;
       }
 
-      public BatchMutateContext getLastContext() {
+        public BatchMutateContext getLastContext() {
           return lastContext;
       }
-  }
+    }
 
   private static boolean ignoreIndexRebuildForTesting  = false;
   private static boolean failPreIndexUpdatesForTesting = false;
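
The rewritten PendingRow above is a reference-counted map entry with a tombstone: once
remove() drops the count to zero it deletes the entry from pendingRows and flips usable to
false, so a racing add() cannot resurrect an entry that is already gone. A minimal
standalone sketch of that pattern, assuming a ConcurrentHashMap-keyed registry (the
Registry/Entry names and the String keys are illustrative, and a retry loop stands in for
the row locking the patch itself relies on):

    import java.util.concurrent.ConcurrentHashMap;

    // Sketch of the count-and-tombstone pattern; not the Phoenix class itself.
    public class Registry {
        private final ConcurrentHashMap<String, Entry> entries = new ConcurrentHashMap<>();

        class Entry {
            private final String key;
            private int count = 1;         // the creator holds the first reference
            private boolean usable = true; // false once removed from the map

            Entry(String key) { this.key = key; }

            // Returns false if this entry was already tombstoned; the caller
            // must then register a fresh entry (cf. PendingRow.add above).
            synchronized boolean retain() {
                if (!usable) {
                    return false;
                }
                count++;
                return true;
            }

            // The last reference removes the entry and tombstones it (cf. PendingRow.remove).
            synchronized void release() {
                if (--count == 0) {
                    entries.remove(key);
                    usable = false;
                }
            }
        }

        public Entry acquire(String key) {
            while (true) {
                Entry fresh = new Entry(key);
                Entry existing = entries.putIfAbsent(key, fresh);
                if (existing == null) {
                    return fresh;              // no concurrent holder
                }
                if (existing.retain()) {
                    return existing;           // joined an existing holder
                }
                entries.remove(key, existing); // tombstoned between get and retain; retry
            }
        }
    }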
@@ -276,6 +292,9 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
 
       public CountDownLatch getCountDownLatch() {
           synchronized (this) {
+              if (currentPhase != BatchMutatePhase.PRE) {
+                  return null;
+              }
               if (waitList == null) {
                   waitList = new ArrayList<>();
               }
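
The early null return added to getCountDownLatch closes a registration race: a waiter may
only enroll a latch while the owning batch is still in its PRE phase, because once the
batch has moved to POST or FAILED no further countDown() is coming. A hedged sketch of
that handshake (Batch, Phase, and leavePre are illustrative names, not the patch's API):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;

    class Batch {
        enum Phase { PRE, POST, FAILED }

        private Phase phase = Phase.PRE;
        private List<CountDownLatch> waitList;

        // Waiters call this; null means "too late to wait, re-read the phase".
        synchronized CountDownLatch getCountDownLatch() {
            if (phase != Phase.PRE) {
                return null;
            }
            if (waitList == null) {
                waitList = new ArrayList<>();
            }
            CountDownLatch latch = new CountDownLatch(1);
            waitList.add(latch);
            return latch;
        }

        // The owner calls this exactly once when leaving PRE (to POST or FAILED);
        // every registered waiter is released.
        synchronized void leavePre(Phase next) {
            phase = next;
            if (waitList != null) {
                for (CountDownLatch latch : waitList) {
                    latch.countDown();
                }
            }
        }

        synchronized Phase getPhase() {
            return phase;
        }
    }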
@@ -574,19 +593,6 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
         context.rowLocks.clear();
     }
 
-  private void populatePendingRows(BatchMutateContext context) {
-      for (RowLock rowLock : context.rowLocks) {
-          ImmutableBytesPtr rowKey = rowLock.getRowKey();
-          PendingRow pendingRow = pendingRows.get(rowKey);
-          if (pendingRow == null) {
-              pendingRows.put(rowKey, new PendingRow(context));
-          } else {
-              // m is a mutation on a row that has already a pending mutation in progress from another batch
-              pendingRow.add(context);
-          }
-      }
-  }
-
     private Collection<? extends Mutation> groupMutations(MiniBatchOperationInProgress<Mutation> miniBatchOp,
                                                           BatchMutateContext context) throws IOException {
         context.multiMutationMap = new HashMap<>();
@@ -837,28 +843,49 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
         return false;
     }
     /**
-     * Retrieve the last committed data row state.
+     * Retrieve the data row state either from memory or disk. The rows are locked by the caller.
      */
     private void getCurrentRowStates(ObserverContext<RegionCoprocessorEnvironment> c,
                                      BatchMutateContext context) throws IOException {
         Set<KeyRange> keys = new HashSet<KeyRange>(context.rowsToLock.size());
         for (ImmutableBytesPtr rowKeyPtr : context.rowsToLock) {
-            PendingRow pendingRow = pendingRows.get(rowKeyPtr);
-            if (pendingRow != null && pendingRow.getLastContext().getCurrentPhase() == BatchMutatePhase.PRE) {
-                if (context.lastConcurrentBatchContext == null) {
-                    context.lastConcurrentBatchContext = new HashMap<>();
-                }
-                context.lastConcurrentBatchContext.put(rowKeyPtr, pendingRow.getLastContext());
-                if (context.maxPendingRowCount < pendingRow.getCount()) {
-                    context.maxPendingRowCount = pendingRow.getCount();
-                }
-                Put put = pendingRow.getLastContext().getNextDataRowState(rowKeyPtr);
-                if (put != null) {
-                    context.dataRowStates.put(rowKeyPtr, new Pair<Put, Put>(put, new Put(put)));
-                }
-            }
-            else {
+            PendingRow pendingRow = new PendingRow(rowKeyPtr, context);
+            // Add the data table rows in the mini batch to the per region collection of pending
+            // rows. This will be used to detect concurrent updates
+            PendingRow existingPendingRow = pendingRows.putIfAbsent(rowKeyPtr, pendingRow);
+            if (existingPendingRow == null) {
+                // There was no pending row for this row key. We need to retrieve this row from disk
                 keys.add(PVarbinary.INSTANCE.getKeyRange(rowKeyPtr.get(), SortOrder.ASC));
+            } else {
+                // There is a pending row for this row key. We need to retrieve the row from memory
+                BatchMutateContext lastContext = existingPendingRow.getLastContext();
+                if (existingPendingRow.add(context)) {
+                    BatchMutatePhase phase = lastContext.getCurrentPhase();
+                    Preconditions.checkArgument(phase != BatchMutatePhase.POST,
+                            "the phase of the last batch cannot be POST");
+                    if (phase == BatchMutatePhase.PRE) {
+                        if (context.lastConcurrentBatchContext == null) {
+                            context.lastConcurrentBatchContext = new HashMap<>();
+                        }
+                        context.lastConcurrentBatchContext.put(rowKeyPtr, lastContext);
+                        if (context.maxPendingRowCount < existingPendingRow.getCount()) {
+                            context.maxPendingRowCount = existingPendingRow.getCount();
+                        }
+                        Put put = lastContext.getNextDataRowState(rowKeyPtr);
+                        if (put != null) {
+                            context.dataRowStates.put(rowKeyPtr, new Pair<>(put, new Put(put)));
+                        }
+                    } else {
+                        // The last batch for this row key failed. We cannot use the memory state.
+                        // So we need to retrieve this row from disk
+                        keys.add(PVarbinary.INSTANCE.getKeyRange(rowKeyPtr.get(), SortOrder.ASC));
+                    }
+                } else {
+                    // The existing pending row is removed from the map. That means there is no
+                    // pending row for this row key anymore. We need to add the new one to the map
+                    pendingRows.put(rowKeyPtr, pendingRow);
+                    keys.add(PVarbinary.INSTANCE.getKeyRange(rowKeyPtr.get(), SortOrder.ASC));
+                }
             }
         }
         if (keys.isEmpty()) {
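
Condensed, the branch above decides per locked row whether the current row image can be
taken from memory or must be read from disk. A sketch of just that decision (RowSource and
the boolean parameters are illustrative; the real inputs are the putIfAbsent result,
PendingRow.add, and the last batch's phase):

    // Sketch of the memory-vs-disk decision in getCurrentRowStates.
    enum RowSource { MEMORY, DISK }

    static RowSource chooseSource(boolean hadExistingEntry,
                                  boolean joinedExisting,   // existingPendingRow.add(context) succeeded
                                  boolean lastBatchStillPre) {
        if (!hadExistingEntry) {
            return RowSource.DISK;   // no concurrent batch: read the committed row
        }
        if (!joinedExisting) {
            return RowSource.DISK;   // entry was tombstoned; register a fresh one and read disk
        }
        return lastBatchStillPre
                ? RowSource.MEMORY   // reuse the concurrent batch's next row state
                : RowSource.DISK;    // last batch failed: its in-memory image is invalid
    }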
@@ -1119,41 +1146,51 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
         }
     }
 
+    /**
+     * Wait for the previous batches to complete. If any of the previous batches fails, then this
+     * batch will fail too and needs to be retried. The rows are locked by the caller.
+     * @param table
+     * @param context
+     * @throws Throwable
+     */
     private void waitForPreviousConcurrentBatch(TableName table, BatchMutateContext context)
             throws Throwable {
-        boolean done = true;
         for (BatchMutateContext lastContext : context.lastConcurrentBatchContext.values()) {
             BatchMutatePhase phase = lastContext.getCurrentPhase();
-
-            if (phase == BatchMutatePhase.PRE) {
+            if (phase == BatchMutatePhase.FAILED) {
+                context.currentPhase = BatchMutatePhase.FAILED;
+                break;
+            } else if (phase == BatchMutatePhase.PRE) {
                 CountDownLatch countDownLatch = lastContext.getCountDownLatch();
+                if (countDownLatch == null) {
+                    // phase changed from PRE to either FAILED or POST
+                    if (phase == BatchMutatePhase.FAILED) {
+                        context.currentPhase = BatchMutatePhase.FAILED;
+                        break;
+                    }
+                    continue;
+                }
                 // Release the locks so that the previous concurrent mutation can go into the post phase
                 unlockRows(context);
                 // Wait for at most one concurrentMutationWaitDuration for each level in the dependency tree of batches.
                 // lastContext.getMaxPendingRowCount() is the depth of the subtree rooted at the batch pointed by lastContext
                 if (!countDownLatch.await((lastContext.getMaxPendingRowCount() + 1) * concurrentMutationWaitDuration,
                         TimeUnit.MILLISECONDS)) {
+                    context.currentPhase = BatchMutatePhase.FAILED;
                     LOG.debug(String.format("latch timeout context %s last %s", context, lastContext));
-                    done = false;
+                    break;
                 }
-                // Acquire the locks again before letting the region proceed with data table updates
-                lockRows(context);
-                if (!done) {
-                    // previous concurrent batch did not complete so we have to retry this batch
+                if (lastContext.getCurrentPhase() == BatchMutatePhase.FAILED) {
+                    context.currentPhase = BatchMutatePhase.FAILED;
                     break;
-                } else {
-                    // read the phase again to determine the status of previous batch
-                    phase = lastContext.getCurrentPhase();
-                    LOG.debug(String.format("context %s last %s exit phase %s", context, lastContext, phase));
                 }
-            }
-
-            if (phase == BatchMutatePhase.FAILED) {
-                done = false;
-                break;
+                // Acquire the locks again before letting the region proceed with data table updates
+                lockRows(context);
+                LOG.debug(String.format("context %s last %s exit phase %s", context, lastContext,
+                        lastContext.getCurrentPhase()));
             }
         }
-        if (!done) {
+        if (context.currentPhase == BatchMutatePhase.FAILED) {
             // This batch needs to be retried since one of the previous concurrent batches has not completed yet.
             // Throwing an IOException will result in retries of this batch. Removal of reference counts and
             // locks for the rows of this batch will be done in postBatchMutateIndispensably()
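
The timeout above is (getMaxPendingRowCount() + 1) * concurrentMutationWaitDuration
because a predecessor batch may itself be waiting on its own predecessors, so each level
of that dependency chain gets one wait quantum. A self-contained sketch of the
unlock-await-relock step (Predecessor and the Runnable lock hooks are illustrative
stand-ins for BatchMutateContext and the region row locks):

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;

    final class WaitStep {
        interface Predecessor {
            CountDownLatch getCountDownLatch(); // null once it has left the PRE phase
            boolean failed();
            int getMaxPendingRowCount();        // depth of the batch dependency subtree
        }

        // Returns false when the caller must mark itself FAILED and retry the batch.
        static boolean awaitPredecessor(Predecessor last, long waitMillisPerLevel,
                                        Runnable unlockRows, Runnable lockRows)
                throws InterruptedException {
            CountDownLatch latch = last.getCountDownLatch();
            if (latch == null) {
                return !last.failed(); // already past PRE; only its final phase matters
            }
            unlockRows.run(); // release row locks so the predecessor can reach its post phase
            long timeout = (last.getMaxPendingRowCount() + 1) * waitMillisPerLevel;
            if (!latch.await(timeout, TimeUnit.MILLISECONDS) || last.failed()) {
                return false; // timed out, or the predecessor failed
            }
            lockRows.run(); // reacquire locks before the data table write proceeds
            return true;
        }
    }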
@@ -1225,9 +1262,6 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
         if (context.hasGlobalIndex || context.hasUncoveredIndex || context.hasTransform) {
             // Prepare next data rows states for pending mutations (for global indexes)
             prepareDataRowStates(c, miniBatchOp, context, now);
-            // Add the table rows in the mini batch to the collection of pending rows. This will be used to detect
-            // concurrent updates
-            populatePendingRows(context);
             // early exit if it turns out we don't have any edits
             long start = EnvironmentEdgeManager.currentTimeMillis();
             preparePreIndexMutations(context, now, indexMetaData);
@@ -1282,6 +1316,10 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
                 RowLock rowLock = rowLockIterator.next();
                 ImmutableBytesPtr rowKey = rowLock.getRowKey();
                 if (row.equals(rowKey)) {
+                    PendingRow pendingRow = pendingRows.get(rowKey);
+                    if (pendingRow != null) {
+                        pendingRow.remove();
+                    }
                     rowLock.release();
                     rowLockIterator.remove();
                     context.rowsToLock.remove(row);
@@ -1379,7 +1417,6 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
           doIndexWritesWithExceptions(context, true);
           metricSource.updatePostIndexUpdateTime(dataTableName,
               EnvironmentEdgeManager.currentTimeMillis() - start);
-          return;
       } catch (Throwable e) {
           metricSource.updatePostIndexUpdateFailureTime(dataTableName,
               EnvironmentEdgeManager.currentTimeMillis() - start);
@@ -1413,24 +1450,10 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
   }
 
   private void removePendingRows(BatchMutateContext context) {
-      if (context.currentPhase == BatchMutatePhase.FAILED) {
-          // This batch failed. All concurrent batches will fail too. So we can remove
-          // all rows of this batch from the memory as the in-memory row images are not valid
-          // anymore. Please note that when a batch fails, some of the rows may not have been
-          // locked and so it is not safe to update the pending row entries in that case.
-          for (ImmutableBytesPtr rowKey : context.rowsToLock) {
-              pendingRows.remove(rowKey);
-          }
-          return;
-      }
-      for (RowLock rowLock : context.rowLocks) {
-          ImmutableBytesPtr rowKey = rowLock.getRowKey();
+      for (ImmutableBytesPtr rowKey : context.rowsToLock) {
           PendingRow pendingRow = pendingRows.get(rowKey);
           if (pendingRow != null) {
               pendingRow.remove();
-              if (pendingRow.getCount() == 0) {
-                  pendingRows.remove(rowKey);
-              }
           }
       }
   }
@@ -1445,7 +1468,6 @@ public class IndexRegionObserver implements RegionCoprocessor, RegionObserver {
           doIndexWritesWithExceptions(context, false);
           metricSource.updatePreIndexUpdateTime(dataTableName,
               EnvironmentEdgeManager.currentTimeMillis() - start);
-          return;
       } catch (Throwable e) {
           metricSource.updatePreIndexUpdateFailureTime(dataTableName,
               EnvironmentEdgeManager.currentTimeMillis() - start);
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
index 95639d284a..278ccb05b5 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
@@ -87,14 +87,17 @@ public class ConcurrentMutationsExtendedIT extends ParallelStatsDisabledIT {
     }
     @BeforeClass
     public static synchronized void doSetup() throws Exception {
-        Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+        Map<String, String> props = Maps.newHashMapWithExpectedSize(4);
         props.put(QueryServices.GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB, Long.toString(0));
         props.put(BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY,
             Integer.toString(MAX_LOOKBACK_AGE));
-        // The following sets the row lock wait duration to 10 ms to test the code path handling
+        // The following sets the row lock wait duration to 100 ms to test the code path handling
         // row lock timeouts. When there are concurrent mutations, the wait time can be
-        // much longer than 10 ms.
-        props.put("hbase.rowlock.wait.duration", "10");
+        // much longer than 100 ms.
+        props.put("hbase.rowlock.wait.duration", "100");
+        // The following sets the wait duration for the previous concurrent batch to 10 ms to test
+        // the code path handling timeouts
+        props.put("phoenix.index.concurrent.wait.duration.ms", "10");
         setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
     }
     @Parameterized.Parameters(
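
Note how the two timing knobs in doSetup interact: the 10 ms concurrent-batch wait is
kept well below the 100 ms row-lock wait so that the latch-timeout path, not a row-lock
timeout, fires first under contention. A hedged sketch of the same setup on a plain HBase
Configuration (values mirror the test and are not production advice):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class TimingKnobs {
        public static Configuration configure() {
            Configuration conf = HBaseConfiguration.create();
            // Row locks give up after 100 ms instead of the much larger HBase default.
            conf.set("hbase.rowlock.wait.duration", "100");
            // Each level of the concurrent-batch dependency tree waits at most 10 ms.
            conf.set("phoenix.index.concurrent.wait.duration.ms", "10");
            return conf;
        }
    }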
@@ -308,8 +311,8 @@ public class ConcurrentMutationsExtendedIT extends ParallelStatsDisabledIT {
     @Test
     public void testConcurrentUpserts() throws Exception {
         int nThreads = 10;
-        final int batchSize = 20;
-        final int nRows = 100;
+        final int batchSize = 100;
+        final int nRows = 499;
         final int nIndexValues = 23;
         final String tableName = generateUniqueName();
         final String indexName = generateUniqueName();
@@ -341,7 +344,6 @@ public class ConcurrentMutationsExtendedIT extends ParallelStatsDisabledIT {
                         conn.commit();
                     } catch (SQLException e) {
                         System.out.println(e);
-                        throw new RuntimeException(e);
                     } finally {
                         doneSignal.countDown();
                     }
