HBASE-21643 Introduce two new region coprocessor methods and deprecate postMutationBeforeWAL
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/f5ea00f7 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/f5ea00f7 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/f5ea00f7 Branch: refs/heads/HBASE-21512 Commit: f5ea00f72442e5c80f2a5fc6e99506127fa8d16b Parents: c2d5991 Author: Guanghao Zhang <zg...@apache.org> Authored: Wed Dec 26 17:42:02 2018 +0800 Committer: Guanghao Zhang <zg...@apache.org> Committed: Thu Dec 27 18:27:06 2018 +0800 ---------------------------------------------------------------------- .../hbase/coprocessor/RegionObserver.java | 47 ++++++++++++++++++++ .../hadoop/hbase/regionserver/HRegion.java | 26 ++++++----- .../regionserver/RegionCoprocessorHost.java | 29 +++++++++--- .../hbase/security/access/AccessController.java | 30 ++++++++++--- .../visibility/VisibilityController.java | 30 +++++++++++-- 5 files changed, 134 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/f5ea00f7/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java index c14cbd1..95b2150 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.coprocessor; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -1029,13 +1030,59 @@ public interface RegionObserver { * @param oldCell old cell containing previous value * @param newCell the new cell containing the computed value * @return the new cell, 
possibly changed + * @deprecated Use {@link #postIncrementBeforeWAL} or {@link #postAppendBeforeWAL} instead. */ + @Deprecated default Cell postMutationBeforeWAL(ObserverContext<RegionCoprocessorEnvironment> ctx, MutationType opType, Mutation mutation, Cell oldCell, Cell newCell) throws IOException { return newCell; } /** + * Called after a list of new cells has been created during an increment operation, but before + * they are committed to the WAL or memstore. + * + * @param ctx the environment provided by the region server + * @param mutation the current mutation + * @param cellPairs a list of cell pairs. The first cell is the old cell, which may be null. + * The second cell is the new cell. + * @return a list of cell pairs, possibly changed. + */ + default List<Pair<Cell, Cell>> postIncrementBeforeWAL( + ObserverContext<RegionCoprocessorEnvironment> ctx, Mutation mutation, + List<Pair<Cell, Cell>> cellPairs) throws IOException { + List<Pair<Cell, Cell>> resultPairs = new ArrayList<>(cellPairs.size()); + for (Pair<Cell, Cell> pair : cellPairs) { + resultPairs.add(new Pair<>(pair.getFirst(), + postMutationBeforeWAL(ctx, MutationType.INCREMENT, mutation, pair.getFirst(), + pair.getSecond()))); + } + return resultPairs; + } + + /** + * Called after a list of new cells has been created during an append operation, but before + * they are committed to the WAL or memstore. + * + * @param ctx the environment provided by the region server + * @param mutation the current mutation + * @param cellPairs a list of cell pairs. The first cell is the old cell, which may be null. + * The second cell is the new cell. + * @return a list of cell pairs, possibly changed. 
+ */ + default List<Pair<Cell, Cell>> postAppendBeforeWAL( + ObserverContext<RegionCoprocessorEnvironment> ctx, Mutation mutation, + List<Pair<Cell, Cell>> cellPairs) throws IOException { + List<Pair<Cell, Cell>> resultPairs = new ArrayList<>(cellPairs.size()); + for (Pair<Cell, Cell> pair : cellPairs) { + resultPairs.add(new Pair<>(pair.getFirst(), + postMutationBeforeWAL(ctx, MutationType.APPEND, mutation, pair.getFirst(), + pair.getSecond()))); + } + return resultPairs; + } + + /** + * Called after the ScanQueryMatcher creates ScanDeleteTracker. Implementing * this hook would help in creating customised DeleteTracker and returning * the newly created DeleteTracker http://git-wip-us.apache.org/repos/asf/hbase/blob/f5ea00f7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 9bf9309..ec222c7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -70,6 +70,8 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Function; +import java.util.stream.Collectors; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -120,7 +122,6 @@ import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.conf.ConfigurationManager; import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; -import org.apache.hadoop.hbase.coprocessor.RegionObserver.MutationType; import 
org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare; import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException; import org.apache.hadoop.hbase.exceptions.TimeoutIOException; @@ -8014,7 +8015,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi Durability effectiveDurability, long now, List<Cell> deltas, List<Cell> results) throws IOException { byte[] columnFamily = store.getColumnFamilyDescriptor().getName(); - List<Cell> toApply = new ArrayList<>(deltas.size()); + List<Pair<Cell, Cell>> cellPairs = new ArrayList<>(deltas.size()); // Get previous values for all columns in this family. TimeRange tr = null; switch (op) { @@ -8041,18 +8042,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi currentValuesIndex++; } } + // Switch on whether this an increment or an append building the new Cell to apply. Cell newCell = null; - MutationType mutationType = null; switch (op) { case INCREMENT: - mutationType = MutationType.INCREMENT; long deltaAmount = getLongValue(delta); final long newValue = currentValue == null ? 
deltaAmount : getLongValue(currentValue) + deltaAmount; newCell = reckonDelta(delta, currentValue, columnFamily, now, mutation, (oldCell) -> Bytes.toBytes(newValue)); break; case APPEND: - mutationType = MutationType.APPEND; newCell = reckonDelta(delta, currentValue, columnFamily, now, mutation, (oldCell) -> ByteBuffer.wrap(new byte[delta.getValueLength() + oldCell.getValueLength()]) .put(oldCell.getValueArray(), oldCell.getValueOffset(), oldCell.getValueLength()) @@ -8063,18 +8062,21 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi default: throw new UnsupportedOperationException(op.toString()); } - // Give coprocessors a chance to update the new cell - if (coprocessorHost != null) { - newCell = - coprocessorHost.postMutationBeforeWAL(mutationType, mutation, currentValue, newCell); - } - toApply.add(newCell); + cellPairs.add(new Pair<>(currentValue, newCell)); // Add to results to get returned to the Client. If null, cilent does not want results. if (results != null) { results.add(newCell); } } - return toApply; + + // Give coprocessors a chance to update the new cells before apply to WAL or memstore + if (coprocessorHost != null) { + // Here the operation must be increment or append. + cellPairs = op == Operation.INCREMENT ? 
+ coprocessorHost.postIncrementBeforeWAL(mutation, cellPairs) : + coprocessorHost.postAppendBeforeWAL(mutation, cellPairs); + } + return cellPairs.stream().map(Pair::getSecond).collect(Collectors.toList()); } private static Cell reckonDelta(final Cell delta, final Cell currentCell, http://git-wip-us.apache.org/repos/asf/hbase/blob/f5ea00f7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java index dea13ca..16fd332 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java @@ -67,7 +67,6 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.coprocessor.RegionObserver; -import org.apache.hadoop.hbase.coprocessor.RegionObserver.MutationType; import org.apache.hadoop.hbase.filter.ByteArrayComparable; import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; import org.apache.hadoop.hbase.io.Reference; @@ -1691,16 +1690,32 @@ public class RegionCoprocessorHost }); } - public Cell postMutationBeforeWAL(final MutationType opType, final Mutation mutation, - final Cell oldCell, Cell newCell) throws IOException { + public List<Pair<Cell, Cell>> postIncrementBeforeWAL(final Mutation mutation, + final List<Pair<Cell, Cell>> cellPairs) throws IOException { if (this.coprocEnvironments.isEmpty()) { - return newCell; + return cellPairs; } return execOperationWithResult( - new ObserverOperationWithResult<RegionObserver, Cell>(regionObserverGetter, newCell) { + new 
ObserverOperationWithResult<RegionObserver, List<Pair<Cell, Cell>>>( + regionObserverGetter, cellPairs) { @Override - public Cell call(RegionObserver observer) throws IOException { - return observer.postMutationBeforeWAL(this, opType, mutation, oldCell, getResult()); + public List<Pair<Cell, Cell>> call(RegionObserver observer) throws IOException { + return observer.postIncrementBeforeWAL(this, mutation, getResult()); + } + }); + } + + public List<Pair<Cell, Cell>> postAppendBeforeWAL(final Mutation mutation, + final List<Pair<Cell, Cell>> cellPairs) throws IOException { + if (this.coprocEnvironments.isEmpty()) { + return cellPairs; + } + return execOperationWithResult( + new ObserverOperationWithResult<RegionObserver, List<Pair<Cell, Cell>>>( + regionObserverGetter, cellPairs) { + @Override + public List<Pair<Cell, Cell>> call(RegionObserver observer) throws IOException { + return observer.postAppendBeforeWAL(this, mutation, getResult()); } }); } http://git-wip-us.apache.org/repos/asf/hbase/blob/f5ea00f7/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java index 82ec12d..6e2c9ce 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java @@ -36,6 +36,7 @@ import java.util.Optional; import java.util.Set; import java.util.TreeMap; import java.util.TreeSet; +import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.ArrayBackedTag; @@ -1849,14 +1850,34 @@ public class AccessController implements MasterCoprocessor, RegionCoprocessor, } @Override - public Cell 
postMutationBeforeWAL(ObserverContext<RegionCoprocessorEnvironment> ctx, - MutationType opType, Mutation mutation, Cell oldCell, Cell newCell) throws IOException { + public List<Pair<Cell, Cell>> postIncrementBeforeWAL( + ObserverContext<RegionCoprocessorEnvironment> ctx, Mutation mutation, + List<Pair<Cell, Cell>> cellPairs) throws IOException { // If the HFile version is insufficient to persist tags, we won't have any // work to do here if (!cellFeaturesEnabled) { - return newCell; + return cellPairs; } + return cellPairs.stream().map(pair -> new Pair<>(pair.getFirst(), + createNewCellWithTags(mutation, pair.getFirst(), pair.getSecond()))) + .collect(Collectors.toList()); + } + + @Override + public List<Pair<Cell, Cell>> postAppendBeforeWAL( + ObserverContext<RegionCoprocessorEnvironment> ctx, Mutation mutation, + List<Pair<Cell, Cell>> cellPairs) throws IOException { + // If the HFile version is insufficient to persist tags, we won't have any + // work to do here + if (!cellFeaturesEnabled) { + return cellPairs; + } + return cellPairs.stream().map(pair -> new Pair<>(pair.getFirst(), + createNewCellWithTags(mutation, pair.getFirst(), pair.getSecond()))) + .collect(Collectors.toList()); + } + private Cell createNewCellWithTags(Mutation mutation, Cell oldCell, Cell newCell) { // Collect any ACLs from the old cell List<Tag> tags = Lists.newArrayList(); List<Tag> aclTags = Lists.newArrayList(); @@ -1901,8 +1922,7 @@ public class AccessController implements MasterCoprocessor, RegionCoprocessor, return newCell; } - Cell rewriteCell = PrivateCellUtil.createCell(newCell, tags); - return rewriteCell; + return PrivateCellUtil.createCell(newCell, tags); } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/f5ea00f7/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java ---------------------------------------------------------------------- diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java index c4f3b95..2a18551 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java @@ -127,6 +127,7 @@ import org.slf4j.LoggerFactory; public class VisibilityController implements MasterCoprocessor, RegionCoprocessor, VisibilityLabelsService.Interface, MasterObserver, RegionObserver { + private static final Logger LOG = LoggerFactory.getLogger(VisibilityController.class); private static final Logger AUDITLOG = LoggerFactory.getLogger("SecurityLogger." + VisibilityController.class.getName()); @@ -688,8 +689,30 @@ public class VisibilityController implements MasterCoprocessor, RegionCoprocesso } @Override - public Cell postMutationBeforeWAL(ObserverContext<RegionCoprocessorEnvironment> ctx, - MutationType opType, Mutation mutation, Cell oldCell, Cell newCell) throws IOException { + public List<Pair<Cell, Cell>> postIncrementBeforeWAL( + ObserverContext<RegionCoprocessorEnvironment> ctx, Mutation mutation, + List<Pair<Cell, Cell>> cellPairs) throws IOException { + List<Pair<Cell, Cell>> resultPairs = new ArrayList<>(cellPairs.size()); + for (Pair<Cell, Cell> pair : cellPairs) { + resultPairs + .add(new Pair<>(pair.getFirst(), createNewCellWithTags(mutation, pair.getSecond()))); + } + return resultPairs; + } + + @Override + public List<Pair<Cell, Cell>> postAppendBeforeWAL( + ObserverContext<RegionCoprocessorEnvironment> ctx, Mutation mutation, + List<Pair<Cell, Cell>> cellPairs) throws IOException { + List<Pair<Cell, Cell>> resultPairs = new ArrayList<>(cellPairs.size()); + for (Pair<Cell, Cell> pair : cellPairs) { + resultPairs + .add(new Pair<>(pair.getFirst(), createNewCellWithTags(mutation, pair.getSecond()))); + } + 
return resultPairs; + } + + private Cell createNewCellWithTags(Mutation mutation, Cell newCell) throws IOException { List<Tag> tags = Lists.newArrayList(); CellVisibility cellVisibility = null; try { @@ -715,8 +738,7 @@ public class VisibilityController implements MasterCoprocessor, RegionCoprocesso } } - Cell rewriteCell = PrivateCellUtil.createCell(newCell, tags); - return rewriteCell; + return PrivateCellUtil.createCell(newCell, tags); } @Override