This is an automated email from the ASF dual-hosted git repository.
kadir pushed a commit to branch PHOENIX-7001-feature
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/PHOENIX-7001-feature by this push:
new 93d586efbd Add an extra delete mutation during rebuild for CDC index
93d586efbd is described below
commit 93d586efbddbaccc8663c2cca905aa363a74dcc3
Author: Kadir Ozdemir
AuthorDate: Mon Jan 22 22:10:32 2024 +0530
Add an extra delete mutation during rebuild for CDC index
---
.../phoenix/coprocessor/GlobalIndexRegionScanner.java | 13 +
.../org/apache/phoenix/hbase/index/IndexRegionObserver.java | 9 ++---
2 files changed, 19 insertions(+), 3 deletions(-)
diff --git
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
index 0f07fe65d0..874b6669c1 100644
---
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
+++
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.execute.MutationState;
import org.apache.phoenix.filter.EmptyColumnOnlyFilter;
import org.apache.phoenix.filter.PagingFilter;
+import org.apache.phoenix.hbase.index.IndexRegionObserver;
import org.apache.phoenix.hbase.index.ValueGetter;
import org.apache.phoenix.compile.ScanRanges;
import org.apache.phoenix.filter.AllVersionsIndexRebuildFilter;
@@ -69,6 +70,7 @@ import org.apache.phoenix.schema.types.PVarbinary;
import
org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.ServerUtil;
@@ -1351,6 +1353,17 @@ public abstract class GlobalIndexRegionScanner extends
BaseRegionScanner {
indexMaintainer.buildRowDeleteMutation(indexRowKeyForCurrentDataRow,
IndexMaintainer.DeleteType.ALL_VERSIONS, ts);
indexMutations.add(del);
+if (indexMaintainer.isCDCIndex()) {
+// CDC Index needs two delete markers one for deleting the index row,
+// and the other for referencing the data table delete mutation with
+// the right index row key, that is, the index row key starting with ts
+Put cdcDataRowState = new Put(currentDataRowState.getRow());
+cdcDataRowState.addColumn(indexMaintainer.getDataEmptyKeyValueCF(),
+indexMaintainer.getEmptyKeyValueQualifierForDataTable(), ts,
+ByteUtil.EMPTY_BYTE_ARRAY);
+indexMutations.add(IndexRegionObserver.getDeleteIndexMutation(
+currentDataRowState, indexMaintainer, ts, rowKeyPtr));
+}
}
currentDataRowState = null;
indexRowKeyForCurrentDataRow = null;
diff --git
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index 702f4da82a..120b7b3882 100644
---
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -899,10 +899,10 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
}
}
}
-private Mutation getDeleteIndexMutation(Put dataRowState, IndexMaintainer
indexMaintainer,
+public static Mutation getDeleteIndexMutation(Put dataRowState,
IndexMaintainer indexMaintainer,
long ts, ImmutableBytesPtr rowKeyPtr) {
-ValueGetter cdcDataRowVG = new
IndexUtil.SimpleValueGetter(dataRowState);
-byte[] indexRowKey = indexMaintainer.buildRowKey(cdcDataRowVG,
rowKeyPtr, null, null, ts);
+ValueGetter dataRowVG = new IndexUtil.SimpleValueGetter(dataRowState);
+byte[] indexRowKey = indexMaintainer.buildRowKey(dataRowVG, rowKeyPtr,
null, null, ts);
return indexMaintainer.buildRowDeleteMutation(indexRowKey,
IndexMaintainer.DeleteType.ALL_VERSIONS, ts);
}
@@ -971,6 +971,9 @@ public class IndexRegionObserver implements
RegionCoprocessor, RegionObserver {
new Pair(getDeleteIndexMutation(currentDataRowState,