This is an automated email from the ASF dual-hosted git repository.

kadir pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new 501ae7ee3c PHOENIX-7379 Improve handling of concurrent index mutations 
with the … (#1951)
501ae7ee3c is described below

commit 501ae7ee3c66e4c6dc08374938e6595e355197dc
Author: Kadir Ozdemir <37155482+kadiro...@users.noreply.github.com>
AuthorDate: Wed Aug 14 23:23:37 2024 -0700

    PHOENIX-7379 Improve handling of concurrent index mutations with the … 
(#1951)
---
 .../phoenix/hbase/index/IndexRegionObserver.java   | 87 ++++++++++++++++------
 .../end2end/ConcurrentMutationsExtendedIT.java     | 19 +++--
 2 files changed, 77 insertions(+), 29 deletions(-)

diff --git 
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
 
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index 317841fb13..b2e3b5ff9d 100644
--- 
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ 
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -58,7 +58,6 @@ import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
@@ -366,7 +365,8 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
   private boolean shouldWALAppend = DEFAULT_PHOENIX_APPEND_METADATA_TO_WAL;
   private boolean isNamespaceEnabled = false;
   private boolean useBloomFilter = false;
-
+  private long lastTimestamp = 0;
+  private List<Set<ImmutableBytesPtr>> batchesWithLastTimestamp = new 
ArrayList<>();
   private static final int DEFAULT_ROWLOCK_WAIT_DURATION = 30000;
   private static final int DEFAULT_CONCURRENT_MUTATION_WAIT_DURATION_IN_MS = 
100;
 
@@ -1044,7 +1044,7 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
      * either set to "verified" or the row is deleted.
      */
     private void preparePreIndexMutations(BatchMutateContext context,
-                                          long now,
+                                          long batchTimestamp,
                                           PhoenixIndexMetaData indexMetaData) 
throws Throwable {
         List<IndexMaintainer> maintainers = 
indexMetaData.getIndexMaintainers();
         // get the current span, or just use a null-span to avoid a bunch of 
if statements
@@ -1056,7 +1056,7 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
             current.addTimelineAnnotation("Built index updates, doing 
preStep");
             // The rest of this method is for handling global index updates
             context.indexUpdates = 
ArrayListMultimap.<HTableInterfaceReference, Pair<Mutation, byte[]>>create();
-            prepareIndexMutations(context, maintainers, now);
+            prepareIndexMutations(context, maintainers, batchTimestamp);
 
             context.preIndexUpdates = 
ArrayListMultimap.<HTableInterfaceReference, Mutation>create();
             int updateCount = 0;
@@ -1076,7 +1076,7 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
                         // Set the status of the index row to "unverified"
                         Put unverifiedPut = new Put(m.getRow());
                         unverifiedPut.addColumn(
-                            emptyCF, emptyCQ, now, 
QueryConstants.UNVERIFIED_BYTES);
+                            emptyCF, emptyCQ, batchTimestamp, 
QueryConstants.UNVERIFIED_BYTES);
                         // This will be done before the data table row is 
updated (i.e., in the first write phase)
                         context.preIndexUpdates.put(hTableInterfaceReference, 
unverifiedPut);
                     }
@@ -1100,7 +1100,7 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
     }
 
     private void preparePostIndexMutations(BatchMutateContext context,
-                                           long now,
+                                           long batchTimestamp,
                                            PhoenixIndexMetaData indexMetaData) 
{
         context.postIndexUpdates = 
ArrayListMultimap.<HTableInterfaceReference, Mutation>create();
         List<IndexMaintainer> maintainers = 
indexMetaData.getIndexMaintainers();
@@ -1116,7 +1116,8 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
                     if (!indexMaintainer.isUncovered()) {
                         Put verifiedPut = new Put(m.getRow());
                         // Set the status of the index row to "verified"
-                        verifiedPut.addColumn(emptyCF, emptyCQ, now, 
QueryConstants.VERIFIED_BYTES);
+                        verifiedPut.addColumn(emptyCF, emptyCQ, batchTimestamp,
+                                QueryConstants.VERIFIED_BYTES);
                         context.postIndexUpdates.put(hTableInterfaceReference, 
verifiedPut);
                     }
                 } else {
@@ -1211,6 +1212,53 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
         }
     }
 
+    private boolean shouldSleep(BatchMutateContext context) {
+        for (ImmutableBytesPtr ptr : context.rowsToLock) {
+            for (Set set : batchesWithLastTimestamp) {
+                if (set.contains(ptr)) {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+    private long getBatchTimestamp(BatchMutateContext context, TableName table)
+            throws InterruptedException {
+        synchronized (this) {
+            long ts = EnvironmentEdgeManager.currentTimeMillis();
+            if (ts != lastTimestamp) {
+                // The timestamp for this batch will be different from the 
last batch processed.
+                lastTimestamp = ts;
+                batchesWithLastTimestamp.clear();
+                batchesWithLastTimestamp.add(context.rowsToLock);
+                return ts;
+            } else {
+                if (!shouldSleep(context)) {
+                    // There is no need to sleep as the last batches with the 
same timestamp
+                    // do not have a common row with this batch
+                    batchesWithLastTimestamp.add(context.rowsToLock);
+                    return ts;
+                }
+            }
+        }
+        // Sleep for one millisecond. The sleep is necessary to get different 
timestamps
+        // for concurrent batches that share common rows.
+        Thread.sleep(1);
+        LOG.debug("slept 1ms for " + table.getNameAsString());
+        synchronized (this) {
+            long ts = EnvironmentEdgeManager.currentTimeMillis();
+            if (ts != lastTimestamp) {
+                // The timestamp for this batch will be different from the 
last batch processed.
+                lastTimestamp = ts;
+                batchesWithLastTimestamp.clear();
+            }
+            // We do not have to check again if we need to sleep again since 
we got the next
+            // timestamp while holding the row locks. This means there cannot 
be a new
+            // mutation with the same row attempting to get the same timestamp
+            batchesWithLastTimestamp.add(context.rowsToLock);
+            return ts;
+        }
+    }
     public void 
preBatchMutateWithExceptions(ObserverContext<RegionCoprocessorEnvironment> c,
                                              
MiniBatchOperationInProgress<Mutation> miniBatchOp) throws Throwable {
         PhoenixIndexMetaData indexMetaData = getPhoenixIndexMetaData(c, 
miniBatchOp);
@@ -1226,7 +1274,8 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
             ServerIndexUtil.setDeleteAttributes(miniBatchOp);
         }
 
-        // Exclusively lock all rows to do consistent writes over multiple 
tables (i.e., the data and its index tables)
+        // Exclusively lock all rows to do consistent writes over multiple 
tables
+        // (i.e., the data and its index tables)
         populateRowsToLock(miniBatchOp, context);
         // early exit if it turns out we don't have any update for indexes
         if (context.rowsToLock.isEmpty()) {
@@ -1265,27 +1314,19 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
             }
         }
 
-        long now = EnvironmentEdgeManager.currentTimeMillis();
-        // Update the timestamps of the data table mutations to prevent 
overlapping timestamps (which prevents index
-        // inconsistencies as this case isn't handled correctly currently).
-        setTimestamps(miniBatchOp, builder, now);
-
         TableName table = 
c.getEnvironment().getRegion().getRegionInfo().getTable();
+        long batchTimestamp = getBatchTimestamp(context, table);
+        // Update the timestamps of the data table mutations to prevent 
overlapping timestamps
+        // (which prevents index inconsistencies as this case is not handled).
+        setTimestamps(miniBatchOp, builder, batchTimestamp);
         if (context.hasGlobalIndex || context.hasUncoveredIndex || 
context.hasTransform) {
             // Prepare next data rows states for pending mutations (for global 
indexes)
-            prepareDataRowStates(c, miniBatchOp, context, now);
+            prepareDataRowStates(c, miniBatchOp, context, batchTimestamp);
             // early exit if it turns out we don't have any edits
             long start = EnvironmentEdgeManager.currentTimeMillis();
-            preparePreIndexMutations(context, now, indexMetaData);
+            preparePreIndexMutations(context, batchTimestamp, indexMetaData);
             metricSource.updateIndexPrepareTime(dataTableName,
                 EnvironmentEdgeManager.currentTimeMillis() - start);
-            // Sleep for one millisecond if we have prepared the index updates 
in less than 1 ms. The sleep is necessary to
-            // get different timestamps for concurrent batches that share 
common rows. It is very rare that the index updates
-            // can be prepared in less than one millisecond
-            if (!context.rowLocks.isEmpty() && now == 
EnvironmentEdgeManager.currentTimeMillis()) {
-                Thread.sleep(1);
-                LOG.debug("slept 1ms for " + table.getNameAsString());
-            }
             // Release the locks before making RPC calls for index updates
             unlockRows(context);
             // Do the first phase index updates
@@ -1295,7 +1336,7 @@ public class IndexRegionObserver implements 
RegionCoprocessor, RegionObserver {
             if (context.lastConcurrentBatchContext != null) {
                 waitForPreviousConcurrentBatch(table, context);
             }
-            preparePostIndexMutations(context, now, indexMetaData);
+            preparePostIndexMutations(context, batchTimestamp, indexMetaData);
         }
         if (context.hasLocalIndex) {
             // Group all the updates for a single row into a single update to 
be processed (for local indexes)
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
index 278ccb05b5..12dd00fd15 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
@@ -19,7 +19,6 @@ package org.apache.phoenix.end2end;
 
 import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
 import org.apache.phoenix.mapreduce.index.IndexVerificationOutputRepository;
-import org.apache.phoenix.mapreduce.index.IndexVerificationResultRepository;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseIOException;
@@ -42,6 +41,8 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.sql.Connection;
@@ -68,12 +69,15 @@ import static 
org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEF
 import static 
org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_UNKNOWN_INDEX_ROW_COUNT;
 import static 
org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_UNVERIFIED_INDEX_ROW_COUNT;
 import static 
org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.REBUILT_INDEX_ROW_COUNT;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 @Category(NeedsOwnMiniClusterTest.class)
 @RunWith(Parameterized.class)
 public class ConcurrentMutationsExtendedIT extends ParallelStatsDisabledIT {
-
+    private static final Logger LOGGER =
+            LoggerFactory.getLogger(ConcurrentMutationsExtendedIT.class);
     private final boolean uncovered;
     private static final Random RAND = new Random(5);
     private static final String MVCC_LOCK_TEST_TABLE_PREFIX = "MVCCLOCKTEST_";
@@ -111,7 +115,7 @@ public class ConcurrentMutationsExtendedIT extends 
ParallelStatsDisabledIT {
         // This checks the state of every raw index row without rebuilding any 
row
         IndexTool indexTool = IndexToolIT.runIndexTool(false, "", tableName,
                 indexName, null, 0, IndexTool.IndexVerifyType.ONLY);
-        System.out.println(indexTool.getJob().getCounters());
+        LOGGER.info(indexTool.getJob().getCounters().toString());
         TestUtil.dumpTable(conn, 
TableName.valueOf(IndexVerificationOutputRepository.OUTPUT_TABLE_NAME));
         assertEquals(0, 
indexTool.getJob().getCounters().findCounter(REBUILT_INDEX_ROW_COUNT).getValue());
         assertEquals(0, 
indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).getValue());
@@ -126,7 +130,7 @@ public class ConcurrentMutationsExtendedIT extends 
ParallelStatsDisabledIT {
         // We want to check the index rows again as they may be modified by 
the read repair
         indexTool = IndexToolIT.runIndexTool(false, "", tableName, indexName,
                 null, 0, IndexTool.IndexVerifyType.ONLY);
-        System.out.println(indexTool.getJob().getCounters());
+        LOGGER.info(indexTool.getJob().getCounters().toString());
 
         assertEquals(0, 
indexTool.getJob().getCounters().findCounter(REBUILT_INDEX_ROW_COUNT).getValue());
         assertEquals(0, 
indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).getValue());
@@ -324,6 +328,7 @@ public class ConcurrentMutationsExtendedIT extends 
ParallelStatsDisabledIT {
                 + tableName + "(v1)" + (uncovered ? "" :  "INCLUDE(v2, v3)"));
         final CountDownLatch doneSignal = new CountDownLatch(nThreads);
         Runnable[] runnables = new Runnable[nThreads];
+        long startTime = EnvironmentEdgeManager.currentTimeMillis();
         for (int i = 0; i < nThreads; i++) {
             runnables[i] = new Runnable() {
 
@@ -343,7 +348,7 @@ public class ConcurrentMutationsExtendedIT extends 
ParallelStatsDisabledIT {
                         }
                         conn.commit();
                     } catch (SQLException e) {
-                        System.out.println(e);
+                        LOGGER.warn("Exception during upsert : " + e);
                     } finally {
                         doneSignal.countDown();
                     }
@@ -357,6 +362,8 @@ public class ConcurrentMutationsExtendedIT extends 
ParallelStatsDisabledIT {
         }
 
         assertTrue("Ran out of time", doneSignal.await(120, TimeUnit.SECONDS));
+        LOGGER.info("Total upsert time in ms : "
+                + (EnvironmentEdgeManager.currentTimeMillis() - startTime));
         long actualRowCount = verifyIndexTable(tableName, indexName, conn);
         assertEquals(nRows, actualRowCount);
     }

Reply via email to