This is an automated email from the ASF dual-hosted git repository.

kadir pushed a commit to branch 4.x-HBase-1.4
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/4.x-HBase-1.4 by this push:
     new a3242f9  PHOENIX-5539 Full row index write at the last write phase for mutable global indexes
a3242f9 is described below

commit a3242f913cd996ae47b8d9fa000d197d2e6d2e5c
Author: Kadir <kozde...@salesforce.com>
AuthorDate: Thu Oct 24 01:10:32 2019 -0700

    PHOENIX-5539 Full row index write at the last write phase for mutable global indexes
---
 .../end2end/index/GlobalIndexCheckerIT.java        | 50 ++++++++++++++++
 .../phoenix/hbase/index/IndexRegionObserver.java   | 68 ++++++++++++----------
 2 files changed, 87 insertions(+), 31 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
index 63c6e75..7c823ea 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
@@ -17,6 +17,7 @@
  */
 package org.apache.phoenix.end2end.index;
 
+import static 
org.apache.phoenix.end2end.index.ImmutableIndexIT.verifyRowsForEmptyColValue;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -33,11 +34,19 @@ import java.util.Map;
 import com.google.common.collect.Maps;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.end2end.BaseUniqueNamesOwnClusterIT;
 import org.apache.phoenix.end2end.IndexToolIT;
 import org.apache.phoenix.hbase.index.IndexRegionObserver;
+import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.TestUtil;
 import org.junit.After;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -257,6 +266,47 @@ public class GlobalIndexCheckerIT extends BaseUniqueNamesOwnClusterIT {
         }
     }
 
+    public static void checkUnverifiedCellCount(Connection conn, String 
indexTableName) throws Exception {
+        Table hIndexTable = 
conn.unwrap(PhoenixConnection.class).getQueryServices()
+                .getTable(Bytes.toBytes(indexTableName));
+        long indexCnt = TestUtil.getRowCount(hIndexTable, false);
+        assertEquals(1, indexCnt);
+        assertEquals(true, verifyRowsForEmptyColValue(conn, indexTableName, 
IndexRegionObserver.UNVERIFIED_BYTES));
+        Scan s = new Scan();
+        int cntCellValues = 0;
+        try (ResultScanner scanner = hIndexTable.getScanner(s)) {
+            Result result;
+            while ((result = scanner.next()) != null) {
+                CellScanner cellScanner = result.cellScanner();
+                while (cellScanner.advance()) {
+                    cntCellValues++;
+                }
+            }
+        }
+        assertEquals(1, cntCellValues);
+    }
+    @Test
+    public void testUnverifiedRowIncludesOnlyEmptyCell() throws Exception {
+        String dataTableName = generateUniqueName();
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            conn.createStatement().execute("create table " + dataTableName +
+                    " (id varchar(10) not null primary key, val1 varchar(10), 
val2 varchar(10), val3 varchar(10))" + tableDDLOptions);
+            String indexTableName = generateUniqueName();
+            conn.createStatement().execute("CREATE INDEX " + indexTableName + 
" on " +
+                    dataTableName + " (val1) include (val2, val3)");
+            // Configure IndexRegionObserver to fail the last write phase 
(i.e., the post index update phase)
+            IndexRegionObserver.setFailPostIndexUpdatesForTesting(true);
+            conn.createStatement().execute("upsert into " + dataTableName + " 
(id, val2) values ('a', 'abcc')");
+            conn.commit();
+            IndexRegionObserver.setFailPostIndexUpdatesForTesting(false);
+            // check that in the first phase we don't send the full row.
+            // We count the num of cells for this
+            checkUnverifiedCellCount(conn, indexTableName);
+            // Add rows and check everything is still okay
+            verifyTableHealth(conn, dataTableName, indexTableName);
+        }
+    }
+
     @Test
     public void testOnePhaseOverwiteFollowingTwoPhaseWrite() throws Exception {
         try (Connection conn = DriverManager.getConnection(getUrl())) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index 993ff4b..a41e729 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -500,9 +500,25 @@ public class IndexRegionObserver extends BaseRegionObserver {
       return mutations;
   }
 
+  public void removeEmptyColumn(Mutation m, byte[] emptyCF, byte[] emptyCQ) {
+      List<Cell> cellList = m.getFamilyCellMap().get(emptyCF);
+      if (cellList == null) {
+          return;
+      }
+      Iterator<Cell> cellIterator = cellList.iterator();
+      while (cellIterator.hasNext()) {
+          Cell cell = cellIterator.next();
+          if (Bytes.compareTo(cell.getQualifierArray(), 
cell.getQualifierOffset(), cell.getQualifierLength(),
+                  emptyCQ, 0, emptyCQ.length) == 0) {
+              cellIterator.remove();
+              return;
+          }
+      }
+  }
+
   private void 
prepareIndexMutations(ObserverContext<RegionCoprocessorEnvironment> c,
                                      MiniBatchOperationInProgress<Mutation> 
miniBatchOp, BatchMutateContext context,
-                                     Collection<? extends Mutation> mutations) 
throws Throwable {
+                                     Collection<? extends Mutation> mutations, 
long now) throws Throwable {
       IndexMetaData indexMetaData = this.builder.getIndexMetaData(miniBatchOp);
       if (!(indexMetaData instanceof PhoenixIndexMetaData)) {
           throw new DoNotRetryIOException(
@@ -511,7 +527,6 @@ public class IndexRegionObserver extends BaseRegionObserver {
       }
       List<IndexMaintainer> maintainers = 
((PhoenixIndexMetaData)indexMetaData).getIndexMaintainers();
 
-      List<Pair<Mutation, byte[]>> indexUpdatesForDeletes;
       // get the current span, or just use a null-span to avoid a bunch of if 
statements
       try (TraceScope scope = Trace.startSpan("Starting to build index 
updates")) {
           Span current = scope.getSpan();
@@ -528,7 +543,7 @@ public class IndexRegionObserver extends BaseRegionObserver {
           byte[] tableName = 
c.getEnvironment().getRegion().getTableDesc().getTableName().getName();
           Iterator<Pair<Pair<Mutation, byte[]>, byte[]>> indexUpdatesItr = 
indexUpdates.iterator();
           List<Mutation> localUpdates = new 
ArrayList<Mutation>(indexUpdates.size());
-          indexUpdatesForDeletes = new ArrayList<>(indexUpdates.size());
+          context.preIndexUpdates = new ArrayList<>(indexUpdates.size());
           context.intermediatePostIndexUpdates = new 
ArrayList<>(indexUpdates.size());
           while(indexUpdatesItr.hasNext()) {
               Pair<Pair<Mutation, byte[]>, byte[]> next = 
indexUpdatesItr.next();
@@ -549,35 +564,30 @@ public class IndexRegionObserver extends BaseRegionObserver {
                   // add the VERIFIED cell, which is the empty cell
                   Mutation m = next.getFirst().getFirst();
                   boolean rebuild = 
PhoenixIndexMetaData.isIndexRebuild(m.getAttributesMap());
-                  long ts = getMaxTimestamp(m);
                   if (rebuild) {
                       if (m instanceof Put) {
+                          long ts = getMaxTimestamp(m);
+                          // Remove the empty column prepared by Index codec 
as we need to change its value
+                          removeEmptyColumn(m, emptyCF, emptyCQ);
                           ((Put)m).addColumn(emptyCF, emptyCQ, ts, 
VERIFIED_BYTES);
                       }
                   } else {
-                      if (m instanceof Put) {
-                          ((Put)m).addColumn(emptyCF, emptyCQ, ts, 
UNVERIFIED_BYTES);
-                          // Ignore post index updates (i.e., the third write 
phase updates) for this row if it is
-                          // going through concurrent updates
-                          ImmutableBytesPtr rowKey = new 
ImmutableBytesPtr(next.getSecond());
-                          if (!context.pendingRows.contains(rowKey)) {
-                              Put put = new Put(m.getRow());
-                              put.addColumn(emptyCF, emptyCQ, ts, 
VERIFIED_BYTES);
-                              context.intermediatePostIndexUpdates.add(new 
Pair<>(new Pair<Mutation, byte[]>(put, next.getFirst().getSecond()), 
next.getSecond()));
-                          }
-                      } else {
-                          // For a delete mutation, first unverify the 
existing row in the index table and then delete
-                          // the row from the index table after deleting the 
corresponding row from the data table
-                          indexUpdatesItr.remove();
-                          Put put = new Put(m.getRow());
-                          put.addColumn(emptyCF, emptyCQ, ts, 
UNVERIFIED_BYTES);
-                          indexUpdatesForDeletes.add(new Pair<Mutation, 
byte[]>(put, next.getFirst().getSecond()));
-                          // Ignore post index updates (i.e., the third write 
phase updates) for this row if it is
-                          // going through concurrent updates
-                          ImmutableBytesPtr rowKey = new 
ImmutableBytesPtr(next.getSecond());
-                          if (!context.pendingRows.contains(rowKey)) {
-                              context.intermediatePostIndexUpdates.add(next);
+                      indexUpdatesItr.remove();
+                      // For this mutation whether it is put or delete, set 
the status of the index row "unverified"
+                      // This will be done before the data table row is 
updated (i.e., in the first write phase)
+                      Put unverifiedPut = new Put(m.getRow());
+                      unverifiedPut.addColumn(emptyCF, emptyCQ, now, 
UNVERIFIED_BYTES);
+                      context.preIndexUpdates.add(new Pair <Mutation, 
byte[]>(unverifiedPut, next.getFirst().getSecond()));
+                      // Ignore post index updates (i.e., the third write 
phase updates) for this row if it is
+                      // going through concurrent updates
+                      ImmutableBytesPtr rowKey = new 
ImmutableBytesPtr(next.getSecond());
+                      if (!context.pendingRows.contains(rowKey)) {
+                          if (m instanceof Put) {
+                              // Remove the empty column prepared by Index 
codec as we need to change its value
+                              removeEmptyColumn(m, emptyCF, emptyCQ);
+                              ((Put) m).addColumn(emptyCF, emptyCQ, now, 
VERIFIED_BYTES);
                           }
+                          context.intermediatePostIndexUpdates.add(next);
                       }
                   }
               }
@@ -586,10 +596,6 @@ public class IndexRegionObserver extends BaseRegionObserver {
               miniBatchOp.addOperationsFromCP(0,
                       localUpdates.toArray(new Mutation[localUpdates.size()]));
           }
-          if (!indexUpdatesForDeletes.isEmpty()) {
-              context.preIndexUpdates = indexUpdatesForDeletes;
-          }
-
           if (!indexUpdates.isEmpty() && context.preIndexUpdates.isEmpty()) {
               context.preIndexUpdates = new ArrayList<>(indexUpdates.size());
           }
@@ -628,7 +634,7 @@ public class IndexRegionObserver extends BaseRegionObserver {
       }
 
       long start = EnvironmentEdgeManager.currentTimeMillis();
-      prepareIndexMutations(c, miniBatchOp, context, mutations);
+      prepareIndexMutations(c, miniBatchOp, context, mutations, now);
       
metricSource.updateIndexPrepareTime(EnvironmentEdgeManager.currentTimeMillis() 
- start);
 
       // Sleep for one millisecond if we have prepared the index updates in 
less than 1 ms. The sleep is necessary to

Reply via email to