kadirozde commented on a change in pull request #701: PHOENIX-5709 Simplify 
index update generation code for consistent glo…
URL: https://github.com/apache/phoenix/pull/701#discussion_r376899503
 
 

 ##########
 File path: 
phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
 ##########
 @@ -409,91 +413,220 @@ private void populatePendingRows(BatchMutateContext 
context) {
       }
   }
 
-  private Collection<? extends Mutation> 
groupMutations(MiniBatchOperationInProgress<Mutation> miniBatchOp,
-                                                        long now, ReplayWrite 
replayWrite) throws IOException {
-      Map<ImmutableBytesPtr, MultiMutation> mutationsMap = new HashMap<>();
-      boolean copyMutations = false;
-      for (int i = 0; i < miniBatchOp.size(); i++) {
+  public static void setTimestamp(Mutation m, long ts) throws IOException {
+      for (List<Cell> cells : m.getFamilyCellMap().values()) {
+          for (Cell cell : cells) {
+              CellUtil.setTimestamp(cell, ts);
+          }
+      }
+  }
+
+    public static long getTimestamp(Mutation m) throws IOException {
+        for (List<Cell> cells : m.getFamilyCellMap().values()) {
+            for (Cell cell : cells) {
+                return cell.getTimestamp();
+            }
+        }
+        return 0;
+    }
+
+  private static void removeColumn(Put put, Cell deleteCell) {
+      byte[] family = CellUtil.cloneFamily(deleteCell);
+      List<Cell> cellList = put.getFamilyCellMap().get(family);
+      if (cellList == null) {
+          return;
+      }
+      Iterator<Cell> cellIterator = cellList.iterator();
+      while (cellIterator.hasNext()) {
+          Cell cell = cellIterator.next();
+          if (Bytes.compareTo(cell.getQualifierArray(), 
cell.getQualifierOffset(), cell.getQualifierLength(),
+                  deleteCell.getQualifierArray(), 
deleteCell.getQualifierOffset(), deleteCell.getQualifierLength()) == 0) {
+              cellIterator.remove();
+              if (cellList.isEmpty()) {
+                  put.getFamilyCellMap().remove(family);
+              }
+              return;
+          }
+      }
+  }
+
+
+  private void merge(Put current, Put previous) throws IOException {
+      for (List<Cell> cells : previous.getFamilyCellMap().values()) {
+          for (Cell cell : cells) {
+              if (!current.has(CellUtil.cloneFamily(cell), 
CellUtil.cloneQualifier(cell))) {
+                  current.add(cell);
+              }
+          }
+      }
+  }
+
+  private Put mergeNew(Put current, Put previous) throws IOException {
+      Put next = new Put(current);
+      for (List<Cell> cells : previous.getFamilyCellMap().values()) {
+          for (Cell cell : cells) {
+              if (!current.has(CellUtil.cloneFamily(cell), 
CellUtil.cloneQualifier(cell))) {
+                  next.add(cell);
+              }
+          }
+      }
+      return next;
+  }
+
+  /**
+   * When there are multiple put mutations on the same data row within the 
same batch, this method merges them into
+   * one mutation.
+   */
+  private void mergePendingPutMutations(MiniBatchOperationInProgress<Mutation> 
miniBatchOp,
+                                        Map<ImmutableBytesPtr, Integer> 
pendingPuts,
+                                        long now) throws IOException {
+      for (Integer i = 0; i < miniBatchOp.size(); i++) {
           if (miniBatchOp.getOperationStatus(i) == IGNORE) {
               continue;
           }
           Mutation m = miniBatchOp.getOperation(i);
+          // skip this mutation if we aren't enabling indexing
           if (this.builder.isEnabled(m)) {
-              // Track whether or not we need to
-              ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow());
-              if (mutationsMap.containsKey(row)) {
-                  copyMutations = true;
-              } else {
-                  mutationsMap.put(row, null);
+              // Unless we're replaying edits to rebuild the index, we update 
the time stamp
+              // of the data table to prevent overlapping time stamps (which 
prevents index
+              // inconsistencies as this case isn't handled correctly 
currently).
+              setTimestamp(m, now);
+              if (m instanceof Put) {
+                  ImmutableBytesPtr rowKeyPtr = new 
ImmutableBytesPtr(m.getRow());
+                  Integer opIndex = pendingPuts.get(rowKeyPtr);
+                  pendingPuts.put(rowKeyPtr, i);
+                  if (opIndex != null) {
+                      merge((Put) m, (Put) miniBatchOp.getOperation(opIndex));
+                      miniBatchOp.setOperationStatus(opIndex, NOWRITE);
+                  }
               }
           }
       }
-      // early exit if it turns out we don't have any edits
-      if (mutationsMap.isEmpty()) {
-          return null;
-      }
-      // If we're copying the mutations
-      Collection<Mutation> originalMutations;
-      Collection<? extends Mutation> mutations;
-      if (copyMutations) {
-          originalMutations = null;
-          mutations = mutationsMap.values();
-      } else {
-          originalMutations = 
Lists.newArrayListWithExpectedSize(mutationsMap.size());
-          mutations = originalMutations;
-      }
-
-      boolean resetTimeStamp = replayWrite == null;
+  }
 
-      for (int i = 0; i < miniBatchOp.size(); i++) {
+  /**
+   * When there are delete and put mutations on the same data row within the 
same batch, this method applies the deletes
+   * to the put mutations, that is, it effectively merges them.
+   */
+  private void 
applyPendingDeleteMutations(MiniBatchOperationInProgress<Mutation> miniBatchOp,
+                                           BatchMutateContext context,
+                                           Map<ImmutableBytesPtr, Integer> 
pendingPuts) throws IOException {
+      for (Integer i = 0; i < miniBatchOp.size(); i++) {
+          if (miniBatchOp.getOperationStatus(i) == IGNORE || 
miniBatchOp.getOperationStatus(i) == NOWRITE) {
+              continue;
+          }
           Mutation m = miniBatchOp.getOperation(i);
-          // skip this mutation if we aren't enabling indexing
-          // unfortunately, we really should ask if the raw mutation (rather 
than the combined mutation)
-          // should be indexed, which means we need to expose another method 
on the builder. Such is the
-          // way optimization go though.
-          if (miniBatchOp.getOperationStatus(i) != IGNORE && 
this.builder.isEnabled(m)) {
-              if (resetTimeStamp) {
-                  // Unless we're replaying edits to rebuild the index, we 
update the time stamp
-                  // of the data table to prevent overlapping time stamps 
(which prevents index
-                  // inconsistencies as this case isn't handled correctly 
currently).
+          if (!this.builder.isEnabled(m)) {
+              continue;
+          }
+          if (m instanceof Delete) {
+              ImmutableBytesPtr rowKeyPtr = new ImmutableBytesPtr(m.getRow());
+              Integer opIndex = pendingPuts.get(rowKeyPtr);
+              Put mergedRow = context.mergedRowStates.get(rowKeyPtr);
+              if (opIndex != null || mergedRow != null) {
+                  Put put = (Put) miniBatchOp.getOperation(opIndex);
                   for (List<Cell> cells : m.getFamilyCellMap().values()) {
                       for (Cell cell : cells) {
-                          CellUtil.setTimestamp(cell, now);
+                          switch (cell.getType()) {
+                              case DeleteFamily:
+                              case DeleteFamilyVersion:
+                                  if (put != null)
+                                      
put.getFamilyCellMap().remove(CellUtil.cloneFamily(cell));
+                                  if (mergedRow != null)
+                                      
mergedRow.getFamilyCellMap().remove(CellUtil.cloneFamily(cell));
+                                  break;
+                              case DeleteColumn:
+                              case Delete:
+                                  if (put != null)
+                                      removeColumn(put, cell);
+                                  if (mergedRow != null)
+                                      removeColumn(mergedRow, cell);
+                          }
+                          if (put != null && put.getFamilyCellMap().size() == 
0) {
+                              pendingPuts.remove(rowKeyPtr);
+                              miniBatchOp.setOperationStatus(opIndex, NOWRITE);
+                          }
+                          if (mergedRow != null && 
mergedRow.getFamilyCellMap().size() == 0) {
+                              context.mergedRowStates.remove(rowKeyPtr);
+                          }
                       }
                   }
               }
-              // No need to write the table mutations when we're rebuilding
-              // the index as they're already written and just being replayed.
-              if (replayWrite == ReplayWrite.INDEX_ONLY
-                      || replayWrite == ReplayWrite.REBUILD_INDEX_ONLY) {
-                  miniBatchOp.setOperationStatus(i, NOWRITE);
-              }
+          }
+      }
+  }
 
-              // Only copy mutations if we found duplicate rows
-              // which only occurs when we're partially rebuilding
-              // the index (since we'll potentially have both a
-              // Put and a Delete mutation for the same row).
-              if (copyMutations) {
-                  // Add the mutation to the batch set
-
-                  ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow());
-                  MultiMutation stored = mutationsMap.get(row);
-                  // we haven't seen this row before, so add it
-                  if (stored == null) {
-                      stored = new MultiMutation(row);
-                      mutationsMap.put(row, stored);
+  /**
+   * Merges the mutations on the same row.
+   */
+  private Collection<? extends Mutation> 
mergeMutations(ObserverContext<RegionCoprocessorEnvironment> c,
+                                                        
MiniBatchOperationInProgress<Mutation> miniBatchOp,
+                                                        BatchMutateContext 
context,
+                                                        long now, boolean 
rebuild) throws IOException {
+      if (!rebuild) {
+          if (context.rowsToLock.size() == 0) {
+              return null;
+          }
+          Map<ImmutableBytesPtr, Integer> pendingPuts = new 
HashMap<>(miniBatchOp.size());
+          mergePendingPutMutations(miniBatchOp, pendingPuts, now);
+          getCurrentRowStates(c, context);
+          context.mergedRowStates = new HashMap<ImmutableBytesPtr, 
Put>(miniBatchOp.size());
+          // Merge pending put mutations with current row states into new 
merged states
+          for (Integer i = 0; i < miniBatchOp.size(); i++) {
+              if (miniBatchOp.getOperationStatus(i) == IGNORE || 
miniBatchOp.getOperationStatus(i) == NOWRITE) {
+                  continue;
+              }
+              Mutation m = miniBatchOp.getOperation(i);
+              if (this.builder.isEnabled(m)) {
+                  if (m instanceof Put) {
+                      ImmutableBytesPtr rowKeyPtr = new 
ImmutableBytesPtr(m.getRow());
+                      Put currentRow = context.currentRowStates.get(rowKeyPtr);
+                      Put mergedRow;
+                      if (currentRow != null) {
+                          mergedRow = mergeNew((Put) m, currentRow);
+                      }
+                      else {
+                          mergedRow = new Put((Put)m);
+                      }
+                      context.mergedRowStates.put(rowKeyPtr, mergedRow);
                   }
-                  stored.addAll(m);
-              } else {
-                  originalMutations.add(m);
               }
           }
+          // Apply delete mutations to the pending put mutations and merged 
row states
+          applyPendingDeleteMutations(miniBatchOp, context, pendingPuts);
       }
 
-      if (copyMutations || replayWrite != null) {
-          mutations = 
IndexManagementUtil.flattenMutationsByTimestamp(mutations);
+      Collection<Mutation> pendingMutations = 
Lists.newArrayListWithExpectedSize(miniBatchOp.size());
+      // Add put mutations into the collection first. This ordering of puts 
first and then deletes is important for
+      // obtaining correct index updates
+      for (int i = 0; i < miniBatchOp.size(); i++) {
+          Mutation m = miniBatchOp.getOperation(i);
+          if (m instanceof Put) {
+              if (miniBatchOp.getOperationStatus(i) != IGNORE &&
+                      miniBatchOp.getOperationStatus(i) != NOWRITE && 
this.builder.isEnabled(m)) {
 
 Review comment:
   ok

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to