swaroopak commented on a change in pull request #701: PHOENIX-5709 Simplify 
index update generation code for consistent glo…
URL: https://github.com/apache/phoenix/pull/701#discussion_r376101927
 
 

 ##########
 File path: 
phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
 ##########
 @@ -533,13 +666,139 @@ private void 
handleLocalIndexUpdates(ObserverContext<RegionCoprocessorEnvironmen
       }
   }
 
-  private void prepareIndexMutations(
-          ObserverContext<RegionCoprocessorEnvironment> c,
-          MiniBatchOperationInProgress<Mutation> miniBatchOp,
-          BatchMutateContext context,
-          Collection<? extends Mutation> mutations,
-          long now,
-          PhoenixIndexMetaData indexMetaData) throws Throwable {
+    /**
+     * Retrieve the the last committed row state. This method is called only 
for regular updates
+     */
+    private void 
getCurrentRowStates(ObserverContext<RegionCoprocessorEnvironment> c,
+                                     BatchMutateContext context) throws 
IOException {
+        Set<KeyRange> keys = new HashSet<KeyRange>(context.rowsToLock.size());
+        for (ImmutableBytesPtr rowKeyPtr : context.rowsToLock) {
+            keys.add(PVarbinary.INSTANCE.getKeyRange(rowKeyPtr.get()));
+        }
+        Scan scan = new Scan();
+        ScanRanges scanRanges = ScanRanges.createPointLookup(new 
ArrayList<KeyRange>(keys));
+        scanRanges.initializeScan(scan);
+        SkipScanFilter skipScanFilter = scanRanges.getSkipScanFilter();
+        scan.setFilter(skipScanFilter);
+        context.currentRowStates = new HashMap<ImmutableBytesPtr, 
Put>(context.rowsToLock.size());
+        try (RegionScanner scanner = 
c.getEnvironment().getRegion().getScanner(scan)) {
+            boolean more = true;
+            while(more) {
+                List<Cell> cells = new ArrayList<Cell>();
+                more = scanner.next(cells);
+                if (cells.isEmpty()) {
+                    continue;
+                }
+                byte[] rowKey = CellUtil.cloneRow(cells.get(0));
+                Put put = new Put(rowKey);
+                for (Cell cell : cells) {
+                    put.add(cell);
+                }
+                context.currentRowStates.put(new ImmutableBytesPtr(rowKey), 
put);
+            }
+        }
+    }
+
+    private Collection<Cell> getCollection(Mutation m) {
+        Collection<Cell> collection = new ArrayList<>();
+        for (List<Cell> cells : m.getFamilyCellMap().values()) {
+            collection.addAll(cells);
+        }
+        return collection;
+    }
+
+    /**
+     * Generate the index updates from the mutations without considering the 
previous row state. This method is called
+     * when to rebuild index rows from the existing data table rows. This 
method is used only for global indexes
+     */
+    private void prepareIndexMutationsForIndexRebuild(BatchMutateContext 
context,
+                                                      List<IndexMaintainer> 
maintainers,
+                                                      Collection<? extends 
Mutation> pendingMutations) throws IOException {
+        for (IndexMaintainer indexMaintainer : maintainers) {
+            HTableInterfaceReference hTableInterfaceReference =
+                    new HTableInterfaceReference(new 
ImmutableBytesPtr(indexMaintainer.getIndexTableName()));
+            for (Mutation mutation : pendingMutations) {
+                ImmutableBytesPtr rowKeyPtr = new 
ImmutableBytesPtr(mutation.getRow());
+                ValueGetter valueGetter = new 
IndexRebuildRegionScanner.SimpleValueGetter((Put) mutation);
+                Put indexPut = 
indexMaintainer.buildUpdateMutation(GenericKeyValueBuilder.INSTANCE, 
valueGetter,
+                        rowKeyPtr, getMaxTimestamp(mutation), null, null);
+                if (indexPut == null) {
+                    byte[] indexRowKey = 
indexMaintainer.buildRowKey(valueGetter, rowKeyPtr,
+                            null, null, HConstants.LATEST_TIMESTAMP);
+                    indexPut = new Put(indexRowKey);
+                }
+                
indexPut.addColumn(indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
+                            indexMaintainer.getEmptyKeyValueQualifier(), 
getMaxTimestamp(mutation), VERIFIED_BYTES);
+                context.indexUpdates.put(hTableInterfaceReference, new 
Pair<>((Mutation)indexPut, mutation.getRow()));
+            }
+        }
+    }
+
+    /**
+     * Generate the index update for a data row from the mutation that are 
obtained by merging the previous data row
+     * state with the pending row mutation.
+     */
+    private void 
prepareIndexMutations(ObserverContext<RegionCoprocessorEnvironment> c,
+                                       BatchMutateContext context,
+                                       List<IndexMaintainer> maintainers,
+                                       Collection<? extends Mutation> 
pendingMutations) throws IOException {
+        for (IndexMaintainer indexMaintainer : maintainers) {
+            HTableInterfaceReference hTableInterfaceReference =
+                    new HTableInterfaceReference(new 
ImmutableBytesPtr(indexMaintainer.getIndexTableName()));
+            byte[] regionStartKey = null;
+            byte[] regionEndKey = null;
+            if (indexMaintainer.isLocalIndex()) {
+                regionStartKey = 
c.getEnvironment().getRegion().getRegionInfo().getStartKey();
+                regionEndKey = 
c.getEnvironment().getRegion().getRegionInfo().getEndKey();
+            }
+            for (Mutation mutation : pendingMutations) {
+                ImmutableBytesPtr rowKeyPtr = new 
ImmutableBytesPtr(mutation.getRow());
+                Put currentRow = context.currentRowStates.get(rowKeyPtr);
+                ValueGetter currentRowVG = (currentRow == null) ? null :
+                        new 
IndexRebuildRegionScanner.SimpleValueGetter(currentRow);
+                long ts = getTimestamp(mutation);
+                if (mutation instanceof Put) {
+                    Put mergedRow = context.mergedRowStates.get(rowKeyPtr);
+                    ValueGetter mergedRowVG = new 
IndexRebuildRegionScanner.SimpleValueGetter(mergedRow);
+                    Put indexPut = 
indexMaintainer.buildUpdateMutation(GenericKeyValueBuilder.INSTANCE,
+                            mergedRowVG, rowKeyPtr, ts, regionStartKey, 
regionEndKey);
+                    if (indexPut == null) {
+                        // No covered column. Just prepare an index row with 
the empty column
+                        byte[] indexRowKey = 
indexMaintainer.buildRowKey(mergedRowVG, rowKeyPtr,
+                                regionStartKey, regionEndKey, 
HConstants.LATEST_TIMESTAMP);
+                        indexPut = new Put(indexRowKey);
+                    }
+                    
indexPut.addColumn(indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
 
 Review comment:
   why shouldn't this be `dataEmptyKeyValueCF` from IndexMaintainer?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to