This is an automated email from the ASF dual-hosted git repository.
tkhurana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new 19119f9229 PHOENIX-7591 Concurrent updates to tables with indexes can
cause data inconsistency
19119f9229 is described below
commit 19119f92295529860590eeaa24099766e48ba30c
Author: tkhurana <[email protected]>
AuthorDate: Thu May 22 10:55:14 2025 -0700
PHOENIX-7591 Concurrent updates to tables with indexes can cause data
inconsistency
---
.../hbase/index/util/IndexManagementUtil.java | 2 +-
.../phoenix/hbase/index/IndexRegionObserver.java | 13 ++-
.../org/apache/phoenix/end2end/IndexToolIT.java | 128 +++++++++++----------
3 files changed, 81 insertions(+), 62 deletions(-)
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java
index 3d8c111d8e..3b03e7f51b 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java
@@ -199,7 +199,7 @@ public class IndexManagementUtil {
try {
throw e;
} catch (IOException | FatalIndexBuildingFailureException e1) {
- LOGGER.info("Rethrowing " + e);
+ LOGGER.info("Rethrowing ", e);
throw e1;
}
catch (Throwable e1) {
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index fee328cbfa..fed0bb274d 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -230,7 +230,7 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
ignoreWritingDeleteColumnsToIndex = ignore;
}
public enum BatchMutatePhase {
- PRE, POST, FAILED
+ INIT, PRE, POST, FAILED
}
// Hack to get around not being able to save any state between
@@ -245,7 +245,7 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
* locks to serialize the access to the BatchMutateContext objects.
*/
public static class BatchMutateContext {
- private volatile BatchMutatePhase currentPhase = BatchMutatePhase.PRE;
+ private volatile BatchMutatePhase currentPhase = BatchMutatePhase.INIT;
// The max of reference counts on the pending rows of this batch at
the time this
// batch arrives.
private int maxPendingRowCount = 0;
@@ -1468,6 +1468,8 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
return;
}
lockRows(context);
+ // acquired the locks, move to the next phase PRE
+ context.currentPhase = BatchMutatePhase.PRE;
long onDupCheckTime = 0;
if (context.hasAtomic || context.returnResult || context.hasGlobalIndex
@@ -1637,6 +1639,12 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
return;
}
try {
+ // We add to pending rows only after we have locked all the rows in
the batch
+ // If we are in the INIT phase that means we failed to acquire the
locks before the
+ // PRE phase
+ if (context.getCurrentPhase() != BatchMutatePhase.INIT) {
+ removePendingRows(context);
+ }
if (success) {
context.currentPhase = BatchMutatePhase.POST;
if ((context.hasAtomic || context.returnResult) &&
miniBatchOp.size() == 1) {
@@ -1659,7 +1667,6 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
context.currentPhase = BatchMutatePhase.FAILED;
}
context.countDownAllLatches();
- removePendingRows(context);
if (context.indexUpdates != null) {
context.indexUpdates.clear();
}
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
index df82cdee28..cef5d90433 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
@@ -1053,64 +1053,76 @@ public class IndexToolIT extends BaseTest {
String schemaName =
SchemaUtil.getSchemaNameFromFullName(fullTableName);
String tableName = SchemaUtil.getTableNameFromFullName(fullTableName);
String indexName = SchemaUtil.getTableNameFromFullName(fullIndexName);
- // This checks the state of every raw index row without rebuilding any
row
- IndexTool indexTool = IndexToolIT.runIndexTool(false, schemaName,
tableName,
- indexName, null, 0, IndexTool.IndexVerifyType.ONLY);
- LOGGER.info(indexTool.getJob().getCounters().toString());
- TestUtil.dumpTable(conn,
TableName.valueOf(IndexVerificationOutputRepository.OUTPUT_TABLE_NAME));
- assertEquals(0,
indexTool.getJob().getCounters().findCounter(REBUILT_INDEX_ROW_COUNT).getValue());
- assertEquals(0,
indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).getValue());
- assertEquals(0,
indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT).getValue());
- assertEquals(0,
indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue());
- assertEquals(0,
indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue());
- assertEquals(0,
indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_OLD_INDEX_ROW_COUNT).getValue());
- assertEquals(0,
indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_UNKNOWN_INDEX_ROW_COUNT).getValue());
-
- // This checks the state of an index row after it is repaired
- long actualRowCount = IndexScrutiny.scrutinizeIndex(conn,
fullTableName, fullIndexName);
- // We want to check the index rows again as they may be modified by
the read repair
- indexTool = IndexToolIT.runIndexTool(false, schemaName, tableName,
indexName,
- null, 0, IndexTool.IndexVerifyType.ONLY);
- LOGGER.info(indexTool.getJob().getCounters().toString());
-
- assertEquals(0,
indexTool.getJob().getCounters().findCounter(REBUILT_INDEX_ROW_COUNT).getValue());
- assertEquals(0,
indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).getValue());
- assertEquals(0,
indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT).getValue());
- assertEquals(0,
indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue());
- assertEquals(0,
indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue());
- // The index scrutiny run will trigger index repair on all unverified
rows, and they will be repaired or
- // deleted (since the age threshold is set to zero ms for these tests
- PTable pIndexTable =
conn.unwrap(PhoenixConnection.class).getTable(fullIndexName);
- if (pIndexTable.getIndexType() != PTable.IndexType.UNCOVERED_GLOBAL) {
- assertEquals(0,
indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_UNVERIFIED_INDEX_ROW_COUNT).getValue());
- }
- assertEquals(0,
indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_OLD_INDEX_ROW_COUNT).getValue());
- assertEquals(0,
indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_UNKNOWN_INDEX_ROW_COUNT).getValue());
- // Now we rebuild the entire index table and expect that it is still
good after the full rebuild
- indexTool = IndexToolIT.runIndexTool(false, schemaName, tableName,
indexName,
- null, 0, IndexTool.IndexVerifyType.AFTER);
-
assertEquals(indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_VALID_INDEX_ROW_COUNT).getValue(),
-
indexTool.getJob().getCounters().findCounter(REBUILT_INDEX_ROW_COUNT).getValue());
- assertEquals(0,
indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_INVALID_INDEX_ROW_COUNT).getValue());
- assertEquals(0,
indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_MISSING_INDEX_ROW_COUNT).getValue());
- assertEquals(0,
indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue());
- assertEquals(0,
indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue());
- // Truncate, rebuild and verify the index table
- TableName physicalTableName =
TableName.valueOf(pIndexTable.getPhysicalName().getBytes());
- PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
- try (Admin admin = pConn.getQueryServices().getAdmin()) {
- admin.disableTable(physicalTableName);
- admin.truncateTable(physicalTableName, true);
+ try {
+ // This checks the state of every raw index row without rebuilding
any row
+ IndexTool indexTool = IndexToolIT.runIndexTool(false, schemaName,
tableName,
+ indexName, null, 0, IndexTool.IndexVerifyType.ONLY);
+ TestUtil.dumpTable(conn,
TableName.valueOf(IndexVerificationOutputRepository.OUTPUT_TABLE_NAME));
+ Counters counters = indexTool.getJob().getCounters();
+ LOGGER.info(counters.toString());
+ assertEquals(0,
counters.findCounter(REBUILT_INDEX_ROW_COUNT).getValue());
+ assertEquals(0,
counters.findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).getValue());
+ assertEquals(0,
counters.findCounter(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT).getValue());
+ assertEquals(0,
counters.findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue());
+ assertEquals(0,
counters.findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue());
+ assertEquals(0,
counters.findCounter(BEFORE_REBUILD_OLD_INDEX_ROW_COUNT).getValue());
+ assertEquals(0,
counters.findCounter(BEFORE_REBUILD_UNKNOWN_INDEX_ROW_COUNT).getValue());
+
+ // This checks the state of an index row after it is repaired
+ long actualRowCount = IndexScrutiny.scrutinizeIndex(conn,
fullTableName, fullIndexName);
+ // We want to check the index rows again as they may be modified
by the read repair
+ indexTool = IndexToolIT.runIndexTool(false, schemaName, tableName,
indexName,
+ null, 0, IndexTool.IndexVerifyType.ONLY);
+ counters = indexTool.getJob().getCounters();
+ LOGGER.info(counters.toString());
+ assertEquals(0,
counters.findCounter(REBUILT_INDEX_ROW_COUNT).getValue());
+ assertEquals(0,
counters.findCounter(BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT).getValue());
+ assertEquals(0,
counters.findCounter(BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT).getValue());
+ assertEquals(0,
counters.findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue());
+ assertEquals(0,
counters.findCounter(BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue());
+ // The index scrutiny run will trigger index repair on all
unverified rows, and they will be repaired or
+ // deleted (since the age threshold is set to zero ms for these
tests
+ PTable pIndexTable =
conn.unwrap(PhoenixConnection.class).getTable(fullIndexName);
+ if (pIndexTable.getIndexType() !=
PTable.IndexType.UNCOVERED_GLOBAL) {
+ assertEquals(0,
counters.findCounter(BEFORE_REBUILD_UNVERIFIED_INDEX_ROW_COUNT).getValue());
+ }
+ assertEquals(0,
counters.findCounter(BEFORE_REBUILD_OLD_INDEX_ROW_COUNT).getValue());
+ assertEquals(0,
indexTool.getJob().getCounters().findCounter(BEFORE_REBUILD_UNKNOWN_INDEX_ROW_COUNT).getValue());
+
+ // Now we rebuild the entire index table and expect that it is
still good after the full rebuild
+ indexTool = IndexToolIT.runIndexTool(false, schemaName, tableName,
indexName,
+ null, 0, IndexTool.IndexVerifyType.AFTER);
+ counters = indexTool.getJob().getCounters();
+ LOGGER.info(counters.toString());
+
assertEquals(counters.findCounter(AFTER_REBUILD_VALID_INDEX_ROW_COUNT).getValue(),
+ counters.findCounter(REBUILT_INDEX_ROW_COUNT).getValue());
+ assertEquals(0,
counters.findCounter(AFTER_REBUILD_INVALID_INDEX_ROW_COUNT).getValue());
+ assertEquals(0,
counters.findCounter(AFTER_REBUILD_MISSING_INDEX_ROW_COUNT).getValue());
+ assertEquals(0,
counters.findCounter(AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue());
+ assertEquals(0,
counters.findCounter(AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue());
+ // Truncate, rebuild and verify the index table
+ TableName physicalTableName =
TableName.valueOf(pIndexTable.getPhysicalName().getBytes());
+ PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
+ try (Admin admin = pConn.getQueryServices().getAdmin()) {
+ admin.disableTable(physicalTableName);
+ admin.truncateTable(physicalTableName, true);
+ }
+ indexTool = IndexToolIT.runIndexTool(false, schemaName, tableName,
indexName,
+ null, 0, IndexTool.IndexVerifyType.AFTER);
+ counters = indexTool.getJob().getCounters();
+ LOGGER.info(counters.toString());
+ assertEquals(0,
counters.findCounter(AFTER_REBUILD_INVALID_INDEX_ROW_COUNT).getValue());
+ assertEquals(0,
counters.findCounter(AFTER_REBUILD_MISSING_INDEX_ROW_COUNT).getValue());
+ assertEquals(0,
counters.findCounter(AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue());
+ assertEquals(0,
counters.findCounter(AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue());
+
pConn.getQueryServices().clearTableRegionCache(TableName.valueOf(fullIndexName));
+ long actualRowCountAfterCompaction =
IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
+ assertEquals(actualRowCount, actualRowCountAfterCompaction);
+ return actualRowCount;
+ } catch (AssertionError e) {
+ TestUtil.dumpTable(conn, TableName.valueOf(fullTableName));
+ TestUtil.dumpTable(conn, TableName.valueOf(fullIndexName));
+ throw e;
}
- indexTool = IndexToolIT.runIndexTool(false, schemaName, tableName,
indexName,
- null, 0, IndexTool.IndexVerifyType.AFTER);
- assertEquals(0,
indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_INVALID_INDEX_ROW_COUNT).getValue());
- assertEquals(0,
indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_MISSING_INDEX_ROW_COUNT).getValue());
- assertEquals(0,
indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT).getValue());
- assertEquals(0,
indexTool.getJob().getCounters().findCounter(AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT).getValue());
-
pConn.getQueryServices().clearTableRegionCache(TableName.valueOf(fullIndexName));
- long actualRowCountAfterCompaction =
IndexScrutiny.scrutinizeIndex(conn, fullTableName, fullIndexName);
- assertEquals(actualRowCount, actualRowCountAfterCompaction);
- return actualRowCount;
}
}