HBASE-19504 Add TimeRange support into checkAndMutate Signed-off-by: Michael Stack <st...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ad47c2da Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ad47c2da Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ad47c2da Branch: refs/heads/HBASE-19064 Commit: ad47c2daf4d9dc3b85ec91e0fe8385aa6dd9c492 Parents: cd5a821 Author: Chia-Ping Tsai <chia7...@gmail.com> Authored: Sat Mar 24 00:05:41 2018 +0800 Committer: Chia-Ping Tsai <chia7...@gmail.com> Committed: Sat Mar 24 00:12:38 2018 +0800 ---------------------------------------------------------------------- .../org/apache/hadoop/hbase/client/Append.java | 2 +- .../apache/hadoop/hbase/client/AsyncTable.java | 8 +- .../hadoop/hbase/client/AsyncTableImpl.java | 9 +- .../org/apache/hadoop/hbase/client/Get.java | 2 +- .../org/apache/hadoop/hbase/client/HTable.java | 111 +++++++------ .../apache/hadoop/hbase/client/Increment.java | 2 +- .../hadoop/hbase/client/RawAsyncTableImpl.java | 15 +- .../org/apache/hadoop/hbase/client/Scan.java | 2 +- .../org/apache/hadoop/hbase/client/Table.java | 6 + .../hadoop/hbase/protobuf/ProtobufUtil.java | 81 +++------- .../hbase/shaded/protobuf/ProtobufUtil.java | 108 +++++-------- .../hbase/shaded/protobuf/RequestConverter.java | 77 ++++----- .../hbase/shaded/protobuf/TestProtobufUtil.java | 6 +- .../org/apache/hadoop/hbase/io/TimeRange.java | 17 ++ .../src/main/protobuf/Client.proto | 1 + hbase-protocol/src/main/protobuf/Client.proto | 1 + .../hadoop/hbase/rest/client/RemoteHTable.java | 5 + .../hadoop/hbase/regionserver/HRegion.java | 24 +-- .../hbase/regionserver/RSRpcServices.java | 158 ++++++++++--------- .../hadoop/hbase/regionserver/Region.java | 55 +++++-- .../hadoop/hbase/client/TestAsyncTable.java | 63 ++++++++ .../hadoop/hbase/client/TestFromClientSide.java | 55 +++++++ .../client/TestMalformedCellFromClient.java | 2 +- .../hadoop/hbase/protobuf/TestProtobufUtil.java | 5 + .../hbase/regionserver/TestAtomicOperation.java | 2 +- .../hadoop/hbase/regionserver/TestHRegion.java | 68 ++++---- .../TestSimpleTimeRangeTracker.java | 10 +- 27 files changed, 529 insertions(+), 366 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/ad47c2da/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java index 61474b7..3a08d68 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java @@ -48,7 +48,7 @@ import org.slf4j.LoggerFactory; public class Append extends Mutation { private static final Logger LOG = LoggerFactory.getLogger(Append.class); private static final long HEAP_OVERHEAD = ClassSize.REFERENCE + ClassSize.TIMERANGE; - private TimeRange tr = new TimeRange(); + private TimeRange tr = TimeRange.allTime(); /** * Sets the TimeRange to be used on the Get for this append. http://git-wip-us.apache.org/repos/asf/hbase/blob/ad47c2da/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java index 37c80b3..cc1ba87 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java @@ -22,15 +22,14 @@ import static org.apache.hadoop.hbase.client.ConnectionUtils.allOf; import static org.apache.hadoop.hbase.client.ConnectionUtils.toCheckExistenceOnly; import com.google.protobuf.RpcChannel; - import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.function.Function; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CompareOperator; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.util.Bytes; import org.apache.yetus.audience.InterfaceAudience; @@ -236,6 +235,11 @@ public interface AsyncTable<C extends ScanResultConsumerBase> { CheckAndMutateBuilder qualifier(byte[] qualifier); /** + * @param timeRange time range to check. + */ + CheckAndMutateBuilder timeRange(TimeRange timeRange); + + /** * Check for lack of column. */ CheckAndMutateBuilder ifNotExists(); http://git-wip-us.apache.org/repos/asf/hbase/blob/ad47c2da/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java index c8553c6..9747d06 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java @@ -20,17 +20,16 @@ package org.apache.hadoop.hbase.client; import static java.util.stream.Collectors.toList; import com.google.protobuf.RpcChannel; - import java.io.IOException; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.function.Function; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CompareOperator; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.io.TimeRange; import org.apache.yetus.audience.InterfaceAudience; /** @@ -152,6 +151,12 @@ class AsyncTableImpl implements AsyncTable<ScanResultConsumer> { } @Override + public CheckAndMutateBuilder timeRange(TimeRange timeRange) { + builder.timeRange(timeRange); + return this; + } + + @Override public CheckAndMutateBuilder ifNotExists() { builder.ifNotExists(); return this; http://git-wip-us.apache.org/repos/asf/hbase/blob/ad47c2da/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java index 9ed3b38..aae52d2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java @@ -72,7 +72,7 @@ public class Get extends Query implements Row { private boolean cacheBlocks = true; private int storeLimit = -1; private int storeOffset = 0; - private TimeRange tr = new TimeRange(); + private TimeRange tr = TimeRange.allTime(); private boolean checkExistenceOnly = false; private boolean closestRowBefore = false; private Map<byte [], NavigableSet<byte []>> familyMap = new TreeMap<>(Bytes.BYTES_COMPARATOR); http://git-wip-us.apache.org/repos/asf/hbase/blob/ad47c2da/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index 1a11979..69ec366 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.io.TimeRange; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; import org.slf4j.Logger; @@ -692,14 +693,14 @@ public class HTable implements Table { @Deprecated public boolean checkAndPut(final byte [] row, final byte [] family, final byte [] qualifier, final byte [] value, final Put put) throws IOException { - return doCheckAndPut(row, family, qualifier, CompareOperator.EQUAL.name(), value, put); + return doCheckAndPut(row, family, qualifier, CompareOperator.EQUAL.name(), value, null, put); } @Override @Deprecated public boolean checkAndPut(final byte [] row, final byte [] family, final byte [] qualifier, final CompareOp compareOp, final byte [] value, final Put put) throws IOException { - return doCheckAndPut(row, family, qualifier, compareOp.name(), value, put); + return doCheckAndPut(row, family, qualifier, compareOp.name(), value, null, put); } @Override @@ -708,11 +709,12 @@ public class HTable implements Table { final CompareOperator op, final byte [] value, final Put put) throws IOException { // The name of the operators in CompareOperator are intentionally those of the // operators in the filter's CompareOp enum. - return doCheckAndPut(row, family, qualifier, op.name(), value, put); + return doCheckAndPut(row, family, qualifier, op.name(), value, null, put); } - private boolean doCheckAndPut(final byte [] row, final byte [] family, final byte [] qualifier, - final String opName, final byte [] value, final Put put) throws IOException { + private boolean doCheckAndPut(final byte[] row, final byte[] family, final byte[] qualifier, + final String opName, final byte[] value, final TimeRange timeRange, final Put put) + throws IOException { ClientServiceCallable<Boolean> callable = new ClientServiceCallable<Boolean>(this.connection, getName(), row, this.rpcControllerFactory.newController(), put.getPriority()) { @@ -721,7 +723,7 @@ public class HTable implements Table { CompareType compareType = CompareType.valueOf(opName); MutateRequest request = RequestConverter.buildMutateRequest( getLocation().getRegionInfo().getRegionName(), row, family, qualifier, - new BinaryComparator(value), compareType, put); + new BinaryComparator(value), compareType, timeRange, put); MutateResponse response = doMutate(request); return Boolean.valueOf(response.getProcessed()); } @@ -732,60 +734,58 @@ public class HTable implements Table { @Override @Deprecated - public boolean checkAndDelete(final byte [] row, final byte [] family, final byte [] qualifier, - final byte [] value, final Delete delete) throws IOException { - return doCheckAndDelete(row, family, qualifier, CompareOperator.EQUAL.name(), value, delete); + public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier, + final byte[] value, final Delete delete) throws IOException { + return doCheckAndDelete(row, family, qualifier, CompareOperator.EQUAL.name(), value, null, + delete); } @Override @Deprecated - public boolean checkAndDelete(final byte [] row, final byte [] family, final byte [] qualifier, - final CompareOp compareOp, final byte [] value, final Delete delete) throws IOException { - return doCheckAndDelete(row, family, qualifier, compareOp.name(), value, delete); + public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier, + final CompareOp compareOp, final byte[] value, final Delete delete) throws IOException { + return doCheckAndDelete(row, family, qualifier, compareOp.name(), value, null, delete); } @Override @Deprecated - public boolean checkAndDelete(final byte [] row, final byte [] family, final byte [] qualifier, - final CompareOperator op, final byte [] value, final Delete delete) throws IOException { - return doCheckAndDelete(row, family, qualifier, op.name(), value, delete); + public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier, + final CompareOperator op, final byte[] value, final Delete delete) throws IOException { + return doCheckAndDelete(row, family, qualifier, op.name(), value, null, delete); } - private boolean doCheckAndDelete(final byte [] row, final byte [] family, final byte [] qualifier, - final String opName, final byte [] value, final Delete delete) throws IOException { + private boolean doCheckAndDelete(final byte[] row, final byte[] family, final byte[] qualifier, + final String opName, final byte[] value, final TimeRange timeRange, final Delete delete) + throws IOException { CancellableRegionServerCallable<SingleResponse> callable = - new CancellableRegionServerCallable<SingleResponse>( - this.connection, getName(), row, this.rpcControllerFactory.newController(), - writeRpcTimeoutMs, new RetryingTimeTracker().start(), delete.getPriority()) { - @Override - protected SingleResponse rpcCall() throws Exception { - CompareType compareType = CompareType.valueOf(opName); - MutateRequest request = RequestConverter.buildMutateRequest( - getLocation().getRegionInfo().getRegionName(), row, family, qualifier, - new BinaryComparator(value), compareType, delete); - MutateResponse response = doMutate(request); - return ResponseConverter.getResult(request, response, getRpcControllerCellScanner()); - } - }; + new CancellableRegionServerCallable<SingleResponse>(this.connection, getName(), row, + this.rpcControllerFactory.newController(), writeRpcTimeoutMs, + new RetryingTimeTracker().start(), delete.getPriority()) { + @Override + protected SingleResponse rpcCall() throws Exception { + CompareType compareType = CompareType.valueOf(opName); + MutateRequest request = RequestConverter + .buildMutateRequest(getLocation().getRegionInfo().getRegionName(), row, family, + qualifier, new BinaryComparator(value), compareType, timeRange, delete); + MutateResponse response = doMutate(request); + return ResponseConverter.getResult(request, response, getRpcControllerCellScanner()); + } + }; List<Delete> rows = Collections.singletonList(delete); Object[] results = new Object[1]; - AsyncProcessTask task = AsyncProcessTask.newBuilder() - .setPool(pool) - .setTableName(tableName) - .setRowAccess(rows) + AsyncProcessTask task = + AsyncProcessTask.newBuilder().setPool(pool).setTableName(tableName).setRowAccess(rows) .setCallable(callable) // TODO any better timeout? .setRpcTimeout(Math.max(readRpcTimeoutMs, writeRpcTimeoutMs)) .setOperationTimeout(operationTimeoutMs) - .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL) - .setResults(results) - .build(); + .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL).setResults(results).build(); AsyncRequestFuture ars = multiAp.submit(task); ars.waitUntilDone(); if (ars.hasError()) { throw ars.getErrors(); } - return ((SingleResponse.Entry)results[0]).isProcessed(); + return ((SingleResponse.Entry) results[0]).isProcessed(); } @Override @@ -793,9 +793,9 @@ public class HTable implements Table { return new CheckAndMutateBuilderImpl(row, family); } - private boolean doCheckAndMutate(final byte [] row, final byte [] family, final byte [] qualifier, - final String opName, final byte [] value, final RowMutations rm) - throws IOException { + private boolean doCheckAndMutate(final byte[] row, final byte[] family, final byte[] qualifier, + final String opName, final byte[] value, final TimeRange timeRange, final RowMutations rm) + throws IOException { CancellableRegionServerCallable<MultiResponse> callable = new CancellableRegionServerCallable<MultiResponse>(connection, getName(), rm.getRow(), rpcControllerFactory.newController(), writeRpcTimeoutMs, new RetryingTimeTracker().start(), @@ -803,18 +803,18 @@ public class HTable implements Table { @Override protected MultiResponse rpcCall() throws Exception { CompareType compareType = CompareType.valueOf(opName); - MultiRequest request = RequestConverter.buildMutateRequest( - getLocation().getRegionInfo().getRegionName(), row, family, qualifier, - new BinaryComparator(value), compareType, rm); + MultiRequest request = RequestConverter + .buildMutateRequest(getLocation().getRegionInfo().getRegionName(), row, family, qualifier, + new BinaryComparator(value), compareType, timeRange, rm); ClientProtos.MultiResponse response = doMulti(request); ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0); if (res.hasException()) { Throwable ex = ProtobufUtil.toException(res.getException()); if (ex instanceof IOException) { - throw (IOException)ex; + throw (IOException) ex; } - throw new IOException("Failed to checkAndMutate row: "+ - Bytes.toStringBinary(rm.getRow()), ex); + throw new IOException( + "Failed to checkAndMutate row: " + Bytes.toStringBinary(rm.getRow()), ex); } return ResponseConverter.getResults(request, response, getRpcControllerCellScanner()); } @@ -850,14 +850,14 @@ public class HTable implements Table { public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier, final CompareOp compareOp, final byte [] value, final RowMutations rm) throws IOException { - return doCheckAndMutate(row, family, qualifier, compareOp.name(), value, rm); + return doCheckAndMutate(row, family, qualifier, compareOp.name(), value, null, rm); } @Override @Deprecated public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier, final CompareOperator op, final byte [] value, final RowMutations rm) throws IOException { - return doCheckAndMutate(row, family, qualifier, op.name(), value, rm); + return doCheckAndMutate(row, family, qualifier, op.name(), value, null, rm); } @Override @@ -1234,6 +1234,7 @@ public class HTable implements Table { private final byte[] row; private final byte[] family; private byte[] qualifier; + private TimeRange timeRange; private CompareOperator op; private byte[] value; @@ -1250,6 +1251,12 @@ public class HTable implements Table { } @Override + public CheckAndMutateBuilder timeRange(TimeRange timeRange) { + this.timeRange = timeRange; + return this; + } + + @Override public CheckAndMutateBuilder ifNotExists() { this.op = CompareOperator.EQUAL; this.value = null; @@ -1271,19 +1278,19 @@ public class HTable implements Table { @Override public boolean thenPut(Put put) throws IOException { preCheck(); - return doCheckAndPut(row, family, qualifier, op.name(), value, put); + return doCheckAndPut(row, family, qualifier, op.name(), value, timeRange, put); } @Override public boolean thenDelete(Delete delete) throws IOException { preCheck(); - return doCheckAndDelete(row, family, qualifier, op.name(), value, delete); + return doCheckAndDelete(row, family, qualifier, op.name(), value, timeRange, delete); } @Override public boolean thenMutate(RowMutations mutation) throws IOException { preCheck(); - return doCheckAndMutate(row, family, qualifier, op.name(), value, mutation); + return doCheckAndMutate(row, family, qualifier, op.name(), value, timeRange, mutation); } } } http://git-wip-us.apache.org/repos/asf/hbase/blob/ad47c2da/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java index 76208d6..d7d1116 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java @@ -48,7 +48,7 @@ import org.apache.yetus.audience.InterfaceAudience; @InterfaceAudience.Public public class Increment extends Mutation { private static final int HEAP_OVERHEAD = ClassSize.REFERENCE + ClassSize.TIMERANGE; - private TimeRange tr = new TimeRange(); + private TimeRange tr = TimeRange.allTime(); /** * Create a Increment operation for the specified row. http://git-wip-us.apache.org/repos/asf/hbase/blob/ad47c2da/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java index e6f78a1..d705d7c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java @@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.SingleRequestCallerBuilder; import org.apache.hadoop.hbase.filter.BinaryComparator; +import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ReflectionUtils; @@ -265,6 +266,8 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> { private byte[] qualifier; + private TimeRange timeRange; + private CompareOperator op; private byte[] value; @@ -282,6 +285,12 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> { } @Override + public CheckAndMutateBuilder timeRange(TimeRange timeRange) { + this.timeRange = timeRange; + return this; + } + + @Override public CheckAndMutateBuilder ifNotExists() { this.op = CompareOperator.EQUAL; this.value = null; @@ -307,7 +316,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> { .action((controller, loc, stub) -> RawAsyncTableImpl.<Put, Boolean> mutate(controller, loc, stub, put, (rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, - new BinaryComparator(value), CompareType.valueOf(op.name()), p), + new BinaryComparator(value), CompareType.valueOf(op.name()), timeRange, p), (c, r) -> r.getProcessed())) .call(); } @@ -319,7 +328,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> { .action((controller, loc, stub) -> RawAsyncTableImpl.<Delete, Boolean> mutate(controller, loc, stub, delete, (rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, - new BinaryComparator(value), CompareType.valueOf(op.name()), d), + new BinaryComparator(value), CompareType.valueOf(op.name()), timeRange, d), (c, r) -> r.getProcessed())) .call(); } @@ -331,7 +340,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> { .action((controller, loc, stub) -> RawAsyncTableImpl.<Boolean> mutateRow(controller, loc, stub, mutation, (rn, rm) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, - new BinaryComparator(value), CompareType.valueOf(op.name()), rm), + new BinaryComparator(value), CompareType.valueOf(op.name()), timeRange, rm), resp -> resp.getExists())) .call(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/ad47c2da/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java index 7139b26..20a2ada 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java @@ -141,7 +141,7 @@ public class Scan extends Query { private long maxResultSize = -1; private boolean cacheBlocks = true; private boolean reversed = false; - private TimeRange tr = new TimeRange(); + private TimeRange tr = TimeRange.allTime(); private Map<byte [], NavigableSet<byte []>> familyMap = new TreeMap<byte [], NavigableSet<byte []>>(Bytes.BYTES_COMPARATOR); private Boolean asyncPrefetch = null; http://git-wip-us.apache.org/repos/asf/hbase/blob/ad47c2da/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java index 81513fe..fab439c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java @@ -28,6 +28,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CompareOperator; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.io.TimeRange; import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.filter.CompareFilter; @@ -438,6 +439,11 @@ public interface Table extends Closeable { CheckAndMutateBuilder qualifier(byte[] qualifier); /** + * @param timeRange timeRange to check + */ + CheckAndMutateBuilder timeRange(TimeRange timeRange); + + /** * Check for lack of column. */ CheckAndMutateBuilder ifNotExists(); http://git-wip-us.apache.org/repos/asf/hbase/blob/ad47c2da/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java index 3c01fd6..1b5b1e8 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java @@ -29,7 +29,6 @@ import com.google.protobuf.RpcController; import com.google.protobuf.Service; import com.google.protobuf.ServiceException; import com.google.protobuf.TextFormat; - import java.io.IOException; import java.lang.reflect.Constructor; import java.lang.reflect.Method; @@ -38,10 +37,8 @@ import java.security.PrivilegedAction; import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.NavigableSet; import java.util.function.Function; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell.Type; @@ -876,20 +873,13 @@ public final class ProtobufUtil { scanBuilder.setLoadColumnFamiliesOnDemand(loadColumnFamiliesOnDemand); } scanBuilder.setMaxVersions(scan.getMaxVersions()); - for (Entry<byte[], TimeRange> cftr : scan.getColumnFamilyTimeRange().entrySet()) { - HBaseProtos.ColumnFamilyTimeRange.Builder b = HBaseProtos.ColumnFamilyTimeRange.newBuilder(); - b.setColumnFamily(ByteStringer.wrap(cftr.getKey())); - b.setTimeRange(timeRangeToProto(cftr.getValue())); - scanBuilder.addCfTimeRange(b); - } - TimeRange timeRange = scan.getTimeRange(); - if (!timeRange.isAllTime()) { - HBaseProtos.TimeRange.Builder timeRangeBuilder = - HBaseProtos.TimeRange.newBuilder(); - timeRangeBuilder.setFrom(timeRange.getMin()); - timeRangeBuilder.setTo(timeRange.getMax()); - scanBuilder.setTimeRange(timeRangeBuilder.build()); - } + scan.getColumnFamilyTimeRange().forEach((cf, timeRange) -> { + scanBuilder.addCfTimeRange(HBaseProtos.ColumnFamilyTimeRange.newBuilder() + .setColumnFamily(ByteStringer.wrap(cf)) + .setTimeRange(toTimeRange(timeRange)) + .build()); + }); + scanBuilder.setTimeRange(toTimeRange(scan.getTimeRange())); Map<String, byte[]> attributes = scan.getAttributesMap(); if (!attributes.isEmpty()) { NameBytesPair.Builder attributeBuilder = NameBytesPair.newBuilder(); @@ -1077,20 +1067,12 @@ public final class ProtobufUtil { if (get.getFilter() != null) { builder.setFilter(ProtobufUtil.toFilter(get.getFilter())); } - for (Entry<byte[], TimeRange> cftr : get.getColumnFamilyTimeRange().entrySet()) { - HBaseProtos.ColumnFamilyTimeRange.Builder b = HBaseProtos.ColumnFamilyTimeRange.newBuilder(); - b.setColumnFamily(ByteStringer.wrap(cftr.getKey())); - b.setTimeRange(timeRangeToProto(cftr.getValue())); - builder.addCfTimeRange(b); - } - TimeRange timeRange = get.getTimeRange(); - if (!timeRange.isAllTime()) { - HBaseProtos.TimeRange.Builder timeRangeBuilder = - HBaseProtos.TimeRange.newBuilder(); - timeRangeBuilder.setFrom(timeRange.getMin()); - timeRangeBuilder.setTo(timeRange.getMax()); - builder.setTimeRange(timeRangeBuilder.build()); - } + get.getColumnFamilyTimeRange().forEach((cf, timeRange) -> + builder.addCfTimeRange(HBaseProtos.ColumnFamilyTimeRange.newBuilder() + .setColumnFamily(ByteStringer.wrap(cf)) + .setTimeRange(toTimeRange(timeRange)).build()) + ); + builder.setTimeRange(toTimeRange(get.getTimeRange())); Map<String, byte[]> attributes = get.getAttributesMap(); if (!attributes.isEmpty()) { NameBytesPair.Builder attributeBuilder = NameBytesPair.newBuilder(); @@ -1136,16 +1118,6 @@ public final class ProtobufUtil { return builder.build(); } - static void setTimeRange(final MutationProto.Builder builder, final TimeRange timeRange) { - if (!timeRange.isAllTime()) { - HBaseProtos.TimeRange.Builder timeRangeBuilder = - HBaseProtos.TimeRange.newBuilder(); - timeRangeBuilder.setFrom(timeRange.getMin()); - timeRangeBuilder.setTo(timeRange.getMax()); - builder.setTimeRange(timeRangeBuilder.build()); - } - } - public static MutationProto toMutation(final MutationType type, final Mutation mutation) throws IOException { return toMutation(type, mutation, HConstants.NO_NONCE); @@ -1177,12 +1149,10 @@ public final class ProtobufUtil { builder.setNonce(nonce); } if (type == MutationType.INCREMENT) { - TimeRange timeRange = ((Increment) mutation).getTimeRange(); - setTimeRange(builder, timeRange); + builder.setTimeRange(toTimeRange(((Increment) mutation).getTimeRange())); } if (type == MutationType.APPEND) { - TimeRange timeRange = ((Append) mutation).getTimeRange(); - setTimeRange(builder, timeRange); + builder.setTimeRange(toTimeRange(((Append) mutation).getTimeRange())); } ColumnValue.Builder columnBuilder = ColumnValue.newBuilder(); QualifierValue.Builder valueBuilder = QualifierValue.newBuilder(); @@ -1240,10 +1210,10 @@ public final class ProtobufUtil { getMutationBuilderAndSetCommonFields(type, mutation, builder); builder.setAssociatedCellCount(mutation.size()); if (mutation instanceof Increment) { - setTimeRange(builder, ((Increment)mutation).getTimeRange()); + builder.setTimeRange(toTimeRange(((Increment)mutation).getTimeRange())); } if (mutation instanceof Append) { - setTimeRange(builder, ((Append)mutation).getTimeRange()); + builder.setTimeRange(toTimeRange(((Append)mutation).getTimeRange())); } if (nonce != HConstants.NO_NONCE) { builder.setNonce(nonce); @@ -1719,14 +1689,6 @@ public final class ProtobufUtil { codedInput.checkLastTagWas(0); } - private static HBaseProtos.TimeRange.Builder timeRangeToProto(TimeRange timeRange) { - HBaseProtos.TimeRange.Builder timeRangeBuilder = - HBaseProtos.TimeRange.newBuilder(); - timeRangeBuilder.setFrom(timeRange.getMin()); - timeRangeBuilder.setTo(timeRange.getMax()); - return timeRangeBuilder; - } - private static TimeRange protoToTimeRange(HBaseProtos.TimeRange timeRange) throws IOException { long minStamp = 0; long maxStamp = Long.MAX_VALUE; @@ -1819,4 +1781,13 @@ public final class ProtobufUtil { } return RSGroupInfo; } + + public static HBaseProtos.TimeRange toTimeRange(TimeRange timeRange) { + if (timeRange == null) { + timeRange = TimeRange.allTime(); + } + return HBaseProtos.TimeRange.newBuilder().setFrom(timeRange.getMin()) + .setTo(timeRange.getMax()) + .build(); + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/ad47c2da/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java index 13cf76c..65113b9 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java @@ -531,13 +531,13 @@ public final class ProtobufUtil { } if (proto.getCfTimeRangeCount() > 0) { for (HBaseProtos.ColumnFamilyTimeRange cftr : proto.getCfTimeRangeList()) { - TimeRange timeRange = protoToTimeRange(cftr.getTimeRange()); + TimeRange timeRange = toTimeRange(cftr.getTimeRange()); get.setColumnFamilyTimeRange(cftr.getColumnFamily().toByteArray(), timeRange.getMin(), timeRange.getMax()); } } if (proto.hasTimeRange()) { - TimeRange timeRange = protoToTimeRange(proto.getTimeRange()); + TimeRange timeRange = toTimeRange(proto.getTimeRange()); get.setTimeRange(timeRange.getMin(), timeRange.getMax()); } if (proto.hasFilter()) { @@ -860,7 +860,7 @@ public final class ProtobufUtil { Append append = toDelta((Bytes row) -> new Append(row.get(), row.getOffset(), row.getLength()), Append::add, proto, cellScanner); if (proto.hasTimeRange()) { - TimeRange timeRange = protoToTimeRange(proto.getTimeRange()); + TimeRange timeRange = toTimeRange(proto.getTimeRange()); append.setTimeRange(timeRange.getMin(), timeRange.getMax()); } return append; @@ -880,7 +880,7 @@ public final class ProtobufUtil { Increment increment = toDelta((Bytes row) -> new Increment(row.get(), row.getOffset(), row.getLength()), Increment::add, proto, cellScanner); if (proto.hasTimeRange()) { - TimeRange timeRange = protoToTimeRange(proto.getTimeRange()); + TimeRange timeRange = toTimeRange(proto.getTimeRange()); increment.setTimeRange(timeRange.getMin(), timeRange.getMax()); } return increment; @@ -952,7 +952,7 @@ public final class ProtobufUtil { } } if (proto.hasTimeRange()) { - TimeRange timeRange = protoToTimeRange(proto.getTimeRange()); + TimeRange timeRange = toTimeRange(proto.getTimeRange()); get.setTimeRange(timeRange.getMin(), timeRange.getMax()); } for (NameBytesPair attribute : proto.getAttributeList()) { @@ -1016,20 +1016,13 @@ public final class ProtobufUtil { scanBuilder.setLoadColumnFamiliesOnDemand(loadColumnFamiliesOnDemand); } scanBuilder.setMaxVersions(scan.getMaxVersions()); - for (Entry<byte[], TimeRange> cftr : scan.getColumnFamilyTimeRange().entrySet()) { - HBaseProtos.ColumnFamilyTimeRange.Builder b = HBaseProtos.ColumnFamilyTimeRange.newBuilder(); - b.setColumnFamily(UnsafeByteOperations.unsafeWrap(cftr.getKey())); - b.setTimeRange(timeRangeToProto(cftr.getValue())); - scanBuilder.addCfTimeRange(b); - } - TimeRange timeRange = scan.getTimeRange(); - if (!timeRange.isAllTime()) { - HBaseProtos.TimeRange.Builder timeRangeBuilder = - HBaseProtos.TimeRange.newBuilder(); - timeRangeBuilder.setFrom(timeRange.getMin()); - timeRangeBuilder.setTo(timeRange.getMax()); - scanBuilder.setTimeRange(timeRangeBuilder.build()); - } + scan.getColumnFamilyTimeRange().forEach((cf, timeRange) -> { + scanBuilder.addCfTimeRange(HBaseProtos.ColumnFamilyTimeRange.newBuilder() + .setColumnFamily(UnsafeByteOperations.unsafeWrap(cf)) + .setTimeRange(toTimeRange(timeRange)) + .build()); + }); + scanBuilder.setTimeRange(ProtobufUtil.toTimeRange(scan.getTimeRange())); Map<String, byte[]> attributes = scan.getAttributesMap(); if (!attributes.isEmpty()) { NameBytesPair.Builder attributeBuilder = NameBytesPair.newBuilder(); @@ -1148,13 +1141,13 @@ public final class ProtobufUtil { } if (proto.getCfTimeRangeCount() > 0) { for (HBaseProtos.ColumnFamilyTimeRange cftr : proto.getCfTimeRangeList()) { - TimeRange timeRange = protoToTimeRange(cftr.getTimeRange()); + TimeRange timeRange = toTimeRange(cftr.getTimeRange()); scan.setColumnFamilyTimeRange(cftr.getColumnFamily().toByteArray(), timeRange.getMin(), timeRange.getMax()); } } if (proto.hasTimeRange()) { - TimeRange timeRange = protoToTimeRange(proto.getTimeRange()); + TimeRange timeRange = toTimeRange(proto.getTimeRange()); scan.setTimeRange(timeRange.getMin(), timeRange.getMax()); } if (proto.hasFilter()) { @@ -1244,20 +1237,13 @@ public final class ProtobufUtil { if (get.getFilter() != null) { builder.setFilter(ProtobufUtil.toFilter(get.getFilter())); } - for (Entry<byte[], TimeRange> cftr : get.getColumnFamilyTimeRange().entrySet()) { - HBaseProtos.ColumnFamilyTimeRange.Builder b = HBaseProtos.ColumnFamilyTimeRange.newBuilder(); - b.setColumnFamily(UnsafeByteOperations.unsafeWrap(cftr.getKey())); - b.setTimeRange(timeRangeToProto(cftr.getValue())); - builder.addCfTimeRange(b); - } - TimeRange timeRange = get.getTimeRange(); - if (!timeRange.isAllTime()) { - HBaseProtos.TimeRange.Builder timeRangeBuilder = - HBaseProtos.TimeRange.newBuilder(); - timeRangeBuilder.setFrom(timeRange.getMin()); - timeRangeBuilder.setTo(timeRange.getMax()); - builder.setTimeRange(timeRangeBuilder.build()); - } + get.getColumnFamilyTimeRange().forEach((cf, timeRange) -> { + builder.addCfTimeRange(HBaseProtos.ColumnFamilyTimeRange.newBuilder() + .setColumnFamily(UnsafeByteOperations.unsafeWrap(cf)) + .setTimeRange(toTimeRange(timeRange)) + .build()); + }); + builder.setTimeRange(ProtobufUtil.toTimeRange(get.getTimeRange())); Map<String, byte[]> attributes = get.getAttributesMap(); if (!attributes.isEmpty()) { NameBytesPair.Builder attributeBuilder = NameBytesPair.newBuilder(); @@ -1302,16 +1288,6 @@ public final class ProtobufUtil { return builder.build(); } - static void setTimeRange(final MutationProto.Builder builder, final TimeRange timeRange) { - if (!timeRange.isAllTime()) { - HBaseProtos.TimeRange.Builder timeRangeBuilder = - HBaseProtos.TimeRange.newBuilder(); - timeRangeBuilder.setFrom(timeRange.getMin()); - timeRangeBuilder.setTo(timeRange.getMax()); - builder.setTimeRange(timeRangeBuilder.build()); - } - } - public static MutationProto toMutation(final MutationType type, final Mutation mutation) throws IOException { return toMutation(type, mutation, HConstants.NO_NONCE); @@ -1343,12 +1319,10 @@ public final class ProtobufUtil { builder.setNonce(nonce); } if (type == MutationType.INCREMENT) { - TimeRange timeRange = ((Increment) mutation).getTimeRange(); - setTimeRange(builder, timeRange); + builder.setTimeRange(ProtobufUtil.toTimeRange(((Increment) mutation).getTimeRange())); } if (type == MutationType.APPEND) { - TimeRange timeRange = ((Append) mutation).getTimeRange(); - setTimeRange(builder, timeRange); + builder.setTimeRange(ProtobufUtil.toTimeRange(((Append) mutation).getTimeRange())); } ColumnValue.Builder columnBuilder = ColumnValue.newBuilder(); QualifierValue.Builder valueBuilder = QualifierValue.newBuilder(); @@ -1406,10 +1380,10 @@ public final class ProtobufUtil { getMutationBuilderAndSetCommonFields(type, mutation, builder); builder.setAssociatedCellCount(mutation.size()); if (mutation instanceof Increment) { - setTimeRange(builder, ((Increment)mutation).getTimeRange()); + builder.setTimeRange(ProtobufUtil.toTimeRange(((Increment) mutation).getTimeRange())); } if (mutation instanceof Append) { - setTimeRange(builder, ((Append)mutation).getTimeRange()); + builder.setTimeRange(ProtobufUtil.toTimeRange(((Append) mutation).getTimeRange())); } if (nonce != HConstants.NO_NONCE) { builder.setNonce(nonce); @@ -2756,24 +2730,11 @@ public final class ProtobufUtil { return scList; } - private static HBaseProtos.TimeRange.Builder timeRangeToProto(TimeRange timeRange) { - HBaseProtos.TimeRange.Builder timeRangeBuilder = - HBaseProtos.TimeRange.newBuilder(); - timeRangeBuilder.setFrom(timeRange.getMin()); - timeRangeBuilder.setTo(timeRange.getMax()); - return timeRangeBuilder; - } - - private static TimeRange protoToTimeRange(HBaseProtos.TimeRange timeRange) throws IOException { - long minStamp = 0; - long maxStamp = Long.MAX_VALUE; - if (timeRange.hasFrom()) { - minStamp = timeRange.getFrom(); - } - if (timeRange.hasTo()) { - maxStamp = timeRange.getTo(); - } - return new TimeRange(minStamp, maxStamp); + public static TimeRange toTimeRange(HBaseProtos.TimeRange timeRange) { + return timeRange == null ? + TimeRange.allTime() : + new TimeRange(timeRange.hasFrom() ? timeRange.getFrom() : 0, + timeRange.hasTo() ? timeRange.getTo() : Long.MAX_VALUE); } /** @@ -3228,4 +3189,13 @@ public final class ProtobufUtil { .setTimeStampsOfLastAppliedOp(rls.getTimeStampsOfLastAppliedOp()) .build(); } + + public static HBaseProtos.TimeRange toTimeRange(TimeRange timeRange) { + if (timeRange == null) { + timeRange = TimeRange.allTime(); + } + return HBaseProtos.TimeRange.newBuilder().setFrom(timeRange.getMin()) + .setTo(timeRange.getMax()) + .build(); + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/ad47c2da/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java index 0afcfe1..8ce2f1b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java @@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.filter.ByteArrayComparable; +import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -235,16 +236,9 @@ public final class RequestConverter { public static MutateRequest buildMutateRequest( final byte[] regionName, final byte[] row, final byte[] family, final byte [] qualifier, final ByteArrayComparable comparator, - final CompareType compareType, final Put put) throws IOException { - MutateRequest.Builder builder = MutateRequest.newBuilder(); - RegionSpecifier region = buildRegionSpecifier( - RegionSpecifierType.REGION_NAME, regionName); - builder.setRegion(region); - Condition condition = buildCondition( - row, family, qualifier, comparator, compareType); - builder.setMutation(ProtobufUtil.toMutation(MutationType.PUT, put, MutationProto.newBuilder())); - builder.setCondition(condition); - return builder.build(); + final CompareType compareType, TimeRange timeRange, final Put put) throws IOException { + return buildMutateRequest(regionName, row, family, qualifier, comparator, compareType, timeRange + , put, MutationType.PUT); } /** @@ -263,19 +257,21 @@ public final class RequestConverter { public static MutateRequest buildMutateRequest( final byte[] regionName, final byte[] row, final byte[] family, final byte [] qualifier, final ByteArrayComparable comparator, - final CompareType compareType, final Delete delete) throws IOException { - MutateRequest.Builder builder = MutateRequest.newBuilder(); - RegionSpecifier region = buildRegionSpecifier( - RegionSpecifierType.REGION_NAME, regionName); - builder.setRegion(region); - Condition condition = buildCondition( - row, family, qualifier, comparator, compareType); - builder.setMutation(ProtobufUtil.toMutation(MutationType.DELETE, delete, - MutationProto.newBuilder())); - builder.setCondition(condition); - return builder.build(); + final CompareType compareType, TimeRange timeRange, final Delete delete) throws IOException { + return buildMutateRequest(regionName, row, family, qualifier, comparator, compareType, timeRange + , delete, MutationType.DELETE); + } + + public static MutateRequest buildMutateRequest(final byte[] regionName, final byte[] row, + final byte[] family, final byte[] qualifier, final ByteArrayComparable comparator, + final CompareType compareType, TimeRange timeRange, final Mutation mutation, + final MutationType type) throws IOException { + return MutateRequest.newBuilder() + .setRegion(buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName)) + .setMutation(ProtobufUtil.toMutation(type, mutation)) + .setCondition(buildCondition(row, family, qualifier, comparator, compareType, timeRange)) + .build(); } - /** * Create a protocol buffer MutateRequest for conditioned row mutations * @@ -289,17 +285,15 @@ public final class RequestConverter { * @return a mutate request * @throws IOException */ - public static ClientProtos.MultiRequest buildMutateRequest( - final byte[] regionName, final byte[] row, final byte[] family, - final byte [] qualifier, final ByteArrayComparable comparator, - final CompareType compareType, final RowMutations rowMutations) throws IOException { + public static ClientProtos.MultiRequest buildMutateRequest(final byte[] regionName, + final byte[] row, final byte[] family, final byte[] qualifier, + final ByteArrayComparable comparator, final CompareType compareType, final TimeRange timeRange, + final RowMutations rowMutations) throws IOException { RegionAction.Builder builder = getRegionActionBuilderWithRegion(RegionAction.newBuilder(), regionName); builder.setAtomic(true); ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder(); MutationProto.Builder mutationBuilder = MutationProto.newBuilder(); - Condition condition = buildCondition( - row, family, qualifier, comparator, compareType); for (Mutation mutation: rowMutations.getMutations()) { MutationType mutateType = null; if (mutation instanceof Put) { @@ -316,10 +310,9 @@ public final class RequestConverter { actionBuilder.setMutation(mp); builder.addAction(actionBuilder.build()); } - ClientProtos.MultiRequest request = - ClientProtos.MultiRequest.newBuilder().addRegionAction(builder.build()) - .setCondition(condition).build(); - return request; + return ClientProtos.MultiRequest.newBuilder().addRegionAction(builder.build()) + .setCondition(buildCondition(row, family, qualifier, comparator, compareType, timeRange)) + .build(); } /** @@ -1100,16 +1093,16 @@ public final class RequestConverter { * @throws IOException */ public static Condition buildCondition(final byte[] row, final byte[] family, - final byte[] qualifier, final ByteArrayComparable comparator, final CompareType compareType) - throws IOException { - Condition.Builder builder = Condition.newBuilder(); - builder.setRow(UnsafeByteOperations.unsafeWrap(row)); - builder.setFamily(UnsafeByteOperations.unsafeWrap(family)); - builder.setQualifier(UnsafeByteOperations - .unsafeWrap(qualifier == null ? HConstants.EMPTY_BYTE_ARRAY : qualifier)); - builder.setComparator(ProtobufUtil.toComparator(comparator)); - builder.setCompareType(compareType); - return builder.build(); + final byte[] qualifier, final ByteArrayComparable comparator, final CompareType compareType, + final TimeRange timeRange) { + return Condition.newBuilder().setRow(UnsafeByteOperations.unsafeWrap(row)) + .setFamily(UnsafeByteOperations.unsafeWrap(family)) + .setQualifier(UnsafeByteOperations.unsafeWrap(qualifier == null ? + HConstants.EMPTY_BYTE_ARRAY : qualifier)) + .setComparator(ProtobufUtil.toComparator(comparator)) + .setCompareType(compareType) + .setTimeRange(ProtobufUtil.toTimeRange(timeRange)) + .build(); } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/ad47c2da/hbase-client/src/test/java/org/apache/hadoop/hbase/shaded/protobuf/TestProtobufUtil.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/shaded/protobuf/TestProtobufUtil.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/shaded/protobuf/TestProtobufUtil.java index 77c0650..f16f060 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/shaded/protobuf/TestProtobufUtil.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/shaded/protobuf/TestProtobufUtil.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.Bytes; import org.junit.ClassRule; @@ -110,7 +111,7 @@ public class TestProtobufUtil { getBuilder = ClientProtos.Get.newBuilder(proto); getBuilder.setMaxVersions(1); getBuilder.setCacheBlocks(true); - + getBuilder.setTimeRange(ProtobufUtil.toTimeRange(TimeRange.allTime())); Get get = ProtobufUtil.toGet(proto); assertEquals(getBuilder.build(), ProtobufUtil.toGet(get)); } @@ -244,6 +245,7 @@ public class TestProtobufUtil { scanBuilder.setMaxVersions(2); scanBuilder.setCacheBlocks(false); scanBuilder.setCaching(1024); + scanBuilder.setTimeRange(ProtobufUtil.toTimeRange(TimeRange.allTime())); ClientProtos.Scan expectedProto = scanBuilder.build(); ClientProtos.Scan actualProto = ProtobufUtil.toScan( @@ -305,6 +307,7 @@ public class TestProtobufUtil { Increment increment = ProtobufUtil.toIncrement(proto, null); mutateBuilder.setTimestamp(increment.getTimeStamp()); + mutateBuilder.setTimeRange(ProtobufUtil.toTimeRange(increment.getTimeRange())); assertEquals(mutateBuilder.build(), ProtobufUtil.toMutation(MutationType.INCREMENT, increment)); } @@ -345,6 +348,7 @@ public class TestProtobufUtil { // append always use the latest timestamp, // reset the timestamp to the original mutate mutateBuilder.setTimestamp(append.getTimeStamp()); + mutateBuilder.setTimeRange(ProtobufUtil.toTimeRange(append.getTimeRange())); assertEquals(mutateBuilder.build(), ProtobufUtil.toMutation(MutationType.APPEND, append)); } http://git-wip-us.apache.org/repos/asf/hbase/blob/ad47c2da/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TimeRange.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TimeRange.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TimeRange.java index e450346..c44ab69 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TimeRange.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TimeRange.java @@ -37,6 +37,20 @@ import org.apache.yetus.audience.InterfaceAudience; public class TimeRange { public static final long INITIAL_MIN_TIMESTAMP = 0L; public static final long INITIAL_MAX_TIMESTAMP = Long.MAX_VALUE; + private static final TimeRange ALL_TIME = new TimeRange(INITIAL_MIN_TIMESTAMP, + INITIAL_MAX_TIMESTAMP); + + public static TimeRange allTime() { + return ALL_TIME; + } + + public static TimeRange at(long ts) { + if (ts < 0 || ts == Long.MAX_VALUE) { + throw new IllegalArgumentException("invalid ts:" + ts); + } + return new TimeRange(ts, ts + 1); + } + private final long minStamp; private final long maxStamp; private final boolean allTime; @@ -150,7 +164,10 @@ public class TimeRange { * @param bytes timestamp to check * @param offset offset into the bytes * @return true if within TimeRange, false if not + * @deprecated This is made @InterfaceAudience.Private in the 2.0 line and above and may be + * changed to private or removed in 3.0. Use {@link #withinTimeRange(long)} instead */ + @Deprecated public boolean withinTimeRange(byte [] bytes, int offset) { if (allTime) { return true; http://git-wip-us.apache.org/repos/asf/hbase/blob/ad47c2da/hbase-protocol-shaded/src/main/protobuf/Client.proto ---------------------------------------------------------------------- diff --git a/hbase-protocol-shaded/src/main/protobuf/Client.proto b/hbase-protocol-shaded/src/main/protobuf/Client.proto index 325b9c1..14abb08 100644 --- a/hbase-protocol-shaded/src/main/protobuf/Client.proto +++ b/hbase-protocol-shaded/src/main/protobuf/Client.proto @@ -143,6 +143,7 @@ message Condition { required bytes qualifier = 3; required CompareType compare_type = 4; required Comparator comparator = 5; + optional TimeRange time_range = 6; } http://git-wip-us.apache.org/repos/asf/hbase/blob/ad47c2da/hbase-protocol/src/main/protobuf/Client.proto ---------------------------------------------------------------------- diff --git a/hbase-protocol/src/main/protobuf/Client.proto b/hbase-protocol/src/main/protobuf/Client.proto index 9b4e3e1..5fd20c8 100644 --- a/hbase-protocol/src/main/protobuf/Client.proto +++ b/hbase-protocol/src/main/protobuf/Client.proto @@ -142,6 +142,7 @@ message Condition { required bytes qualifier = 3; required CompareType compare_type = 4; required Comparator comparator = 5; + optional TimeRange time_range = 6; } http://git-wip-us.apache.org/repos/asf/hbase/blob/ad47c2da/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java ---------------------------------------------------------------------- diff --git a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java index 21c7858..b8d0035 100644 --- a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java +++ b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java @@ -976,6 +976,11 @@ public class RemoteHTable implements Table { } @Override + public CheckAndMutateBuilder timeRange(TimeRange timeRange) { + throw new UnsupportedOperationException("timeRange not implemented"); + } + + @Override public CheckAndMutateBuilder ifNotExists() { throw new UnsupportedOperationException("CheckAndMutate for non-equal comparison " + "not implemented"); http://git-wip-us.apache.org/repos/asf/hbase/blob/ad47c2da/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index de0342e..9fd5eb8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -4020,28 +4020,25 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } @Override - public boolean checkAndMutate(byte [] row, byte [] family, byte [] qualifier, - CompareOperator op, ByteArrayComparable comparator, Mutation mutation, boolean writeToWAL) - throws IOException{ + public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareOperator op, + ByteArrayComparable comparator, TimeRange timeRange, Mutation mutation) throws IOException { checkMutationType(mutation, row); - return doCheckAndRowMutate(row, family, qualifier, op, comparator, null, - mutation); + return doCheckAndRowMutate(row, family, qualifier, op, comparator, timeRange, null, mutation); } @Override - public boolean checkAndRowMutate(byte [] row, byte [] family, byte [] qualifier, - CompareOperator op, ByteArrayComparable comparator, RowMutations rm) - throws IOException { - return doCheckAndRowMutate(row, family, qualifier, op, comparator, rm, null); + public boolean checkAndRowMutate(byte[] row, byte[] family, byte[] qualifier, CompareOperator op, + ByteArrayComparable comparator, TimeRange timeRange, RowMutations rm) throws IOException { + return doCheckAndRowMutate(row, family, qualifier, op, comparator, timeRange, rm, null); } /** * checkAndMutate and checkAndRowMutate are 90% the same. Rather than copy/paste, below has * switches in the few places where there is deviation. */ - private boolean doCheckAndRowMutate(byte [] row, byte [] family, byte [] qualifier, - CompareOperator op, ByteArrayComparable comparator, RowMutations rowMutations, - Mutation mutation) + private boolean doCheckAndRowMutate(byte[] row, byte[] family, byte[] qualifier, + CompareOperator op, ByteArrayComparable comparator, TimeRange timeRange, + RowMutations rowMutations, Mutation mutation) throws IOException { // Could do the below checks but seems wacky with two callers only. Just comment out for now. // One caller passes a Mutation, the other passes RowMutation. Presume all good so we don't @@ -4056,6 +4053,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi Get get = new Get(row); checkFamily(family); get.addColumn(family, qualifier); + if (timeRange != null) { + get.setTimeRange(timeRange.getMin(), timeRange.getMax()); + } // Lock row - note that doBatchMutate will relock this row if called checkRow(row, "doCheckAndRowMutate"); RowLock rowLock = getRowLockInternal(get.getRow(), false, null); http://git-wip-us.apache.org/repos/asf/hbase/blob/ad47c2da/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index f9d6798..348c9b6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -87,6 +87,7 @@ import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException; import org.apache.hadoop.hbase.exceptions.ScannerResetException; import org.apache.hadoop.hbase.exceptions.UnknownProtocolException; import org.apache.hadoop.hbase.filter.ByteArrayComparable; +import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler; import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.ipc.PriorityFunction; @@ -593,9 +594,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler, * @param cellScanner if non-null, the mutation data -- the Cell content. */ private boolean checkAndRowMutate(final HRegion region, final List<ClientProtos.Action> actions, - final CellScanner cellScanner, byte[] row, byte[] family, byte[] qualifier, - CompareOperator op, ByteArrayComparable comparator, RegionActionResult.Builder builder, - ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException { + final CellScanner cellScanner, byte[] row, byte[] family, byte[] qualifier, CompareOperator op, + ByteArrayComparable comparator, TimeRange timeRange, RegionActionResult.Builder builder, + ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException { int countOfCompleteMutation = 0; try { if (!region.getRegionInfo().isMetaRegion()) { @@ -638,7 +639,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, builder.addResultOrException( resultOrExceptionOrBuilder.build()); } - return region.checkAndRowMutate(row, family, qualifier, op, comparator, rm); + return region.checkAndRowMutate(row, family, qualifier, op, comparator, timeRange, rm); } finally { // Currently, the checkAndMutate isn't supported by batch so it won't mess up the cell scanner // even if the malformed cells are not skipped. @@ -2655,9 +2656,13 @@ public class RSRpcServices implements HBaseRPCErrorHandler, CompareOperator.valueOf(condition.getCompareType().name()); ByteArrayComparable comparator = ProtobufUtil.toComparator(condition.getComparator()); - processed = checkAndRowMutate(region, regionAction.getActionList(), - cellScanner, row, family, qualifier, op, - comparator, regionActionResultBuilder, spaceQuotaEnforcement); + TimeRange timeRange = condition.hasTimeRange() ? + ProtobufUtil.toTimeRange(condition.getTimeRange()) : + TimeRange.allTime(); + processed = + checkAndRowMutate(region, regionAction.getActionList(), cellScanner, row, family, + qualifier, op, comparator, timeRange, regionActionResultBuilder, + spaceQuotaEnforcement); } else { doAtomicBatchOp(regionActionResultBuilder, region, quota, regionAction.getActionList(), cellScanner, spaceQuotaEnforcement); @@ -2778,79 +2783,84 @@ public class RSRpcServices implements HBaseRPCErrorHandler, spaceQuotaEnforcement = getSpaceQuotaManager().getActiveEnforcements(); switch (type) { - case APPEND: - // TODO: this doesn't actually check anything. - r = append(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement); - break; - case INCREMENT: - // TODO: this doesn't actually check anything. - r = increment(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement); - break; - case PUT: - Put put = ProtobufUtil.toPut(mutation, cellScanner); - checkCellSizeLimit(region, put); - // Throws an exception when violated - spaceQuotaEnforcement.getPolicyEnforcement(region).check(put); - quota.addMutation(put); - if (request.hasCondition()) { - Condition condition = request.getCondition(); - byte[] row = condition.getRow().toByteArray(); - byte[] family = condition.getFamily().toByteArray(); - byte[] qualifier = condition.getQualifier().toByteArray(); - CompareOperator compareOp = - CompareOperator.valueOf(condition.getCompareType().name()); - ByteArrayComparable comparator = ProtobufUtil.toComparator(condition.getComparator()); - if (region.getCoprocessorHost() != null) { - processed = region.getCoprocessorHost().preCheckAndPut(row, family, qualifier, - compareOp, comparator, put); - } - if (processed == null) { - boolean result = region.checkAndMutate(row, family, - qualifier, compareOp, comparator, put, true); + case APPEND: + // TODO: this doesn't actually check anything. + r = append(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement); + break; + case INCREMENT: + // TODO: this doesn't actually check anything. + r = increment(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement); + break; + case PUT: + Put put = ProtobufUtil.toPut(mutation, cellScanner); + checkCellSizeLimit(region, put); + // Throws an exception when violated + spaceQuotaEnforcement.getPolicyEnforcement(region).check(put); + quota.addMutation(put); + if (request.hasCondition()) { + Condition condition = request.getCondition(); + byte[] row = condition.getRow().toByteArray(); + byte[] family = condition.getFamily().toByteArray(); + byte[] qualifier = condition.getQualifier().toByteArray(); + CompareOperator compareOp = + CompareOperator.valueOf(condition.getCompareType().name()); + ByteArrayComparable comparator = ProtobufUtil.toComparator(condition.getComparator()); + TimeRange timeRange = condition.hasTimeRange() ? + ProtobufUtil.toTimeRange(condition.getTimeRange()) : + TimeRange.allTime(); if (region.getCoprocessorHost() != null) { - result = region.getCoprocessorHost().postCheckAndPut(row, family, - qualifier, compareOp, comparator, put, result); + processed = region.getCoprocessorHost().preCheckAndPut(row, family, qualifier, + compareOp, comparator, put); } - processed = result; - } - } else { - region.put(put); - processed = Boolean.TRUE; - } - break; - case DELETE: - Delete delete = ProtobufUtil.toDelete(mutation, cellScanner); - checkCellSizeLimit(region, delete); - spaceQuotaEnforcement.getPolicyEnforcement(region).check(delete); - quota.addMutation(delete); - if (request.hasCondition()) { - Condition condition = request.getCondition(); - byte[] row = condition.getRow().toByteArray(); - byte[] family = condition.getFamily().toByteArray(); - byte[] qualifier = condition.getQualifier().toByteArray(); - CompareOperator op = CompareOperator.valueOf(condition.getCompareType().name()); - ByteArrayComparable comparator = ProtobufUtil.toComparator(condition.getComparator()); - if (region.getCoprocessorHost() != null) { - processed = region.getCoprocessorHost().preCheckAndDelete(row, family, qualifier, op, - comparator, delete); + if (processed == null) { + boolean result = region.checkAndMutate(row, family, + qualifier, compareOp, comparator, timeRange, put); + if (region.getCoprocessorHost() != null) { + result = region.getCoprocessorHost().postCheckAndPut(row, family, + qualifier, compareOp, comparator, put, result); + } + processed = result; + } + } else { + region.put(put); + processed = Boolean.TRUE; } - if (processed == null) { - boolean result = region.checkAndMutate(row, family, - qualifier, op, comparator, delete, true); + break; + case DELETE: + Delete delete = ProtobufUtil.toDelete(mutation, cellScanner); + checkCellSizeLimit(region, delete); + spaceQuotaEnforcement.getPolicyEnforcement(region).check(delete); + quota.addMutation(delete); + if (request.hasCondition()) { + Condition condition = request.getCondition(); + byte[] row = condition.getRow().toByteArray(); + byte[] family = condition.getFamily().toByteArray(); + byte[] qualifier = condition.getQualifier().toByteArray(); + CompareOperator op = CompareOperator.valueOf(condition.getCompareType().name()); + ByteArrayComparable comparator = ProtobufUtil.toComparator(condition.getComparator()); + TimeRange timeRange = condition.hasTimeRange() ? + ProtobufUtil.toTimeRange(condition.getTimeRange()) : + TimeRange.allTime(); if (region.getCoprocessorHost() != null) { - result = region.getCoprocessorHost().postCheckAndDelete(row, family, - qualifier, op, comparator, delete, result); + processed = region.getCoprocessorHost().preCheckAndDelete(row, family, qualifier, op, + comparator, delete); } - processed = result; + if (processed == null) { + boolean result = region.checkAndMutate(row, family, + qualifier, op, comparator, timeRange, delete); + if (region.getCoprocessorHost() != null) { + result = region.getCoprocessorHost().postCheckAndDelete(row, family, + qualifier, op, comparator, delete, result); + } + processed = result; + } + } else { + region.delete(delete); + processed = Boolean.TRUE; } - } else { - region.delete(delete); - processed = Boolean.TRUE; - } - break; - default: - throw new DoNotRetryIOException( - "Unsupported mutate type: " + type.name()); + break; + default: + throw new DoNotRetryIOException("Unsupported mutate type: " + type.name()); } if (processed != null) { builder.setProcessed(processed.booleanValue()); http://git-wip-us.apache.org/repos/asf/hbase/blob/ad47c2da/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java index 27771ce..80b18b8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java @@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.conf.ConfigurationObserver; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; import org.apache.hadoop.hbase.filter.ByteArrayComparable; +import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; @@ -303,14 +304,31 @@ public interface Region extends ConfigurationObserver { * @param family column family to check * @param qualifier column qualifier to check * @param op the comparison operator - * @param comparator - * @param mutation - * @param writeToWAL + * @param comparator the expected value + * @param mutation data to put if check succeeds + * @return true if mutation was applied, false otherwise + */ + default boolean checkAndMutate(byte [] row, byte [] family, byte [] qualifier, CompareOperator op, + ByteArrayComparable comparator, Mutation mutation) throws IOException { + return checkAndMutate(row, family, qualifier, op, comparator, TimeRange.allTime(), mutation); + } + + /** + * Atomically checks if a row/family/qualifier value matches the expected value and if it does, + * it performs the mutation. If the passed value is null, the lack of column value + * (ie: non-existence) is used. See checkAndRowMutate to do many checkAndPuts at a time on a + * single row. + * @param row to check + * @param family column family to check + * @param qualifier column qualifier to check + * @param op the comparison operator + * @param comparator the expected value + * @param mutation data to put if check succeeds + * @param timeRange time range to check * @return true if mutation was applied, false otherwise - * @throws IOException */ boolean checkAndMutate(byte [] row, byte [] family, byte [] qualifier, CompareOperator op, - ByteArrayComparable comparator, Mutation mutation, boolean writeToWAL) throws IOException; + ByteArrayComparable comparator, TimeRange timeRange, Mutation mutation) throws IOException; /** * Atomically checks if a row/family/qualifier value matches the expected values and if it does, @@ -321,13 +339,32 @@ public interface Region extends ConfigurationObserver { * @param family column family to check * @param qualifier column qualifier to check * @param op the comparison operator - * @param comparator - * @param mutations + * @param comparator the expected value + * @param mutations data to put if check succeeds + * @return true if mutations were applied, false otherwise + */ + default boolean checkAndRowMutate(byte[] row, byte[] family, byte[] qualifier, CompareOperator op, + ByteArrayComparable comparator, RowMutations mutations) throws IOException { + return checkAndRowMutate(row, family, qualifier, op, comparator, TimeRange.allTime(), + mutations); + } + + /** + * Atomically checks if a row/family/qualifier value matches the expected values and if it does, + * it performs the row mutations. If the passed value is null, the lack of column value + * (ie: non-existence) is used. Use to do many mutations on a single row. Use checkAndMutate + * to do one checkAndMutate at a time. + * @param row to check + * @param family column family to check + * @param qualifier column qualifier to check + * @param op the comparison operator + * @param comparator the expected value + * @param mutations data to put if check succeeds + * @param timeRange time range to check * @return true if mutations were applied, false otherwise - * @throws IOException */ boolean checkAndRowMutate(byte [] row, byte [] family, byte [] qualifier, CompareOperator op, - ByteArrayComparable comparator, RowMutations mutations) + ByteArrayComparable comparator, TimeRange timeRange, RowMutations mutations) throws IOException; /** http://git-wip-us.apache.org/repos/asf/hbase/blob/ad47c2da/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java index 37182ec..576c0a7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java @@ -40,6 +40,7 @@ import org.apache.commons.io.IOUtils; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; @@ -338,4 +339,66 @@ public class TestAsyncTable { } }); } + + @Test + public void testCheckAndMutateWithTimeRange() throws Exception { + TEST_UTIL.createTable(TableName.valueOf("testCheckAndMutateWithTimeRange"), FAMILY); + AsyncTable<?> table = getTable.get(); + final long ts = System.currentTimeMillis() / 2; + Put put = new Put(row); + put.addColumn(FAMILY, QUALIFIER, ts, VALUE); + + boolean ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER) + .ifNotExists() + .thenPut(put) + .get(); + assertTrue(ok); + + ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER) + .timeRange(TimeRange.at(ts + 10000)) + .ifEquals(VALUE) + .thenPut(put) + .get(); + assertFalse(ok); + + ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER) + .timeRange(TimeRange.at(ts)) + .ifEquals(VALUE) + .thenPut(put) + .get(); + assertTrue(ok); + + RowMutations rm = new RowMutations(row) + .add((Mutation) put); + ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER) + .timeRange(TimeRange.at(ts + 10000)) + .ifEquals(VALUE) + .thenMutate(rm) + .get(); + assertFalse(ok); + + ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER) + .timeRange(TimeRange.at(ts)) + .ifEquals(VALUE) + .thenMutate(rm) + .get(); + assertTrue(ok); + + Delete delete = new Delete(row) + .addColumn(FAMILY, QUALIFIER); + + ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER) + .timeRange(TimeRange.at(ts + 10000)) + .ifEquals(VALUE) + .thenDelete(delete) + .get(); + assertFalse(ok); + + ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER) + .timeRange(TimeRange.at(ts)) + .ifEquals(VALUE) + .thenDelete(delete) + .get(); + assertTrue(ok); + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/ad47c2da/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java index 29d3439..5fba101 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java @@ -83,6 +83,7 @@ import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; import org.apache.hadoop.hbase.filter.SubstringComparator; import org.apache.hadoop.hbase.filter.ValueFilter; import org.apache.hadoop.hbase.filter.WhileMatchFilter; +import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.io.hfile.BlockCache; import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; @@ -4832,6 +4833,60 @@ public class TestFromClientSide { } @Test + public void testCheckAndMutateWithTimeRange() throws IOException { + Table table = TEST_UTIL.createTable(TableName.valueOf(name.getMethodName()), FAMILY); + final long ts = System.currentTimeMillis() / 2; + Put put = new Put(ROW); + put.addColumn(FAMILY, QUALIFIER, ts, VALUE); + + boolean ok = table.checkAndMutate(ROW, FAMILY).qualifier(QUALIFIER) + .ifNotExists() + .thenPut(put); + assertTrue(ok); + + ok = table.checkAndMutate(ROW, FAMILY).qualifier(QUALIFIER) + .timeRange(TimeRange.at(ts + 10000)) + .ifEquals(VALUE) + .thenPut(put); + assertFalse(ok); + + ok = table.checkAndMutate(ROW, FAMILY).qualifier(QUALIFIER) + .timeRange(TimeRange.at(ts)) + .ifEquals(VALUE) + .thenPut(put); + assertTrue(ok); + + RowMutations rm = new RowMutations(ROW) + .add((Mutation) put); + ok = table.checkAndMutate(ROW, FAMILY).qualifier(QUALIFIER) + .timeRange(TimeRange.at(ts + 10000)) + .ifEquals(VALUE) + .thenMutate(rm); + assertFalse(ok); + + ok = table.checkAndMutate(ROW, FAMILY).qualifier(QUALIFIER) + .timeRange(TimeRange.at(ts)) + .ifEquals(VALUE) + .thenMutate(rm); + assertTrue(ok); + + Delete delete = new Delete(ROW) + .addColumn(FAMILY, QUALIFIER); + + ok = table.checkAndMutate(ROW, FAMILY).qualifier(QUALIFIER) + .timeRange(TimeRange.at(ts + 10000)) + .ifEquals(VALUE) + .thenDelete(delete); + assertFalse(ok); + + ok = table.checkAndMutate(ROW, FAMILY).qualifier(QUALIFIER) + .timeRange(TimeRange.at(ts)) + .ifEquals(VALUE) + .thenDelete(delete); + assertTrue(ok); + } + + @Test public void testCheckAndPutWithCompareOp() throws IOException { final byte [] value1 = Bytes.toBytes("aaaa"); final byte [] value2 = Bytes.toBytes("bbbb"); http://git-wip-us.apache.org/repos/asf/hbase/blob/ad47c2da/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java index 6305fa1..ef4ca25 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java @@ -239,7 +239,7 @@ public class TestMalformedCellFromClient { ClientProtos.MutationProto.Builder mutationBuilder = ClientProtos.MutationProto.newBuilder(); ClientProtos.Condition condition = RequestConverter .buildCondition(rm.getRow(), FAMILY, null, new BinaryComparator(new byte[10]), - HBaseProtos.CompareType.EQUAL); + HBaseProtos.CompareType.EQUAL, null); for (Mutation mutation : rm.getMutations()) { ClientProtos.MutationProto.MutationType mutateType = null; if (mutation instanceof Put) { http://git-wip-us.apache.org/repos/asf/hbase/blob/ad47c2da/hbase-server/src/test/java/org/apache/hadoop/hbase/protobuf/TestProtobufUtil.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/protobuf/TestProtobufUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/protobuf/TestProtobufUtil.java index 536af71..7f45e40 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/protobuf/TestProtobufUtil.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/protobuf/TestProtobufUtil.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.protobuf.generated.CellProtos; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Column; @@ -104,6 +105,7 @@ public class TestProtobufUtil { getBuilder = ClientProtos.Get.newBuilder(proto); getBuilder.setMaxVersions(1); getBuilder.setCacheBlocks(true); + getBuilder.setTimeRange(ProtobufUtil.toTimeRange(TimeRange.allTime())); Get get = ProtobufUtil.toGet(proto); assertEquals(getBuilder.build(), ProtobufUtil.toGet(get)); @@ -146,6 +148,7 @@ public class TestProtobufUtil { // append always use the latest timestamp, // reset the timestamp to the original mutate mutateBuilder.setTimestamp(append.getTimeStamp()); + mutateBuilder.setTimeRange(ProtobufUtil.toTimeRange(append.getTimeRange())); assertEquals(mutateBuilder.build(), ProtobufUtil.toMutation(MutationType.APPEND, append)); } @@ -229,6 +232,7 @@ public class TestProtobufUtil { Increment increment = ProtobufUtil.toIncrement(proto, null); mutateBuilder.setTimestamp(increment.getTimeStamp()); + mutateBuilder.setTimeRange(ProtobufUtil.toTimeRange(increment.getTimeRange())); assertEquals(mutateBuilder.build(), ProtobufUtil.toMutation(MutationType.INCREMENT, increment)); } @@ -314,6 +318,7 @@ public class TestProtobufUtil { scanBuilder.setMaxVersions(2); scanBuilder.setCacheBlocks(false); scanBuilder.setCaching(1024); + scanBuilder.setTimeRange(ProtobufUtil.toTimeRange(TimeRange.allTime())); ClientProtos.Scan expectedProto = scanBuilder.build(); ClientProtos.Scan actualProto = ProtobufUtil.toScan( http://git-wip-us.apache.org/repos/asf/hbase/blob/ad47c2da/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java index b14c94f..3962bbe 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java @@ -663,7 +663,7 @@ public class TestAtomicOperation { } testStep = TestStep.CHECKANDPUT_STARTED; region.checkAndMutate(Bytes.toBytes("r1"), Bytes.toBytes(family), Bytes.toBytes("q1"), - CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("10")), put, true); + CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("10")), put); testStep = TestStep.CHECKANDPUT_COMPLETED; } }