joshelser commented on a change in pull request #2228:
URL: https://github.com/apache/hbase/pull/2228#discussion_r469353813



##########
File path: 
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
##########
@@ -3805,6 +3838,196 @@ public void 
prepareMiniBatchOperations(MiniBatchOperationInProgress<Mutation> mi
       }
     }
 
+    /**
+     * Do coprocessor pre-increment or pre-append call.
+     * @return Result returned out of the coprocessor, which means bypass all 
further processing
+     *   and return the proffered Result instead, or null which means proceed.

Review comment:
       nit: preferred

##########
File path: 
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
##########
@@ -3805,6 +3838,196 @@ public void 
prepareMiniBatchOperations(MiniBatchOperationInProgress<Mutation> mi
       }
     }
 
+    /**
+     * Do coprocessor pre-increment or pre-append call.
+     * @return Result returned out of the coprocessor, which means bypass all 
further processing
+     *   and return the proffered Result instead, or null which means proceed.
+     */
+    private Result doCoprocessorPreCall(Mutation mutation) throws IOException {

Review comment:
       Maybe `doCoprocessorPreCallAfterRowLock()` and indicate that this method 
is a no-op for Mutations which do not have a `pre*AfterRowLock()` method in the 
javadoc?

##########
File path: 
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
##########
@@ -3805,6 +3838,196 @@ public void 
prepareMiniBatchOperations(MiniBatchOperationInProgress<Mutation> mi
       }
     }
 
+    /**
+     * Do coprocessor pre-increment or pre-append call.
+     * @return Result returned out of the coprocessor, which means bypass all 
further processing
+     *   and return the proffered Result instead, or null which means proceed.
+     */
+    private Result doCoprocessorPreCall(Mutation mutation) throws IOException {
+      assert mutation instanceof Increment || mutation instanceof Append;
+      Result result = null;
+      if (region.coprocessorHost != null) {
+        if (mutation instanceof Increment) {
+          result = region.coprocessorHost.preIncrementAfterRowLock((Increment) 
mutation);
+        } else {
+          result = region.coprocessorHost.preAppendAfterRowLock((Append) 
mutation);
+        }
+      }
+      return result;
+    }
+
+    private Map<byte[], List<Cell>> reckonDeltas(Mutation mutation, List<Cell> 
results)

Review comment:
       What about `compute` or `calculate` instead of `reckon`? I had to go to 
a dictionary :)

##########
File path: 
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
##########
@@ -3805,6 +3838,196 @@ public void 
prepareMiniBatchOperations(MiniBatchOperationInProgress<Mutation> mi
       }
     }
 
+    /**
+     * Do coprocessor pre-increment or pre-append call.
+     * @return Result returned out of the coprocessor, which means bypass all 
further processing
+     *   and return the proffered Result instead, or null which means proceed.
+     */
+    private Result doCoprocessorPreCall(Mutation mutation) throws IOException {
+      assert mutation instanceof Increment || mutation instanceof Append;
+      Result result = null;
+      if (region.coprocessorHost != null) {
+        if (mutation instanceof Increment) {
+          result = region.coprocessorHost.preIncrementAfterRowLock((Increment) 
mutation);
+        } else {
+          result = region.coprocessorHost.preAppendAfterRowLock((Append) 
mutation);
+        }
+      }
+      return result;
+    }
+
+    private Map<byte[], List<Cell>> reckonDeltas(Mutation mutation, List<Cell> 
results)
+      throws IOException {
+      long now = EnvironmentEdgeManager.currentTime();
+      Map<byte[], List<Cell>> ret = new HashMap<>();
+      // Process a Store/family at a time.
+      for (Map.Entry<byte [], List<Cell>> entry: 
mutation.getFamilyCellMap().entrySet()) {
+        final byte[] columnFamilyName = entry.getKey();
+        List<Cell> deltas = entry.getValue();
+        // Reckon for the Store what to apply to WAL and MemStore.
+        List<Cell> toApply = 
reckonDeltasByStore(region.stores.get(columnFamilyName), mutation,
+          now, deltas, results);
+        if (!toApply.isEmpty()) {
+          for (Cell cell : toApply) {

Review comment:
       Genuine question, will this save us anything? Not sure how the JIT will 
(or won't) optimize such a thing away. I guess, at a minimum, it would save 
construction of an Iterator object?

##########
File path: 
hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java
##########
@@ -1282,6 +1438,80 @@ public void 
testCheckAndMutateBatchWithFilterAndTimeRange() throws Throwable {
     assertEquals("f", Bytes.toString(result.getValue(FAMILY, 
Bytes.toBytes("F"))));
   }
 
+  @Test
+  public void testCheckAndIncrementBatch() throws Throwable {
+    AsyncTable<?> table = getTable.get();
+    byte[] row2 = Bytes.toBytes(Bytes.toString(row) + "2");
+
+    table.putAll(Arrays.asList(
+      new Put(row).addColumn(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
+        .addColumn(FAMILY, Bytes.toBytes("B"), Bytes.toBytes(0L)),
+      new Put(row2).addColumn(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("c"))
+        .addColumn(FAMILY, Bytes.toBytes("D"), Bytes.toBytes(0L)))).get();
+
+    // CheckAndIncrement with correct value
+    CheckAndMutate checkAndMutate1 = CheckAndMutate.newBuilder(row)
+      .ifEquals(FAMILY, Bytes.toBytes("A"), Bytes.toBytes("a"))
+      .build(new Increment(row).addColumn(FAMILY, Bytes.toBytes("B"), 1));
+
+    // CheckAndIncrement with wrong value
+    CheckAndMutate checkAndMutate2 = CheckAndMutate.newBuilder(row2)
+      .ifEquals(FAMILY, Bytes.toBytes("C"), Bytes.toBytes("d"))
+      .build(new Increment(row2).addColumn(FAMILY, Bytes.toBytes("D"), 1));
+
+    List<CheckAndMutateResult> results =
+      table.checkAndMutateAll(Arrays.asList(checkAndMutate1, 
checkAndMutate2)).get();

Review comment:
       Can I send multiple CheckAndMutate's (each with their own Increment or 
Append) to the same row in one batch?

##########
File path: 
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
##########
@@ -3805,6 +3838,196 @@ public void 
prepareMiniBatchOperations(MiniBatchOperationInProgress<Mutation> mi
       }
     }
 
+    /**
+     * Do coprocessor pre-increment or pre-append call.
+     * @return Result returned out of the coprocessor, which means bypass all 
further processing
+     *   and return the proffered Result instead, or null which means proceed.
+     */
+    private Result doCoprocessorPreCall(Mutation mutation) throws IOException {
+      assert mutation instanceof Increment || mutation instanceof Append;
+      Result result = null;
+      if (region.coprocessorHost != null) {
+        if (mutation instanceof Increment) {
+          result = region.coprocessorHost.preIncrementAfterRowLock((Increment) 
mutation);
+        } else {
+          result = region.coprocessorHost.preAppendAfterRowLock((Append) 
mutation);
+        }
+      }
+      return result;
+    }
+
+    private Map<byte[], List<Cell>> reckonDeltas(Mutation mutation, List<Cell> 
results)

Review comment:
       I see now that these were moving existing code. Better to keep the 
naming.

##########
File path: 
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
##########
@@ -3805,6 +3838,196 @@ public void 
prepareMiniBatchOperations(MiniBatchOperationInProgress<Mutation> mi
       }
     }
 
+    /**
+     * Do coprocessor pre-increment or pre-append call.
+     * @return Result returned out of the coprocessor, which means bypass all 
further processing
+     *   and return the proffered Result instead, or null which means proceed.
+     */
+    private Result doCoprocessorPreCall(Mutation mutation) throws IOException {
+      assert mutation instanceof Increment || mutation instanceof Append;
+      Result result = null;
+      if (region.coprocessorHost != null) {
+        if (mutation instanceof Increment) {
+          result = region.coprocessorHost.preIncrementAfterRowLock((Increment) 
mutation);
+        } else {
+          result = region.coprocessorHost.preAppendAfterRowLock((Append) 
mutation);
+        }
+      }
+      return result;
+    }
+
+    private Map<byte[], List<Cell>> reckonDeltas(Mutation mutation, List<Cell> 
results)
+      throws IOException {
+      long now = EnvironmentEdgeManager.currentTime();
+      Map<byte[], List<Cell>> ret = new HashMap<>();
+      // Process a Store/family at a time.
+      for (Map.Entry<byte [], List<Cell>> entry: 
mutation.getFamilyCellMap().entrySet()) {
+        final byte[] columnFamilyName = entry.getKey();
+        List<Cell> deltas = entry.getValue();
+        // Reckon for the Store what to apply to WAL and MemStore.
+        List<Cell> toApply = 
reckonDeltasByStore(region.stores.get(columnFamilyName), mutation,
+          now, deltas, results);
+        if (!toApply.isEmpty()) {
+          for (Cell cell : toApply) {
+            HStore store = region.getStore(cell);
+            if (store == null) {
+              region.checkFamily(CellUtil.cloneFamily(cell));
+            } else {
+              ret.computeIfAbsent(store.getColumnFamilyDescriptor().getName(),
+                key -> new ArrayList<>()).add(cell);
+            }
+          }
+        }
+      }
+      return ret;
+    }
+
+    /**
+     * Reckon the Cells to apply to WAL, memstore, and to return to the Client 
in passed
+     * column family/Store.
+     *
+     * Does Get of current value and then adds passed in deltas for this Store 
returning the
+     * result.
+     *
+     * @param mutation The encompassing Mutation object
+     * @param deltas Changes to apply to this Store; either increment amount 
or data to append
+     * @param results In here we accumulate all the Cells we are to return to 
the client. If null,
+     *   client doesn't want results returned.
+     * @return Resulting Cells after <code>deltas</code> have been applied to 
current
+     *   values. Side effect is our filling out of the <code>results</code> 
List.
+     */
+    private List<Cell> reckonDeltasByStore(HStore store, Mutation mutation, 
long now,
+      List<Cell> deltas, List<Cell> results) throws IOException {
+      assert mutation instanceof Increment || mutation instanceof Append;
+      byte[] columnFamily = store.getColumnFamilyDescriptor().getName();
+      List<Pair<Cell, Cell>> cellPairs = new ArrayList<>(deltas.size());
+
+      // Get previous values for all columns in this family.
+      TimeRange tr;
+      if (mutation instanceof Increment) {
+        tr = ((Increment) mutation).getTimeRange();
+      } else {
+        tr = ((Append) mutation).getTimeRange();
+      }
+      List<Cell> currentValues = get(mutation, store, deltas, tr);
+
+      // Iterate the input columns and update existing values if they were 
found, otherwise
+      // add new column initialized to the delta amount
+      int currentValuesIndex = 0;
+      for (int i = 0; i < deltas.size(); i++) {
+        Cell delta = deltas.get(i);
+        Cell currentValue = null;
+        if (currentValuesIndex < currentValues.size() &&
+          CellUtil.matchingQualifier(currentValues.get(currentValuesIndex), 
delta)) {
+          currentValue = currentValues.get(currentValuesIndex);
+          if (i < (deltas.size() - 1) && !CellUtil.matchingQualifier(delta, 
deltas.get(i + 1))) {
+            currentValuesIndex++;
+          }
+        }
+        // Switch on whether this an increment or an append building the new 
Cell to apply.
+        Cell newCell;
+        if (mutation instanceof Increment) {
+          long deltaAmount = getLongValue(delta);
+          final long newValue = currentValue == null ?
+            deltaAmount : getLongValue(currentValue) + deltaAmount;
+          newCell = reckonDelta(delta, currentValue, columnFamily, now, 
mutation,
+            (oldCell) -> Bytes.toBytes(newValue));
+        } else {

Review comment:
       Better to check that it's an Append and throw an exception if it isn't?

##########
File path: 
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
##########
@@ -3805,6 +3838,196 @@ public void 
prepareMiniBatchOperations(MiniBatchOperationInProgress<Mutation> mi
       }
     }
 
+    /**
+     * Do coprocessor pre-increment or pre-append call.
+     * @return Result returned out of the coprocessor, which means bypass all 
further processing
+     *   and return the proffered Result instead, or null which means proceed.
+     */
+    private Result doCoprocessorPreCall(Mutation mutation) throws IOException {
+      assert mutation instanceof Increment || mutation instanceof Append;
+      Result result = null;
+      if (region.coprocessorHost != null) {
+        if (mutation instanceof Increment) {
+          result = region.coprocessorHost.preIncrementAfterRowLock((Increment) 
mutation);
+        } else {
+          result = region.coprocessorHost.preAppendAfterRowLock((Append) 
mutation);
+        }
+      }
+      return result;
+    }
+
+    private Map<byte[], List<Cell>> reckonDeltas(Mutation mutation, List<Cell> 
results)

Review comment:
       Javadoc here would also be great to supplement `reckonDeltasByStore`'s 
javadoc.

##########
File path: 
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
##########
@@ -742,36 +741,35 @@ private Result increment(final HRegion region, final 
OperationQuota quota,
     spaceQuota.getPolicyEnforcement(region).check(increment);
     quota.addMutation(increment);
     Result r = null;
-    if (region.getCoprocessorHost() != null) {
-      r = region.getCoprocessorHost().preIncrement(increment);
-    }
-    if (r == null) {
-      boolean canProceed = startNonceOperation(mutation, nonceGroup);
-      boolean success = false;
-      try {
-        long nonce = mutation.hasNonce() ? mutation.getNonce() : 
HConstants.NO_NONCE;
-        if (canProceed) {
-          r = region.increment(increment, nonceGroup, nonce);
-        } else {
+    boolean canProceed = startNonceOperation(mutation, nonceGroup);

Review comment:
       You've changed the semantics here. Before, we would call `preIncrement` 
and then create a new nonce'd operation. Now, we'll always make a new nonce 
operation, even if the CP is about to say "skip this increment"
   
   Is that intentional?

##########
File path: 
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
##########
@@ -2910,23 +2908,35 @@ public MultiResponse multi(final RpcController rpcc, 
final MultiRequest request)
           }
 
           try {
-            CheckAndMutateResult result = checkAndMutate(region, 
regionAction.getActionList(),
-              cellScanner, regionAction.getCondition(), spaceQuotaEnforcement);
-            regionActionResultBuilder.setProcessed(result.isSuccess());
             ClientProtos.ResultOrException.Builder resultOrExceptionOrBuilder =
               ClientProtos.ResultOrException.newBuilder();
-            for (int i = 0; i < regionAction.getActionCount(); i++) {
-              if (i == 0 && result.getResult() != null) {
-                resultOrExceptionOrBuilder.setIndex(i);
-                
regionActionResultBuilder.addResultOrException(resultOrExceptionOrBuilder
-                  
.setResult(ProtobufUtil.toResult(result.getResult())).build());
-                continue;
+            if (regionAction.getActionCount() == 1) {
+              CheckAndMutateResult result = checkAndMutate(region, quota,
+                regionAction.getAction(0).getMutation(), cellScanner,
+                regionAction.getCondition(), spaceQuotaEnforcement);
+              regionActionResultBuilder.setProcessed(result.isSuccess());
+              resultOrExceptionOrBuilder.setIndex(0);
+              if (result.getResult() != null) {
+                
resultOrExceptionOrBuilder.setResult(ProtobufUtil.toResult(result.getResult()));

Review comment:
       Is it more optimal to check for the single action case? What does this 
get us over the previous method of using the "collection of Actions" method 
even if we only have a single action?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to