http://git-wip-us.apache.org/repos/asf/hbase/blob/17d4b70d/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java index 6a02e18..b97743f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java @@ -21,20 +21,22 @@ package org.apache.hadoop.hbase.client; import java.io.IOException; import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.TableNotEnabledException; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.ipc.HBaseRpcController; -import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService; - -import com.google.protobuf.RpcController; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController; +import org.apache.hadoop.hbase.util.Bytes; /** - * Implementations make an rpc call against a RegionService via a protobuf Service. - * Implement rpcCall(). Be sure to make use of the RpcController that this instance is carrying - * via {@link #getRpcController()}. + * Implementations make a RPC call against a RegionService via a protobuf Service. + * Implement rpcCall() and the parent class setClientByServiceName; this latter is where the + * RPC stub gets set (the appropriate protobuf 'Service'/Client). Be sure to make use of the + * RpcController that this instance is carrying via #getRpcController(). * * <p>TODO: this class is actually tied to one region, because most of the paths make use of * the regioninfo part of location when building requests. The only reason it works for @@ -42,74 +44,75 @@ import com.google.protobuf.RpcController; * This could be done cleaner (e.g. having a generic parameter and 2 derived classes, * RegionCallable and actual RegionServerCallable with ServerName. * - * @param <T> the class that the ServerCallable handles + * @param <T> The class that the ServerCallable handles. + * @param <S> The protocol to use (Admin or Client or even an Endpoint over in MetaTableAccessor). */ +// TODO: MasterCallable and this Class have a lot in common. UNIFY! +// Public but should be package private only it is used by MetaTableAccessor. FIX!! @InterfaceAudience.Private -public abstract class RegionServerCallable<T> extends AbstractRegionServerCallable<T> { - private ClientService.BlockingInterface stub; - - /* This is 99% of the time a PayloadCarryingRpcController but this RegionServerCallable is - * also used doing Coprocessor Endpoints and in this case, it is a ServerRpcControllable which is - * not a PayloadCarryingRpcController. Too hard to untangle it all at this stage since - * downstreamers are using RegionServerCallable invoking CPEPs so just do ugly instanceof - * checks in the below. +public abstract class RegionServerCallable<T, S> implements RetryingCallable<T> { + private final Connection connection; + private final TableName tableName; + private final byte[] row; + /** + * Some subclasses want to set their own location. Make it protected. */ - private final RpcController rpcController; + protected HRegionLocation location; + protected final static int MIN_WAIT_DEAD_SERVER = 10000; + protected S stub; + + /** + * This is 99% of the time a HBaseRpcController but also used doing Coprocessor Endpoints and in + * this case, it is a ServerRpcControllable which is not a HBaseRpcController. + * Can be null! + */ + protected final RpcController rpcController; /** * @param connection Connection to use. + * @param rpcController Controller to use; can be shaded or non-shaded. * @param tableName Table name to which <code>row</code> belongs. * @param row The row we want in <code>tableName</code>. */ - public RegionServerCallable(Connection connection, RpcControllerFactory rpcControllerFactory, - TableName tableName, byte [] row) { - this(connection, rpcControllerFactory.newController(), tableName, row); - } - - public RegionServerCallable(Connection connection, RpcController rpcController, - TableName tableName, byte [] row) { - super(connection, tableName, row); + public RegionServerCallable(Connection connection, TableName tableName, byte [] row, + RpcController rpcController) { + super(); + this.connection = connection; + this.tableName = tableName; + this.row = row; this.rpcController = rpcController; } - void setClientByServiceName(ServerName service) throws IOException { - this.setStub(getConnection().getClient(service)); + protected RpcController getRpcController() { + return this.rpcController; } - /** - * @return Client Rpc protobuf communication stub - */ - protected ClientService.BlockingInterface getStub() { - return this.stub; + protected void setStub(S stub) { + this.stub = stub; } - /** - * Set the client protobuf communication stub - * @param stub to set - */ - void setStub(final ClientService.BlockingInterface stub) { - this.stub = stub; + protected S getStub() { + return this.stub; } /** - * Override that changes call Exception from {@link Exception} to {@link IOException}. It also - * does setup of an rpcController and calls through to the unimplemented - * rpcCall() method. If rpcController is an instance of PayloadCarryingRpcController, - * we will set a timeout on it. + * Override that changes call Exception from {@link Exception} to {@link IOException}. + * Also does set up of the rpcController. */ - @Override public T call(int callTimeout) throws IOException { try { - if (this.rpcController != null) { + // Iff non-null and an instance of a SHADED rpcController, do config! Unshaded -- i.e. + // com.google.protobuf.RpcController or null -- will just skip over this config. + if (getRpcController() != null) { + RpcController shadedRpcController = (RpcController)getRpcController(); // Do a reset to clear previous states, such as CellScanner. - this.rpcController.reset(); - if (this.rpcController instanceof HBaseRpcController) { - HBaseRpcController pcrc = (HBaseRpcController)this.rpcController; - // If it is an instance of PayloadCarryingRpcController, we can set priority on the - // controller based off the tableName. RpcController may be null in tests when mocking so allow - // for null controller. - pcrc.setPriority(tableName); - pcrc.setCallTimeout(callTimeout); + shadedRpcController.reset(); + if (shadedRpcController instanceof HBaseRpcController) { + HBaseRpcController hrc = (HBaseRpcController)getRpcController(); + // If it is an instance of HBaseRpcController, we can set priority on the controller based + // off the tableName. Set call timeout too. + hrc.setPriority(tableName); + hrc.setCallTimeout(callTimeout); } } return rpcCall(); @@ -128,23 +131,98 @@ public abstract class RegionServerCallable<T> extends AbstractRegionServerCallab */ protected abstract T rpcCall() throws Exception; - protected RpcController getRpcController() { - return this.rpcController; - } - /** * Get the RpcController CellScanner. - * If the RpcController is a PayloadCarryingRpcController, which it is in all cases except + * If the RpcController is a HBaseRpcController, which it is in all cases except * when we are processing Coprocessor Endpoint, then this method returns a reference to the - * CellScanner that the PayloadCarryingRpcController is carrying. Do it up here in this Callable - * so we don't have to scatter ugly instanceof tests around the codebase. Will fail if called in - * a Coproccessor Endpoint context. Should never happen. + * CellScanner that the HBaseRpcController is carrying. Do it up here in this Callable + * so we don't have to scatter ugly instanceof tests around the codebase. Will return null + * if called in a Coproccessor Endpoint context. Should never happen. */ protected CellScanner getRpcControllerCellScanner() { - return ((HBaseRpcController)this.rpcController).cellScanner(); + return (getRpcController() != null && getRpcController() instanceof HBaseRpcController)? + ((HBaseRpcController)getRpcController()).cellScanner(): null; } protected void setRpcControllerCellScanner(CellScanner cellScanner) { - ((HBaseRpcController)this.rpcController).setCellScanner(cellScanner); + if (getRpcController() != null && getRpcController() instanceof HBaseRpcController) { + ((HBaseRpcController)this.rpcController).setCellScanner(cellScanner); + } + } + + /** + * @return {@link ClusterConnection} instance used by this Callable. + */ + protected ClusterConnection getConnection() { + return (ClusterConnection) this.connection; + } + + protected HRegionLocation getLocation() { + return this.location; } + + protected void setLocation(final HRegionLocation location) { + this.location = location; + } + + public TableName getTableName() { + return this.tableName; + } + + public byte [] getRow() { + return this.row; + } + + public void throwable(Throwable t, boolean retrying) { + if (location != null) { + getConnection().updateCachedLocations(tableName, location.getRegionInfo().getRegionName(), + row, t, location.getServerName()); + } + } + + public String getExceptionMessageAdditionalDetail() { + return "row '" + Bytes.toString(row) + "' on table '" + tableName + "' at " + location; + } + + public long sleep(long pause, int tries) { + long sleep = ConnectionUtils.getPauseTime(pause, tries); + if (sleep < MIN_WAIT_DEAD_SERVER + && (location == null || getConnection().isDeadServer(location.getServerName()))) { + sleep = ConnectionUtils.addJitter(MIN_WAIT_DEAD_SERVER, 0.10f); + } + return sleep; + } + + /** + * @return the HRegionInfo for the current region + */ + public HRegionInfo getHRegionInfo() { + if (this.location == null) { + return null; + } + return this.location.getRegionInfo(); + } + + public void prepare(final boolean reload) throws IOException { + // check table state if this is a retry + if (reload && !tableName.equals(TableName.META_TABLE_NAME) && + getConnection().isTableDisabled(tableName)) { + throw new TableNotEnabledException(tableName.getNameAsString() + " is disabled."); + } + try (RegionLocator regionLocator = connection.getRegionLocator(tableName)) { + this.location = regionLocator.getRegionLocation(row); + } + if (this.location == null) { + throw new IOException("Failed to find location, tableName=" + tableName + + ", row=" + Bytes.toString(row) + ", reload=" + reload); + } + setStubByServiceName(this.location.getServerName()); + } + + /** + * Set the RCP client stub + * @param serviceName to get the rpc stub for + * @throws IOException When client could not be created + */ + protected abstract void setStubByServiceName(ServerName serviceName) throws IOException; } \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/17d4b70d/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java index 98792e7..9220f12 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java @@ -40,7 +40,7 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.util.Bytes; /** http://git-wip-us.apache.org/repos/asf/hbase/blob/17d4b70d/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallable.java index afbcc9a..029ee9b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallable.java @@ -19,15 +19,49 @@ package org.apache.hadoop.hbase.client; +import java.io.IOException; + import org.apache.hadoop.hbase.classification.InterfaceAudience; /** - * A Callable<T> that will be retried. If {@link #call(int)} invocation throws exceptions, + * A Callable<T> that will be retried. If {@link #call(int)} invocation throws exceptions, * we will call {@link #throwable(Throwable, boolean)} with whatever the exception was. * @param <T> result class from executing <tt>this</tt> */ @InterfaceAudience.Private -public interface RetryingCallable<T> extends RetryingCallableBase { +public interface RetryingCallable<T> { + /** + * Prepare by setting up any connections to servers, etc., ahead of call invocation. + * TODO: We call prepare before EVERY call. Seems wrong. FIX!!!! + * @param reload Set this to true if need to requery locations + * @throws IOException e + */ + void prepare(final boolean reload) throws IOException; + + /** + * Called when call throws an exception and we are going to retry; take action to + * make it so we succeed on next call (clear caches, do relookup of locations, etc.). + * @param t throwable which was thrown + * @param retrying True if we are in retrying mode (we are not in retrying mode when max + * retries == 1; we ARE in retrying mode if retries > 1 even when we are the + * last attempt) + */ + void throwable(final Throwable t, boolean retrying); + + /** + * @return Some details from the implementation that we would like to add to a terminating + * exception; i.e. a fatal exception is being thrown ending retries and we might like to + * add more implementation-specific detail on to the exception being thrown. + */ + String getExceptionMessageAdditionalDetail(); + + /** + * @param pause time to pause + * @param tries amount of tries until till sleep + * @return Suggestion on how much to sleep between retries + */ + long sleep(final long pause, final int tries); + /** * Computes a result, or throws an exception if unable to do so. * http://git-wip-us.apache.org/repos/asf/hbase/blob/17d4b70d/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallableBase.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallableBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallableBase.java deleted file mode 100644 index 483f6c2..0000000 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallableBase.java +++ /dev/null @@ -1,60 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.client; - -import java.io.IOException; - -import org.apache.hadoop.hbase.classification.InterfaceAudience; - -/** - * All generic methods for a Callable that can be retried. It is extended with Sync and - * Async versions. - */ -@InterfaceAudience.Private -public interface RetryingCallableBase { - /** - * Prepare by setting up any connections to servers, etc., ahead of call invocation. - * @param reload Set this to true if need to requery locations - * @throws IOException e - */ - void prepare(final boolean reload) throws IOException; - - /** - * Called when call throws an exception and we are going to retry; take action to - * make it so we succeed on next call (clear caches, do relookup of locations, etc.). - * @param t throwable which was thrown - * @param retrying True if we are in retrying mode (we are not in retrying mode when max - * retries == 1; we ARE in retrying mode if retries > 1 even when we are the - * last attempt) - */ - void throwable(final Throwable t, boolean retrying); - - /** - * @return Some details from the implementation that we would like to add to a terminating - * exception; i.e. a fatal exception is being thrown ending retries and we might like to - * add more implementation-specific detail on to the exception being thrown. - */ - String getExceptionMessageAdditionalDetail(); - - /** - * @param pause time to pause - * @param tries amount of tries until till sleep - * @return Suggestion on how much to sleep between retries - */ - long sleep(final long pause, final int tries); -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/17d4b70d/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallerInterceptorContext.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallerInterceptorContext.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallerInterceptorContext.java index b0ba9f5..985d938 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallerInterceptorContext.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallerInterceptorContext.java @@ -49,8 +49,7 @@ abstract class RetryingCallerInterceptorContext { * @return A new {@link RetryingCallerInterceptorContext} object that can be * used for use in the current retrying call */ - public abstract RetryingCallerInterceptorContext prepare( - RetryingCallableBase callable); + public abstract RetryingCallerInterceptorContext prepare(RetryingCallable<?> callable); /** * Telescopic extension that takes which of the many retries we are currently @@ -64,6 +63,5 @@ abstract class RetryingCallerInterceptorContext { * @return A new context object that can be used for use in the current * retrying call */ - public abstract RetryingCallerInterceptorContext prepare( - RetryingCallableBase callable, int tries); -} + public abstract RetryingCallerInterceptorContext prepare(RetryingCallable<?> callable, int tries); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/17d4b70d/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java index a5bebd0..c7d78c6 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java @@ -103,23 +103,23 @@ public class ReversedScannerCallable extends ScannerCallable { if (locateStartRow == null) { // Just locate the region with the row RegionLocations rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(reload, id, - getConnection(), tableName, row); + getConnection(), getTableName(), getRow()); this.location = id < rl.size() ? rl.getRegionLocation(id) : null; if (this.location == null) { throw new IOException("Failed to find location, tableName=" - + tableName + ", row=" + Bytes.toStringBinary(row) + ", reload=" + + getTableName() + ", row=" + Bytes.toStringBinary(getRow()) + ", reload=" + reload); } } else { // Need to locate the regions with the range, and the target location is // the last one which is the previous region of last region scanner List<HRegionLocation> locatedRegions = locateRegionsInRange( - locateStartRow, row, reload); + locateStartRow, getRow(), reload); if (locatedRegions.isEmpty()) { throw new DoNotRetryIOException( "Does hbase:meta exist hole? Couldn't get regions for the range from " + Bytes.toStringBinary(locateStartRow) + " to " - + Bytes.toStringBinary(row)); + + Bytes.toStringBinary(getRow())); } this.location = locatedRegions.get(locatedRegions.size() - 1); } @@ -159,7 +159,7 @@ public class ReversedScannerCallable extends ScannerCallable { byte[] currentKey = startKey; do { RegionLocations rl = RpcRetryingCallerWithReadReplicas.getRegionLocations(reload, id, - getConnection(), tableName, currentKey); + getConnection(), getTableName(), currentKey); HRegionLocation regionLocation = id < rl.size() ? rl.getRegionLocation(id) : null; if (regionLocation != null && regionLocation.getRegionInfo().containsRow(currentKey)) { regionList.add(regionLocation); @@ -176,7 +176,7 @@ public class ReversedScannerCallable extends ScannerCallable { @Override public ScannerCallable getScannerCallableForReplica(int id) { - ReversedScannerCallable r = new ReversedScannerCallable(getConnection(), this.tableName, + ReversedScannerCallable r = new ReversedScannerCallable(getConnection(), getTableName(), this.getScan(), this.scanMetrics, this.locateStartRow, rpcControllerFactory, id); r.setCaching(this.getCaching()); return r; http://git-wip-us.apache.org/repos/asf/hbase/blob/17d4b70d/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallable.java index 68a4aa2..75fec63 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallable.java @@ -21,7 +21,7 @@ package org.apache.hadoop.hbase.client; import java.io.Closeable; import java.io.IOException; -import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; /** * A RetryingCallable for RPC connection operations. http://git-wip-us.apache.org/repos/asf/hbase/blob/17d4b70d/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java index cc2f159..e940143 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerImpl.java @@ -36,7 +36,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.ExceptionUtil; import org.apache.hadoop.ipc.RemoteException; -import com.google.protobuf.ServiceException; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException; /** * Runs an rpc'ing {@link RetryingCallable}. Sets into rpc client http://git-wip-us.apache.org/repos/asf/hbase/blob/17d4b70d/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java index 3d55136..04553d2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java @@ -20,31 +20,27 @@ package org.apache.hadoop.hbase.client; import java.io.IOException; import java.io.InterruptedIOException; -import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.RegionLocations; -import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; -import org.apache.hadoop.hbase.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.protobuf.RequestConverter; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -90,21 +86,14 @@ public class RpcRetryingCallerWithReadReplicas { * - we need to stop retrying when the call is completed * - we can be interrupted */ - class ReplicaRegionServerCallable extends RegionServerCallable<Result> implements Cancellable { + class ReplicaRegionServerCallable extends CancellableRegionServerCallable<Result> { final int id; - private final HBaseRpcController controller; - public ReplicaRegionServerCallable(int id, HRegionLocation location) { - super(RpcRetryingCallerWithReadReplicas.this.cConnection, rpcControllerFactory, - RpcRetryingCallerWithReadReplicas.this.tableName, get.getRow()); + super(RpcRetryingCallerWithReadReplicas.this.cConnection, + RpcRetryingCallerWithReadReplicas.this.tableName, get.getRow(), + rpcControllerFactory.newController()); this.id = id; this.location = location; - this.controller = rpcControllerFactory.newController(); - } - - @Override - public void cancel() { - controller.startCancel(); } /** @@ -113,13 +102,12 @@ public class RpcRetryingCallerWithReadReplicas { * - set the location to the right region, depending on the replica. */ @Override + // TODO: Very like the super class implemenation. Can we shrink this down? public void prepare(final boolean reload) throws IOException { - if (controller.isCanceled()) return; - + if (getRpcController().isCanceled()) return; if (Thread.interrupted()) { throw new InterruptedIOException(); } - if (reload || location == null) { RegionLocations rl = getRegionLocations(false, id, cConnection, tableName, get.getRow()); location = id < rl.size() ? rl.getRegionLocation(id) : null; @@ -131,35 +119,27 @@ public class RpcRetryingCallerWithReadReplicas { throw new HBaseIOException("There is no location for replica id #" + id); } - ServerName dest = location.getServerName(); - - setStub(cConnection.getClient(dest)); + setStubByServiceName(this.location.getServerName()); } - private void initRpcController() { - controller.reset(); - controller.setCallTimeout(callTimeout); - controller.setPriority(tableName); - } @Override + // TODO: Very like the super class implemenation. Can we shrink this down? protected Result rpcCall() throws Exception { - if (controller.isCanceled()) return null; + if (getRpcController().isCanceled()) return null; if (Thread.interrupted()) { throw new InterruptedIOException(); } byte[] reg = location.getRegionInfo().getRegionName(); ClientProtos.GetRequest request = RequestConverter.buildGetRequest(reg, get); - initRpcController(); - ClientProtos.GetResponse response = getStub().get(controller, request); + HBaseRpcController hrc = (HBaseRpcController)getRpcController(); + hrc.reset(); + hrc.setCallTimeout(callTimeout); + hrc.setPriority(tableName); + ClientProtos.GetResponse response = getStub().get(hrc, request); if (response == null) { return null; } - return ProtobufUtil.toResult(response.getResult(), controller.cellScanner()); - } - - @Override - public boolean isCancelled() { - return controller.isCanceled(); + return ProtobufUtil.toResult(response.getResult(), hrc.cellScanner()); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/17d4b70d/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java index 22f611a..71a31db 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java @@ -38,9 +38,9 @@ import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.IncompatibleFilterException; import org.apache.hadoop.hbase.io.TimeRange; -import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.security.access.Permission; import org.apache.hadoop.hbase.security.visibility.Authorizations; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.util.Bytes; /** http://git-wip-us.apache.org/repos/asf/hbase/blob/17d4b70d/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java index 8345aa1..0351e54 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java @@ -42,11 +42,11 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.hadoop.hbase.exceptions.ScannerResetException; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; -import org.apache.hadoop.hbase.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.protobuf.RequestConverter; -import org.apache.hadoop.hbase.protobuf.ResponseConverter; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; +import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.net.DNS; @@ -57,7 +57,7 @@ import org.apache.hadoop.net.DNS; * {@link RpcRetryingCaller} so fails are retried. */ @InterfaceAudience.Private -public class ScannerCallable extends RegionServerCallable<Result[]> { +public class ScannerCallable extends ClientServiceCallable<Result[]> { public static final String LOG_SCANNER_LATENCY_CUTOFF = "hbase.client.log.scanner.latency.cutoff"; public static final String LOG_SCANNER_ACTIVITY = "hbase.client.log.scanner.activity"; @@ -119,7 +119,7 @@ public class ScannerCallable extends RegionServerCallable<Result[]> { */ public ScannerCallable(ClusterConnection connection, TableName tableName, Scan scan, ScanMetrics scanMetrics, RpcControllerFactory rpcControllerFactory, int id) { - super(connection, rpcControllerFactory, tableName, scan.getStartRow()); + super(connection, tableName, scan.getStartRow(), rpcControllerFactory.newController()); this.id = id; this.scan = scan; this.scanMetrics = scanMetrics; @@ -429,7 +429,7 @@ public class ScannerCallable extends RegionServerCallable<Result[]> { } public ScannerCallable getScannerCallableForReplica(int id) { - ScannerCallable s = new ScannerCallable(this.getConnection(), this.tableName, + ScannerCallable s = new ScannerCallable(this.getConnection(), getTableName(), this.getScan(), this.scanMetrics, this.rpcControllerFactory, id); s.setCaching(this.caching); return s; @@ -460,4 +460,4 @@ public class ScannerCallable extends RegionServerCallable<Result[]> { protected void setHasMoreResultsContext(boolean serverHasMoreResultsContext) { this.serverHasMoreResultsContext = serverHasMoreResultsContext; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/hbase/blob/17d4b70d/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java index 5af8034..c8d9738 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java @@ -26,17 +26,17 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; -import org.apache.hadoop.hbase.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.protobuf.RequestConverter; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileResponse; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadRequest; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadRequest; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadResponse; -import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier; -import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; import org.apache.hadoop.security.token.Token; /** @@ -54,8 +54,9 @@ public class SecureBulkLoadClient { public String prepareBulkLoad(final Connection conn) throws IOException { try { - RegionServerCallable<String> callable = new RegionServerCallable<String>(conn, - this.rpcControllerFactory, table.getName(), HConstants.EMPTY_START_ROW) { + ClientServiceCallable<String> callable = new ClientServiceCallable<String>(conn, + table.getName(), HConstants.EMPTY_START_ROW, + this.rpcControllerFactory.newController()) { @Override protected String rpcCall() throws Exception { byte[] regionName = getLocation().getRegionInfo().getRegionName(); @@ -77,8 +78,8 @@ public class SecureBulkLoadClient { public void cleanupBulkLoad(final Connection conn, final String bulkToken) throws IOException { try { - RegionServerCallable<Void> callable = new RegionServerCallable<Void>(conn, - this.rpcControllerFactory, table.getName(), HConstants.EMPTY_START_ROW) { + ClientServiceCallable<Void> callable = new ClientServiceCallable<Void>(conn, + table.getName(), HConstants.EMPTY_START_ROW, this.rpcControllerFactory.newController()) { @Override protected Void rpcCall() throws Exception { byte[] regionName = getLocation().getRegionInfo().getRegionName(); @@ -145,4 +146,4 @@ public class SecureBulkLoadClient { throw ProtobufUtil.handleRemoteException(se); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/hbase/blob/17d4b70d/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SyncCoprocessorRpcChannel.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SyncCoprocessorRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SyncCoprocessorRpcChannel.java new file mode 100644 index 0000000..fa4e5f1 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SyncCoprocessorRpcChannel.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import com.google.protobuf.Descriptors; +import com.google.protobuf.Message; +import com.google.protobuf.RpcCallback; +import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; +import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; + +/** + * Base class which provides clients with an RPC connection to + * call coprocessor endpoint {@link com.google.protobuf.Service}s. + * Note that clients should not use this class directly, except through + * {@link org.apache.hadoop.hbase.client.Table#coprocessorService(byte[])}. + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +abstract class SyncCoprocessorRpcChannel implements CoprocessorRpcChannel { + private static final Log LOG = LogFactory.getLog(SyncCoprocessorRpcChannel.class); + + @Override + @InterfaceAudience.Private + public void callMethod(Descriptors.MethodDescriptor method, + RpcController controller, + Message request, Message responsePrototype, + RpcCallback<Message> callback) { + Message response = null; + try { + response = callExecService(controller, method, request, responsePrototype); + } catch (IOException ioe) { + LOG.warn("Call failed on IOException", ioe); + CoprocessorRpcUtils.setControllerException(controller, ioe); + } + if (callback != null) { + callback.run(response); + } + } + + @Override + @InterfaceAudience.Private + public Message callBlockingMethod(Descriptors.MethodDescriptor method, + RpcController controller, + Message request, Message responsePrototype) + throws ServiceException { + try { + return callExecService(controller, method, request, responsePrototype); + } catch (IOException ioe) { + throw new ServiceException("Error calling method "+method.getFullName(), ioe); + } + } + + protected abstract Message callExecService(RpcController controller, + Descriptors.MethodDescriptor method, Message request, Message responsePrototype) + throws IOException; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/17d4b70d/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableState.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableState.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableState.java index 5d4ac8e..71875a5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableState.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableState.java @@ -17,12 +17,12 @@ */ package org.apache.hadoop.hbase.client; -import com.google.protobuf.InvalidProtocolBufferException; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.InvalidProtocolBufferException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.exceptions.DeserializationException; -import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; /** * Represents table state. http://git-wip-us.apache.org/repos/asf/hbase/blob/17d4b70d/hbase-client/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java deleted file mode 100644 index 594a459..0000000 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AggregationClient.java +++ /dev/null @@ -1,823 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.client.coprocessor; - -import java.io.Closeable; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.NavigableMap; -import java.util.NavigableSet; -import java.util.TreeMap; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter; -import org.apache.hadoop.hbase.ipc.BlockingRpcCallback; -import org.apache.hadoop.hbase.ipc.ServerRpcController; -import org.apache.hadoop.hbase.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateRequest; -import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateResponse; -import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateService; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Pair; - -import com.google.protobuf.ByteString; -import com.google.protobuf.Message; - -/** - * This client class is for invoking the aggregate functions deployed on the - * Region Server side via the AggregateService. This class will implement the - * supporting functionality for summing/processing the individual results - * obtained from the AggregateService for each region. - * <p> - * This will serve as the client side handler for invoking the aggregate - * functions. - * For all aggregate functions, - * <ul> - * <li>start row < end row is an essential condition (if they are not - * {@link HConstants#EMPTY_BYTE_ARRAY}) - * <li>Column family can't be null. In case where multiple families are - * provided, an IOException will be thrown. An optional column qualifier can - * also be defined.</li> - * <li>For methods to find maximum, minimum, sum, rowcount, it returns the - * parameter type. For average and std, it returns a double value. For row - * count, it returns a long value.</li> - * </ul> - * <p>Call {@link #close()} when done. - */ -@InterfaceAudience.Private -public class AggregationClient implements Closeable { - // TODO: This class is not used. Move to examples? - private static final Log log = LogFactory.getLog(AggregationClient.class); - private final Connection connection; - - /** - * Constructor with Conf object - * @param cfg - */ - public AggregationClient(Configuration cfg) { - try { - // Create a connection on construction. Will use it making each of the calls below. - this.connection = ConnectionFactory.createConnection(cfg); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - @Override - public void close() throws IOException { - if (this.connection != null && !this.connection.isClosed()) { - this.connection.close(); - } - } - - /** - * It gives the maximum value of a column for a given column family for the - * given range. In case qualifier is null, a max of all values for the given - * family is returned. - * @param tableName - * @param ci - * @param scan - * @return max val <R> - * @throws Throwable - * The caller is supposed to handle the exception as they are thrown - * & propagated to it. - */ - public <R, S, P extends Message, Q extends Message, T extends Message> R max( - final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) - throws Throwable { - try (Table table = connection.getTable(tableName)) { - return max(table, ci, scan); - } - } - - /** - * It gives the maximum value of a column for a given column family for the - * given range. In case qualifier is null, a max of all values for the given - * family is returned. - * @param table - * @param ci - * @param scan - * @return max val <> - * @throws Throwable - * The caller is supposed to handle the exception as they are thrown - * & propagated to it. - */ - public <R, S, P extends Message, Q extends Message, T extends Message> - R max(final Table table, final ColumnInterpreter<R, S, P, Q, T> ci, - final Scan scan) throws Throwable { - final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false); - class MaxCallBack implements Batch.Callback<R> { - R max = null; - - R getMax() { - return max; - } - - @Override - public synchronized void update(byte[] region, byte[] row, R result) { - max = (max == null || (result != null && ci.compare(max, result) < 0)) ? result : max; - } - } - MaxCallBack aMaxCallBack = new MaxCallBack(); - table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(), - new Batch.Call<AggregateService, R>() { - @Override - public R call(AggregateService instance) throws IOException { - ServerRpcController controller = new ServerRpcController(); - BlockingRpcCallback<AggregateResponse> rpcCallback = - new BlockingRpcCallback<AggregateResponse>(); - instance.getMax(controller, requestArg, rpcCallback); - AggregateResponse response = rpcCallback.get(); - if (controller.failedOnException()) { - throw controller.getFailedOn(); - } - if (response.getFirstPartCount() > 0) { - ByteString b = response.getFirstPart(0); - Q q = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 3, b); - return ci.getCellValueFromProto(q); - } - return null; - } - }, aMaxCallBack); - return aMaxCallBack.getMax(); - } - - /* - * @param scan - * @param canFamilyBeAbsent whether column family can be absent in familyMap of scan - */ - private void validateParameters(Scan scan, boolean canFamilyBeAbsent) throws IOException { - if (scan == null - || (Bytes.equals(scan.getStartRow(), scan.getStopRow()) && !Bytes.equals( - scan.getStartRow(), HConstants.EMPTY_START_ROW)) - || ((Bytes.compareTo(scan.getStartRow(), scan.getStopRow()) > 0) && !Bytes.equals( - scan.getStopRow(), HConstants.EMPTY_END_ROW))) { - throw new IOException("Agg client Exception: Startrow should be smaller than Stoprow"); - } else if (!canFamilyBeAbsent) { - if (scan.getFamilyMap().size() != 1) { - throw new IOException("There must be only one family."); - } - } - } - - /** - * It gives the minimum value of a column for a given column family for the - * given range. In case qualifier is null, a min of all values for the given - * family is returned. - * @param tableName - * @param ci - * @param scan - * @return min val <R> - * @throws Throwable - */ - public <R, S, P extends Message, Q extends Message, T extends Message> R min( - final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) - throws Throwable { - try (Table table = connection.getTable(tableName)) { - return min(table, ci, scan); - } - } - - /** - * It gives the minimum value of a column for a given column family for the - * given range. In case qualifier is null, a min of all values for the given - * family is returned. - * @param table - * @param ci - * @param scan - * @return min val <R> - * @throws Throwable - */ - public <R, S, P extends Message, Q extends Message, T extends Message> - R min(final Table table, final ColumnInterpreter<R, S, P, Q, T> ci, - final Scan scan) throws Throwable { - final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false); - class MinCallBack implements Batch.Callback<R> { - - private R min = null; - - public R getMinimum() { - return min; - } - - @Override - public synchronized void update(byte[] region, byte[] row, R result) { - min = (min == null || (result != null && ci.compare(result, min) < 0)) ? result : min; - } - } - MinCallBack minCallBack = new MinCallBack(); - table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(), - new Batch.Call<AggregateService, R>() { - - @Override - public R call(AggregateService instance) throws IOException { - ServerRpcController controller = new ServerRpcController(); - BlockingRpcCallback<AggregateResponse> rpcCallback = - new BlockingRpcCallback<AggregateResponse>(); - instance.getMin(controller, requestArg, rpcCallback); - AggregateResponse response = rpcCallback.get(); - if (controller.failedOnException()) { - throw controller.getFailedOn(); - } - if (response.getFirstPartCount() > 0) { - ByteString b = response.getFirstPart(0); - Q q = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 3, b); - return ci.getCellValueFromProto(q); - } - return null; - } - }, minCallBack); - log.debug("Min fom all regions is: " + minCallBack.getMinimum()); - return minCallBack.getMinimum(); - } - - /** - * It gives the row count, by summing up the individual results obtained from - * regions. In case the qualifier is null, FirstKeyValueFilter is used to - * optimised the operation. In case qualifier is provided, I can't use the - * filter as it may set the flag to skip to next row, but the value read is - * not of the given filter: in this case, this particular row will not be - * counted ==> an error. - * @param tableName - * @param ci - * @param scan - * @return <R, S> - * @throws Throwable - */ - public <R, S, P extends Message, Q extends Message, T extends Message> long rowCount( - final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) - throws Throwable { - try (Table table = connection.getTable(tableName)) { - return rowCount(table, ci, scan); - } - } - - /** - * It gives the row count, by summing up the individual results obtained from - * regions. In case the qualifier is null, FirstKeyValueFilter is used to - * optimised the operation. In case qualifier is provided, I can't use the - * filter as it may set the flag to skip to next row, but the value read is - * not of the given filter: in this case, this particular row will not be - * counted ==> an error. - * @param table - * @param ci - * @param scan - * @return <R, S> - * @throws Throwable - */ - public <R, S, P extends Message, Q extends Message, T extends Message> - long rowCount(final Table table, - final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable { - final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, true); - class RowNumCallback implements Batch.Callback<Long> { - private final AtomicLong rowCountL = new AtomicLong(0); - - public long getRowNumCount() { - return rowCountL.get(); - } - - @Override - public void update(byte[] region, byte[] row, Long result) { - rowCountL.addAndGet(result.longValue()); - } - } - RowNumCallback rowNum = new RowNumCallback(); - table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(), - new Batch.Call<AggregateService, Long>() { - @Override - public Long call(AggregateService instance) throws IOException { - ServerRpcController controller = new ServerRpcController(); - BlockingRpcCallback<AggregateResponse> rpcCallback = - new BlockingRpcCallback<AggregateResponse>(); - instance.getRowNum(controller, requestArg, rpcCallback); - AggregateResponse response = rpcCallback.get(); - if (controller.failedOnException()) { - throw controller.getFailedOn(); - } - byte[] bytes = getBytesFromResponse(response.getFirstPart(0)); - ByteBuffer bb = ByteBuffer.allocate(8).put(bytes); - bb.rewind(); - return bb.getLong(); - } - }, rowNum); - return rowNum.getRowNumCount(); - } - - /** - * It sums up the value returned from various regions. In case qualifier is - * null, summation of all the column qualifiers in the given family is done. - * @param tableName - * @param ci - * @param scan - * @return sum <S> - * @throws Throwable - */ - public <R, S, P extends Message, Q extends Message, T extends Message> S sum( - final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) - throws Throwable { - try (Table table = connection.getTable(tableName)) { - return sum(table, ci, scan); - } - } - - /** - * It sums up the value returned from various regions. In case qualifier is - * null, summation of all the column qualifiers in the given family is done. - * @param table - * @param ci - * @param scan - * @return sum <S> - * @throws Throwable - */ - public <R, S, P extends Message, Q extends Message, T extends Message> - S sum(final Table table, final ColumnInterpreter<R, S, P, Q, T> ci, - final Scan scan) throws Throwable { - final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false); - - class SumCallBack implements Batch.Callback<S> { - S sumVal = null; - - public S getSumResult() { - return sumVal; - } - - @Override - public synchronized void update(byte[] region, byte[] row, S result) { - sumVal = ci.add(sumVal, result); - } - } - SumCallBack sumCallBack = new SumCallBack(); - table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(), - new Batch.Call<AggregateService, S>() { - @Override - public S call(AggregateService instance) throws IOException { - ServerRpcController controller = new ServerRpcController(); - BlockingRpcCallback<AggregateResponse> rpcCallback = - new BlockingRpcCallback<AggregateResponse>(); - instance.getSum(controller, requestArg, rpcCallback); - AggregateResponse response = rpcCallback.get(); - if (controller.failedOnException()) { - throw controller.getFailedOn(); - } - if (response.getFirstPartCount() == 0) { - return null; - } - ByteString b = response.getFirstPart(0); - T t = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 4, b); - S s = ci.getPromotedValueFromProto(t); - return s; - } - }, sumCallBack); - return sumCallBack.getSumResult(); - } - - /** - * It computes average while fetching sum and row count from all the - * corresponding regions. Approach is to compute a global sum of region level - * sum and rowcount and then compute the average. - * @param tableName - * @param scan - * @throws Throwable - */ - private <R, S, P extends Message, Q extends Message, T extends Message> Pair<S, Long> getAvgArgs( - final TableName tableName, final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) - throws Throwable { - try (Table table = connection.getTable(tableName)) { - return getAvgArgs(table, ci, scan); - } - } - - /** - * It computes average while fetching sum and row count from all the - * corresponding regions. Approach is to compute a global sum of region level - * sum and rowcount and then compute the average. - * @param table - * @param scan - * @throws Throwable - */ - private <R, S, P extends Message, Q extends Message, T extends Message> - Pair<S, Long> getAvgArgs(final Table table, - final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable { - final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false); - class AvgCallBack implements Batch.Callback<Pair<S, Long>> { - S sum = null; - Long rowCount = 0l; - - public synchronized Pair<S, Long> getAvgArgs() { - return new Pair<S, Long>(sum, rowCount); - } - - @Override - public synchronized void update(byte[] region, byte[] row, Pair<S, Long> result) { - sum = ci.add(sum, result.getFirst()); - rowCount += result.getSecond(); - } - } - AvgCallBack avgCallBack = new AvgCallBack(); - table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(), - new Batch.Call<AggregateService, Pair<S, Long>>() { - @Override - public Pair<S, Long> call(AggregateService instance) throws IOException { - ServerRpcController controller = new ServerRpcController(); - BlockingRpcCallback<AggregateResponse> rpcCallback = - new BlockingRpcCallback<AggregateResponse>(); - instance.getAvg(controller, requestArg, rpcCallback); - AggregateResponse response = rpcCallback.get(); - if (controller.failedOnException()) { - throw controller.getFailedOn(); - } - Pair<S, Long> pair = new Pair<S, Long>(null, 0L); - if (response.getFirstPartCount() == 0) { - return pair; - } - ByteString b = response.getFirstPart(0); - T t = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 4, b); - S s = ci.getPromotedValueFromProto(t); - pair.setFirst(s); - ByteBuffer bb = ByteBuffer.allocate(8).put( - getBytesFromResponse(response.getSecondPart())); - bb.rewind(); - pair.setSecond(bb.getLong()); - return pair; - } - }, avgCallBack); - return avgCallBack.getAvgArgs(); - } - - /** - * This is the client side interface/handle for calling the average method for - * a given cf-cq combination. It was necessary to add one more call stack as - * its return type should be a decimal value, irrespective of what - * columninterpreter says. So, this methods collects the necessary parameters - * to compute the average and returs the double value. - * @param tableName - * @param ci - * @param scan - * @return <R, S> - * @throws Throwable - */ - public <R, S, P extends Message, Q extends Message, T extends Message> - double avg(final TableName tableName, - final ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) throws Throwable { - Pair<S, Long> p = getAvgArgs(tableName, ci, scan); - return ci.divideForAvg(p.getFirst(), p.getSecond()); - } - - /** - * This is the client side interface/handle for calling the average method for - * a given cf-cq combination. It was necessary to add one more call stack as - * its return type should be a decimal value, irrespective of what - * columninterpreter says. So, this methods collects the necessary parameters - * to compute the average and returs the double value. - * @param table - * @param ci - * @param scan - * @return <R, S> - * @throws Throwable - */ - public <R, S, P extends Message, Q extends Message, T extends Message> double avg( - final Table table, final ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) throws Throwable { - Pair<S, Long> p = getAvgArgs(table, ci, scan); - return ci.divideForAvg(p.getFirst(), p.getSecond()); - } - - /** - * It computes a global standard deviation for a given column and its value. - * Standard deviation is square root of (average of squares - - * average*average). From individual regions, it obtains sum, square sum and - * number of rows. With these, the above values are computed to get the global - * std. - * @param table - * @param scan - * @return standard deviations - * @throws Throwable - */ - private <R, S, P extends Message, Q extends Message, T extends Message> - Pair<List<S>, Long> getStdArgs(final Table table, - final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable { - final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false); - class StdCallback implements Batch.Callback<Pair<List<S>, Long>> { - long rowCountVal = 0l; - S sumVal = null, sumSqVal = null; - - public synchronized Pair<List<S>, Long> getStdParams() { - List<S> l = new ArrayList<S>(); - l.add(sumVal); - l.add(sumSqVal); - Pair<List<S>, Long> p = new Pair<List<S>, Long>(l, rowCountVal); - return p; - } - - @Override - public synchronized void update(byte[] region, byte[] row, Pair<List<S>, Long> result) { - if (result.getFirst().size() > 0) { - sumVal = ci.add(sumVal, result.getFirst().get(0)); - sumSqVal = ci.add(sumSqVal, result.getFirst().get(1)); - rowCountVal += result.getSecond(); - } - } - } - StdCallback stdCallback = new StdCallback(); - table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(), - new Batch.Call<AggregateService, Pair<List<S>, Long>>() { - @Override - public Pair<List<S>, Long> call(AggregateService instance) throws IOException { - ServerRpcController controller = new ServerRpcController(); - BlockingRpcCallback<AggregateResponse> rpcCallback = - new BlockingRpcCallback<AggregateResponse>(); - instance.getStd(controller, requestArg, rpcCallback); - AggregateResponse response = rpcCallback.get(); - if (controller.failedOnException()) { - throw controller.getFailedOn(); - } - Pair<List<S>, Long> pair = new Pair<List<S>, Long>(new ArrayList<S>(), 0L); - if (response.getFirstPartCount() == 0) { - return pair; - } - List<S> list = new ArrayList<S>(); - for (int i = 0; i < response.getFirstPartCount(); i++) { - ByteString b = response.getFirstPart(i); - T t = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 4, b); - S s = ci.getPromotedValueFromProto(t); - list.add(s); - } - pair.setFirst(list); - ByteBuffer bb = ByteBuffer.allocate(8).put( - getBytesFromResponse(response.getSecondPart())); - bb.rewind(); - pair.setSecond(bb.getLong()); - return pair; - } - }, stdCallback); - return stdCallback.getStdParams(); - } - - /** - * This is the client side interface/handle for calling the std method for a - * given cf-cq combination. It was necessary to add one more call stack as its - * return type should be a decimal value, irrespective of what - * columninterpreter says. So, this methods collects the necessary parameters - * to compute the std and returns the double value. - * @param tableName - * @param ci - * @param scan - * @return <R, S> - * @throws Throwable - */ - public <R, S, P extends Message, Q extends Message, T extends Message> - double std(final TableName tableName, ColumnInterpreter<R, S, P, Q, T> ci, - Scan scan) throws Throwable { - try (Table table = connection.getTable(tableName)) { - return std(table, ci, scan); - } - } - - /** - * This is the client side interface/handle for calling the std method for a - * given cf-cq combination. It was necessary to add one more call stack as its - * return type should be a decimal value, irrespective of what - * columninterpreter says. So, this methods collects the necessary parameters - * to compute the std and returns the double value. - * @param table - * @param ci - * @param scan - * @return <R, S> - * @throws Throwable - */ - public <R, S, P extends Message, Q extends Message, T extends Message> double std( - final Table table, ColumnInterpreter<R, S, P, Q, T> ci, Scan scan) throws Throwable { - Pair<List<S>, Long> p = getStdArgs(table, ci, scan); - double res = 0d; - double avg = ci.divideForAvg(p.getFirst().get(0), p.getSecond()); - double avgOfSumSq = ci.divideForAvg(p.getFirst().get(1), p.getSecond()); - res = avgOfSumSq - (avg) * (avg); // variance - res = Math.pow(res, 0.5); - return res; - } - - /** - * It helps locate the region with median for a given column whose weight - * is specified in an optional column. - * From individual regions, it obtains sum of values and sum of weights. - * @param table - * @param ci - * @param scan - * @return pair whose first element is a map between start row of the region - * and (sum of values, sum of weights) for the region, the second element is - * (sum of values, sum of weights) for all the regions chosen - * @throws Throwable - */ - private <R, S, P extends Message, Q extends Message, T extends Message> - Pair<NavigableMap<byte[], List<S>>, List<S>> - getMedianArgs(final Table table, - final ColumnInterpreter<R, S, P, Q, T> ci, final Scan scan) throws Throwable { - final AggregateRequest requestArg = validateArgAndGetPB(scan, ci, false); - final NavigableMap<byte[], List<S>> map = - new TreeMap<byte[], List<S>>(Bytes.BYTES_COMPARATOR); - class StdCallback implements Batch.Callback<List<S>> { - S sumVal = null, sumWeights = null; - - public synchronized Pair<NavigableMap<byte[], List<S>>, List<S>> getMedianParams() { - List<S> l = new ArrayList<S>(); - l.add(sumVal); - l.add(sumWeights); - Pair<NavigableMap<byte[], List<S>>, List<S>> p = - new Pair<NavigableMap<byte[], List<S>>, List<S>>(map, l); - return p; - } - - @Override - public synchronized void update(byte[] region, byte[] row, List<S> result) { - map.put(row, result); - sumVal = ci.add(sumVal, result.get(0)); - sumWeights = ci.add(sumWeights, result.get(1)); - } - } - StdCallback stdCallback = new StdCallback(); - table.coprocessorService(AggregateService.class, scan.getStartRow(), scan.getStopRow(), - new Batch.Call<AggregateService, List<S>>() { - @Override - public List<S> call(AggregateService instance) throws IOException { - ServerRpcController controller = new ServerRpcController(); - BlockingRpcCallback<AggregateResponse> rpcCallback = - new BlockingRpcCallback<AggregateResponse>(); - instance.getMedian(controller, requestArg, rpcCallback); - AggregateResponse response = rpcCallback.get(); - if (controller.failedOnException()) { - throw controller.getFailedOn(); - } - - List<S> list = new ArrayList<S>(); - for (int i = 0; i < response.getFirstPartCount(); i++) { - ByteString b = response.getFirstPart(i); - T t = ProtobufUtil.getParsedGenericInstance(ci.getClass(), 4, b); - S s = ci.getPromotedValueFromProto(t); - list.add(s); - } - return list; - } - - }, stdCallback); - return stdCallback.getMedianParams(); - } - - /** - * This is the client side interface/handler for calling the median method for a - * given cf-cq combination. This method collects the necessary parameters - * to compute the median and returns the median. - * @param tableName - * @param ci - * @param scan - * @return R the median - * @throws Throwable - */ - public <R, S, P extends Message, Q extends Message, T extends Message> - R median(final TableName tableName, ColumnInterpreter<R, S, P, Q, T> ci, - Scan scan) throws Throwable { - try (Table table = connection.getTable(tableName)) { - return median(table, ci, scan); - } - } - - /** - * This is the client side interface/handler for calling the median method for a - * given cf-cq combination. This method collects the necessary parameters - * to compute the median and returns the median. - * @param table - * @param ci - * @param scan - * @return R the median - * @throws Throwable - */ - public <R, S, P extends Message, Q extends Message, T extends Message> - R median(final Table table, ColumnInterpreter<R, S, P, Q, T> ci, - Scan scan) throws Throwable { - Pair<NavigableMap<byte[], List<S>>, List<S>> p = getMedianArgs(table, ci, scan); - byte[] startRow = null; - byte[] colFamily = scan.getFamilies()[0]; - NavigableSet<byte[]> quals = scan.getFamilyMap().get(colFamily); - NavigableMap<byte[], List<S>> map = p.getFirst(); - S sumVal = p.getSecond().get(0); - S sumWeights = p.getSecond().get(1); - double halfSumVal = ci.divideForAvg(sumVal, 2L); - double movingSumVal = 0; - boolean weighted = false; - if (quals.size() > 1) { - weighted = true; - halfSumVal = ci.divideForAvg(sumWeights, 2L); - } - - for (Map.Entry<byte[], List<S>> entry : map.entrySet()) { - S s = weighted ? entry.getValue().get(1) : entry.getValue().get(0); - double newSumVal = movingSumVal + ci.divideForAvg(s, 1L); - if (newSumVal > halfSumVal) break; // we found the region with the median - movingSumVal = newSumVal; - startRow = entry.getKey(); - } - // scan the region with median and find it - Scan scan2 = new Scan(scan); - // inherit stop row from method parameter - if (startRow != null) scan2.setStartRow(startRow); - ResultScanner scanner = null; - try { - int cacheSize = scan2.getCaching(); - if (!scan2.getCacheBlocks() || scan2.getCaching() < 2) { - scan2.setCacheBlocks(true); - cacheSize = 5; - scan2.setCaching(cacheSize); - } - scanner = table.getScanner(scan2); - Result[] results = null; - byte[] qualifier = quals.pollFirst(); - // qualifier for the weight column - byte[] weightQualifier = weighted ? quals.pollLast() : qualifier; - R value = null; - do { - results = scanner.next(cacheSize); - if (results != null && results.length > 0) { - for (int i = 0; i < results.length; i++) { - Result r = results[i]; - // retrieve weight - Cell kv = r.getColumnLatestCell(colFamily, weightQualifier); - R newValue = ci.getValue(colFamily, weightQualifier, kv); - S s = ci.castToReturnType(newValue); - double newSumVal = movingSumVal + ci.divideForAvg(s, 1L); - // see if we have moved past the median - if (newSumVal > halfSumVal) { - return value; - } - movingSumVal = newSumVal; - kv = r.getColumnLatestCell(colFamily, qualifier); - value = ci.getValue(colFamily, qualifier, kv); - } - } - } while (results != null && results.length > 0); - } finally { - if (scanner != null) { - scanner.close(); - } - } - return null; - } - - <R, S, P extends Message, Q extends Message, T extends Message> AggregateRequest - validateArgAndGetPB(Scan scan, ColumnInterpreter<R,S,P,Q,T> ci, boolean canFamilyBeAbsent) - throws IOException { - validateParameters(scan, canFamilyBeAbsent); - final AggregateRequest.Builder requestBuilder = - AggregateRequest.newBuilder(); - requestBuilder.setInterpreterClassName(ci.getClass().getCanonicalName()); - P columnInterpreterSpecificData = null; - if ((columnInterpreterSpecificData = ci.getRequestData()) - != null) { - requestBuilder.setInterpreterSpecificBytes(columnInterpreterSpecificData.toByteString()); - } - requestBuilder.setScan(ProtobufUtil.toScan(scan)); - return requestBuilder.build(); - } - - byte[] getBytesFromResponse(ByteString response) { - ByteBuffer bb = response.asReadOnlyByteBuffer(); - bb.rewind(); - byte[] bytes; - if (bb.hasArray()) { - bytes = bb.array(); - } else { - bytes = response.toByteArray(); - } - return bytes; - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/17d4b70d/hbase-client/src/main/java/org/apache/hadoop/hbase/client/coprocessor/BigDecimalColumnInterpreter.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/coprocessor/BigDecimalColumnInterpreter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/coprocessor/BigDecimalColumnInterpreter.java index 5d1cc91..7d08b7e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/coprocessor/BigDecimalColumnInterpreter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/coprocessor/BigDecimalColumnInterpreter.java @@ -30,9 +30,10 @@ import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.BigDecimalMsg; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.EmptyMsg; -import org.apache.hadoop.hbase.util.ByteStringer; import org.apache.hadoop.hbase.util.Bytes; +import com.google.protobuf.ByteString; + /** * ColumnInterpreter for doing Aggregation's with BigDecimal columns. This class * is required at the RegionServer also. @@ -123,9 +124,9 @@ public class BigDecimalColumnInterpreter extends ColumnInterpreter<BigDecimal, B private BigDecimalMsg getProtoForType(BigDecimal t) { BigDecimalMsg.Builder builder = BigDecimalMsg.newBuilder(); - return builder.setBigdecimalMsg(ByteStringer.wrap(Bytes.toBytes(t))).build(); + return builder.setBigdecimalMsg(ByteString.copyFrom(Bytes.toBytes(t))).build(); } - + @Override public BigDecimalMsg getProtoForCellType(BigDecimal t) { return getProtoForType(t); http://git-wip-us.apache.org/repos/asf/hbase/blob/17d4b70d/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationSerDeHelper.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationSerDeHelper.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationSerDeHelper.java index 225e685..2590021 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationSerDeHelper.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationSerDeHelper.java @@ -18,18 +18,17 @@ */ package org.apache.hadoop.hbase.client.replication; -import com.google.protobuf.ByteString; - import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.exceptions.DeserializationException; -import org.apache.hadoop.hbase.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos; -import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Strings; @@ -279,7 +278,6 @@ public final class ReplicationSerDeHelper { if (tableCFsMap != null) { peerConfig.setTableCFsMap(tableCFsMap); } - List<ByteString> namespacesList = peer.getNamespacesList(); if (namespacesList != null && namespacesList.size() != 0) { Set<String> namespaces = new HashSet<String>(); http://git-wip-us.apache.org/repos/asf/hbase/blob/17d4b70d/hbase-client/src/main/java/org/apache/hadoop/hbase/filter/BinaryComparator.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/filter/BinaryComparator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/filter/BinaryComparator.java index 3cbb7b9..b59398b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/filter/BinaryComparator.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/filter/BinaryComparator.java @@ -24,11 +24,12 @@ import java.nio.ByteBuffer; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.exceptions.DeserializationException; -import org.apache.hadoop.hbase.protobuf.generated.ComparatorProtos; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ComparatorProtos; import org.apache.hadoop.hbase.util.ByteBufferUtils; import org.apache.hadoop.hbase.util.Bytes; -import com.google.protobuf.InvalidProtocolBufferException; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.InvalidProtocolBufferException; /** * A binary comparator which lexicographically compares against the specified @@ -36,8 +37,7 @@ import com.google.protobuf.InvalidProtocolBufferException; */ @InterfaceAudience.Public @InterfaceStability.Stable -public class BinaryComparator extends ByteArrayComparable { - +public class BinaryComparator extends org.apache.hadoop.hbase.filter.ByteArrayComparable { /** * Constructor * @param value value @@ -62,7 +62,7 @@ public class BinaryComparator extends ByteArrayComparable { public byte [] toByteArray() { ComparatorProtos.BinaryComparator.Builder builder = ComparatorProtos.BinaryComparator.newBuilder(); - builder.setComparable(super.convert()); + builder.setComparable(ProtobufUtil.toByteArrayComparable(this.value)); return builder.build().toByteArray(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/17d4b70d/hbase-client/src/main/java/org/apache/hadoop/hbase/filter/BinaryPrefixComparator.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/filter/BinaryPrefixComparator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/filter/BinaryPrefixComparator.java index a26edbc..01cb769 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/filter/BinaryPrefixComparator.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/filter/BinaryPrefixComparator.java @@ -24,11 +24,12 @@ import java.nio.ByteBuffer; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.exceptions.DeserializationException; -import org.apache.hadoop.hbase.protobuf.generated.ComparatorProtos; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ComparatorProtos; import org.apache.hadoop.hbase.util.ByteBufferUtils; import org.apache.hadoop.hbase.util.Bytes; -import com.google.protobuf.InvalidProtocolBufferException; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.InvalidProtocolBufferException; /** * A comparator which compares against a specified byte array, but only compares @@ -67,7 +68,7 @@ public class BinaryPrefixComparator extends ByteArrayComparable { public byte [] toByteArray() { ComparatorProtos.BinaryPrefixComparator.Builder builder = ComparatorProtos.BinaryPrefixComparator.newBuilder(); - builder.setComparable(super.convert()); + builder.setComparable(ProtobufUtil.toByteArrayComparable(this.value)); return builder.build().toByteArray(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/17d4b70d/hbase-client/src/main/java/org/apache/hadoop/hbase/filter/BitComparator.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/filter/BitComparator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/filter/BitComparator.java index db51df7..dac8864 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/filter/BitComparator.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/filter/BitComparator.java @@ -24,9 +24,10 @@ import java.nio.ByteBuffer; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.exceptions.DeserializationException; -import org.apache.hadoop.hbase.protobuf.generated.ComparatorProtos; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ComparatorProtos; -import com.google.protobuf.InvalidProtocolBufferException; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.InvalidProtocolBufferException; /** * A bit comparator which performs the specified bitwise operation on each of the bytes @@ -72,7 +73,7 @@ public class BitComparator extends ByteArrayComparable { public byte [] toByteArray() { ComparatorProtos.BitComparator.Builder builder = ComparatorProtos.BitComparator.newBuilder(); - builder.setComparable(super.convert()); + builder.setComparable(ProtobufUtil.toByteArrayComparable(this.value)); ComparatorProtos.BitComparator.BitwiseOp bitwiseOpPb = ComparatorProtos.BitComparator.BitwiseOp.valueOf(bitOperator.name()); builder.setBitwiseOp(bitwiseOpPb); http://git-wip-us.apache.org/repos/asf/hbase/blob/17d4b70d/hbase-client/src/main/java/org/apache/hadoop/hbase/filter/ColumnCountGetFilter.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/filter/ColumnCountGetFilter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/filter/ColumnCountGetFilter.java index c747b00..3ae20a1 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/filter/ColumnCountGetFilter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/filter/ColumnCountGetFilter.java @@ -26,10 +26,10 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.exceptions.DeserializationException; -import org.apache.hadoop.hbase.protobuf.generated.FilterProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.FilterProtos; import com.google.common.base.Preconditions; -import com.google.protobuf.InvalidProtocolBufferException; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.InvalidProtocolBufferException; /** * Simple filter that returns first N columns on row only.