This is an automated email from the ASF dual-hosted git repository. kadir pushed a commit to branch 4.14-HBase-1.4 in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.14-HBase-1.4 by this push: new 78169b6 PHOENIX-5539 Full row index write at the last write phase for mutable global indexes 78169b6 is described below commit 78169b62d0f4133bb2b69a538ef95a116c7e5e1f Author: Kadir <kozde...@salesforce.com> AuthorDate: Thu Oct 24 01:10:32 2019 -0700 PHOENIX-5539 Full row index write at the last write phase for mutable global indexes --- .../end2end/index/GlobalIndexCheckerIT.java | 50 ++++++++++++++++ .../phoenix/hbase/index/IndexRegionObserver.java | 68 ++++++++++++---------- 2 files changed, 87 insertions(+), 31 deletions(-) diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java index 63c6e75..7c823ea 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java @@ -17,6 +17,7 @@ */ package org.apache.phoenix.end2end.index; +import static org.apache.phoenix.end2end.index.ImmutableIndexIT.verifyRowsForEmptyColValue; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -33,11 +34,19 @@ import java.util.Map; import com.google.common.collect.Maps; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT; import org.apache.phoenix.end2end.IndexToolIT; import org.apache.phoenix.hbase.index.IndexRegionObserver; +import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.util.QueryUtil; import org.apache.phoenix.util.ReadOnlyProps; +import org.apache.phoenix.util.TestUtil; import org.junit.After; import org.junit.BeforeClass; import org.junit.Test; @@ -257,6 +266,47 @@ public class GlobalIndexCheckerIT extends BaseUniqueNamesOwnClusterIT { } } + public static void checkUnverifiedCellCount(Connection conn, String indexTableName) throws Exception { + Table hIndexTable = conn.unwrap(PhoenixConnection.class).getQueryServices() + .getTable(Bytes.toBytes(indexTableName)); + long indexCnt = TestUtil.getRowCount(hIndexTable, false); + assertEquals(1, indexCnt); + assertEquals(true, verifyRowsForEmptyColValue(conn, indexTableName, IndexRegionObserver.UNVERIFIED_BYTES)); + Scan s = new Scan(); + int cntCellValues = 0; + try (ResultScanner scanner = hIndexTable.getScanner(s)) { + Result result; + while ((result = scanner.next()) != null) { + CellScanner cellScanner = result.cellScanner(); + while (cellScanner.advance()) { + cntCellValues++; + } + } + } + assertEquals(1, cntCellValues); + } + @Test + public void testUnverifiedRowIncludesOnlyEmptyCell() throws Exception { + String dataTableName = generateUniqueName(); + try (Connection conn = DriverManager.getConnection(getUrl())) { + conn.createStatement().execute("create table " + dataTableName + + " (id varchar(10) not null primary key, val1 varchar(10), val2 varchar(10), val3 varchar(10))" + tableDDLOptions); + String indexTableName = generateUniqueName(); + conn.createStatement().execute("CREATE INDEX " + indexTableName + " on " + + dataTableName + " (val1) include (val2, val3)"); + // Configure IndexRegionObserver to fail the last write phase (i.e., the post index update phase) + IndexRegionObserver.setFailPostIndexUpdatesForTesting(true); + conn.createStatement().execute("upsert into " + dataTableName + " (id, val2) values ('a', 'abcc')"); + conn.commit(); + IndexRegionObserver.setFailPostIndexUpdatesForTesting(false); + // check that in the first phase we don't send the full row. + // We count the num of cells for this + checkUnverifiedCellCount(conn, indexTableName); + // Add rows and check everything is still okay + verifyTableHealth(conn, dataTableName, indexTableName); + } + } + @Test public void testOnePhaseOverwiteFollowingTwoPhaseWrite() throws Exception { try (Connection conn = DriverManager.getConnection(getUrl())) { diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java index 993ff4b..a41e729 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java @@ -500,9 +500,25 @@ public class IndexRegionObserver extends BaseRegionObserver { return mutations; } + public void removeEmptyColumn(Mutation m, byte[] emptyCF, byte[] emptyCQ) { + List<Cell> cellList = m.getFamilyCellMap().get(emptyCF); + if (cellList == null) { + return; + } + Iterator<Cell> cellIterator = cellList.iterator(); + while (cellIterator.hasNext()) { + Cell cell = cellIterator.next(); + if (Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(), + emptyCQ, 0, emptyCQ.length) == 0) { + cellIterator.remove(); + return; + } + } + } + private void prepareIndexMutations(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp, BatchMutateContext context, - Collection<? extends Mutation> mutations) throws Throwable { + Collection<? extends Mutation> mutations, long now) throws Throwable { IndexMetaData indexMetaData = this.builder.getIndexMetaData(miniBatchOp); if (!(indexMetaData instanceof PhoenixIndexMetaData)) { throw new DoNotRetryIOException( @@ -511,7 +527,6 @@ public class IndexRegionObserver extends BaseRegionObserver { } List<IndexMaintainer> maintainers = ((PhoenixIndexMetaData)indexMetaData).getIndexMaintainers(); - List<Pair<Mutation, byte[]>> indexUpdatesForDeletes; // get the current span, or just use a null-span to avoid a bunch of if statements try (TraceScope scope = Trace.startSpan("Starting to build index updates")) { Span current = scope.getSpan(); @@ -528,7 +543,7 @@ public class IndexRegionObserver extends BaseRegionObserver { byte[] tableName = c.getEnvironment().getRegion().getTableDesc().getTableName().getName(); Iterator<Pair<Pair<Mutation, byte[]>, byte[]>> indexUpdatesItr = indexUpdates.iterator(); List<Mutation> localUpdates = new ArrayList<Mutation>(indexUpdates.size()); - indexUpdatesForDeletes = new ArrayList<>(indexUpdates.size()); + context.preIndexUpdates = new ArrayList<>(indexUpdates.size()); context.intermediatePostIndexUpdates = new ArrayList<>(indexUpdates.size()); while(indexUpdatesItr.hasNext()) { Pair<Pair<Mutation, byte[]>, byte[]> next = indexUpdatesItr.next(); @@ -549,35 +564,30 @@ public class IndexRegionObserver extends BaseRegionObserver { // add the VERIFIED cell, which is the empty cell Mutation m = next.getFirst().getFirst(); boolean rebuild = PhoenixIndexMetaData.isIndexRebuild(m.getAttributesMap()); - long ts = getMaxTimestamp(m); if (rebuild) { if (m instanceof Put) { + long ts = getMaxTimestamp(m); + // Remove the empty column prepared by Index codec as we need to change its value + removeEmptyColumn(m, emptyCF, emptyCQ); ((Put)m).addColumn(emptyCF, emptyCQ, ts, VERIFIED_BYTES); } } else { - if (m instanceof Put) { - ((Put)m).addColumn(emptyCF, emptyCQ, ts, UNVERIFIED_BYTES); - // Ignore post index updates (i.e., the third write phase updates) for this row if it is - // going through concurrent updates - ImmutableBytesPtr rowKey = new ImmutableBytesPtr(next.getSecond()); - if (!context.pendingRows.contains(rowKey)) { - Put put = new Put(m.getRow()); - put.addColumn(emptyCF, emptyCQ, ts, VERIFIED_BYTES); - context.intermediatePostIndexUpdates.add(new Pair<>(new Pair<Mutation, byte[]>(put, next.getFirst().getSecond()), next.getSecond())); - } - } else { - // For a delete mutation, first unverify the existing row in the index table and then delete - // the row from the index table after deleting the corresponding row from the data table - indexUpdatesItr.remove(); - Put put = new Put(m.getRow()); - put.addColumn(emptyCF, emptyCQ, ts, UNVERIFIED_BYTES); - indexUpdatesForDeletes.add(new Pair<Mutation, byte[]>(put, next.getFirst().getSecond())); - // Ignore post index updates (i.e., the third write phase updates) for this row if it is - // going through concurrent updates - ImmutableBytesPtr rowKey = new ImmutableBytesPtr(next.getSecond()); - if (!context.pendingRows.contains(rowKey)) { - context.intermediatePostIndexUpdates.add(next); + indexUpdatesItr.remove(); + // For this mutation whether it is put or delete, set the status of the index row "unverified" + // This will be done before the data table row is updated (i.e., in the first write phase) + Put unverifiedPut = new Put(m.getRow()); + unverifiedPut.addColumn(emptyCF, emptyCQ, now, UNVERIFIED_BYTES); + context.preIndexUpdates.add(new Pair <Mutation, byte[]>(unverifiedPut, next.getFirst().getSecond())); + // Ignore post index updates (i.e., the third write phase updates) for this row if it is + // going through concurrent updates + ImmutableBytesPtr rowKey = new ImmutableBytesPtr(next.getSecond()); + if (!context.pendingRows.contains(rowKey)) { + if (m instanceof Put) { + // Remove the empty column prepared by Index codec as we need to change its value + removeEmptyColumn(m, emptyCF, emptyCQ); + ((Put) m).addColumn(emptyCF, emptyCQ, now, VERIFIED_BYTES); } + context.intermediatePostIndexUpdates.add(next); } } } @@ -586,10 +596,6 @@ public class IndexRegionObserver extends BaseRegionObserver { miniBatchOp.addOperationsFromCP(0, localUpdates.toArray(new Mutation[localUpdates.size()])); } - if (!indexUpdatesForDeletes.isEmpty()) { - context.preIndexUpdates = indexUpdatesForDeletes; - } - if (!indexUpdates.isEmpty() && context.preIndexUpdates.isEmpty()) { context.preIndexUpdates = new ArrayList<>(indexUpdates.size()); } @@ -628,7 +634,7 @@ public class IndexRegionObserver extends BaseRegionObserver { } long start = EnvironmentEdgeManager.currentTimeMillis(); - prepareIndexMutations(c, miniBatchOp, context, mutations); + prepareIndexMutations(c, miniBatchOp, context, mutations, now); metricSource.updateIndexPrepareTime(EnvironmentEdgeManager.currentTimeMillis() - start); // Sleep for one millisecond if we have prepared the index updates in less than 1 ms. The sleep is necessary to