Consolidate SecureBulkLoadEndpoint into HBase core as default for bulk load
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/70f330dc Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/70f330dc Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/70f330dc Branch: refs/heads/master Commit: 70f330dc844af88b5bd3de69e271181e40bc58be Parents: 5051ab4 Author: Jerry He <jerry...@apache.org> Authored: Mon Jul 18 20:58:28 2016 -0700 Committer: Jerry He <jerry...@apache.org> Committed: Tue Jul 19 19:53:10 2016 -0700 ---------------------------------------------------------------------- .../hadoop/hbase/client/MetricsConnection.java | 12 +- .../hbase/client/SecureBulkLoadClient.java | 141 + .../coprocessor/SecureBulkLoadClient.java | 166 - .../hadoop/hbase/protobuf/ProtobufUtil.java | 30 +- .../hadoop/hbase/protobuf/RequestConverter.java | 49 +- .../hbase/client/TestClientNoCluster.java | 16 + .../hadoop/hbase/HBaseCommonTestingUtility.java | 1 + .../hbase/protobuf/generated/ClientProtos.java | 4187 +++++++++++++++++- .../generated/SecureBulkLoadProtos.java | 3015 +------------ hbase-protocol/src/main/protobuf/Client.proto | 32 + .../src/main/protobuf/SecureBulkLoad.proto | 23 - .../hbase/coprocessor/BulkLoadObserver.java | 4 +- .../hbase/coprocessor/CoprocessorHost.java | 20 + .../hbase/mapreduce/LoadIncrementalHFiles.java | 31 +- .../hbase/regionserver/HRegionServer.java | 10 + .../hbase/regionserver/RSRpcServices.java | 74 +- .../regionserver/RegionServerServices.java | 5 + .../regionserver/SecureBulkLoadManager.java | 419 ++ .../hbase/security/access/AccessController.java | 8 +- .../security/access/SecureBulkLoadEndpoint.java | 436 +- .../hadoop/hbase/HBaseTestingUtility.java | 9 +- .../hadoop/hbase/MockRegionServerServices.java | 7 +- .../hbase/client/TestReplicaWithCluster.java | 13 +- ...oadIncrementalHFilesUseSecurityEndPoint.java | 45 - .../hadoop/hbase/master/MockRegionServer.java | 25 +- .../SecureBulkLoadEndpointClient.java | 172 + .../regionserver/TestHRegionServerBulkLoad.java | 26 +- .../TestHRegionServerBulkLoadWithOldClient.java | 162 + ...gionServerBulkLoadWithOldSecureEndpoint.java | 177 + .../hbase/regionserver/TestPriorityRpc.java | 4 +- .../hbase/security/access/SecureTestUtil.java | 3 +- 31 files changed, 5468 insertions(+), 3854 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/70f330dc/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java index a5dc7fb..4fa20e6 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java @@ -444,14 +444,22 @@ public class MetricsConnection implements StatisticTrackable { // use generic implementation break; case 4: - assert "ExecService".equals(method.getName()); + assert "PrepareBulkLoad".equals(method.getName()); // use generic implementation break; case 5: - assert "ExecRegionServerService".equals(method.getName()); + assert "CleanupBulkLoad".equals(method.getName()); // use generic implementation break; case 6: + assert "ExecService".equals(method.getName()); + // use generic implementation + break; + case 7: + assert "ExecRegionServerService".equals(method.getName()); + // use generic implementation + break; + case 8: assert "Multi".equals(method.getName()); multiTracker.updateRpc(stats); return; http://git-wip-us.apache.org/repos/asf/hbase/blob/70f330dc/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java new file mode 100644 index 0000000..7b1547d --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.client; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.RequestConverter; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileResponse; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadRequest; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadRequest; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadResponse; +import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier; +import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; +import org.apache.hadoop.hbase.security.SecureBulkLoadUtil; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.security.token.Token; + +import com.google.protobuf.ServiceException; + +/** + * Client proxy for SecureBulkLoadProtocol + */ +@InterfaceAudience.Private +public class SecureBulkLoadClient { + private Table table; + + public SecureBulkLoadClient(Table table) { + this.table = table; + } + + public String prepareBulkLoad(final Connection conn) throws IOException { + try { + RegionServerCallable<String> callable = + new RegionServerCallable<String>(conn, table.getName(), HConstants.EMPTY_START_ROW) { + @Override + public String call(int callTimeout) throws IOException { + byte[] regionName = getLocation().getRegionInfo().getRegionName(); + RegionSpecifier region = + RequestConverter + .buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName); + try { + PrepareBulkLoadRequest request = + PrepareBulkLoadRequest.newBuilder() + .setTableName(ProtobufUtil.toProtoTableName(table.getName())) + .setRegion(region).build(); + PrepareBulkLoadResponse response = getStub().prepareBulkLoad(null, request); + return response.getBulkToken(); + } catch (ServiceException se) { + throw ProtobufUtil.getRemoteException(se); + } + } + }; + return RpcRetryingCallerFactory.instantiate(conn.getConfiguration(), null) + .<String> newCaller().callWithRetries(callable, Integer.MAX_VALUE); + } catch (Throwable throwable) { + throw new IOException(throwable); + } + } + + public void cleanupBulkLoad(final Connection conn, final String bulkToken) throws IOException { + try { + RegionServerCallable<Void> callable = + new RegionServerCallable<Void>(conn, table.getName(), HConstants.EMPTY_START_ROW) { + @Override + public Void call(int callTimeout) throws IOException { + byte[] regionName = getLocation().getRegionInfo().getRegionName(); + RegionSpecifier region = RequestConverter.buildRegionSpecifier( + RegionSpecifierType.REGION_NAME, regionName); + try { + CleanupBulkLoadRequest request = + CleanupBulkLoadRequest.newBuilder().setRegion(region) + .setBulkToken(bulkToken).build(); + getStub().cleanupBulkLoad(null, request); + } catch (ServiceException se) { + throw ProtobufUtil.getRemoteException(se); + } + return null; + } + }; + RpcRetryingCallerFactory.instantiate(conn.getConfiguration(), null) + .<Void> newCaller().callWithRetries(callable, Integer.MAX_VALUE); + } catch (Throwable throwable) { + throw new IOException(throwable); + } + } + + /** + * Securely bulk load a list of HFiles using client protocol. + * + * @param client + * @param familyPaths + * @param regionName + * @param assignSeqNum + * @param userToken + * @param bulkToken + * @return true if all are loaded + * @throws IOException + */ + public boolean secureBulkLoadHFiles(final ClientService.BlockingInterface client, + final List<Pair<byte[], String>> familyPaths, + final byte[] regionName, boolean assignSeqNum, + final Token<?> userToken, final String bulkToken) throws IOException { + BulkLoadHFileRequest request = + RequestConverter.buildBulkLoadHFileRequest(familyPaths, regionName, assignSeqNum, + userToken, bulkToken); + + try { + BulkLoadHFileResponse response = client.bulkLoadHFile(null, request); + return response.getLoaded(); + } catch (ServiceException se) { + throw ProtobufUtil.getRemoteException(se); + } + } + + public Path getStagingPath(String bulkToken, byte[] family) throws IOException { + return SecureBulkLoadUtil.getStagingPath(table.getConfiguration(), bulkToken, family); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/70f330dc/hbase-client/src/main/java/org/apache/hadoop/hbase/client/coprocessor/SecureBulkLoadClient.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/coprocessor/SecureBulkLoadClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/coprocessor/SecureBulkLoadClient.java deleted file mode 100644 index c27322a..0000000 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/coprocessor/SecureBulkLoadClient.java +++ /dev/null @@ -1,166 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.client.coprocessor; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.ipc.BlockingRpcCallback; -import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; -import org.apache.hadoop.hbase.ipc.ServerRpcController; -import org.apache.hadoop.hbase.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; -import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos; -import org.apache.hadoop.hbase.security.SecureBulkLoadUtil; -import org.apache.hadoop.hbase.util.ByteStringer; -import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.security.token.Token; - -/** - * Client proxy for SecureBulkLoadProtocol - * used in conjunction with SecureBulkLoadEndpoint - */ -@InterfaceAudience.Private -public class SecureBulkLoadClient { - private Table table; - - public SecureBulkLoadClient(Table table) { - this.table = table; - } - - public String prepareBulkLoad(final TableName tableName) throws IOException { - try { - CoprocessorRpcChannel channel = table.coprocessorService(HConstants.EMPTY_START_ROW); - SecureBulkLoadProtos.SecureBulkLoadService instance = - ProtobufUtil.newServiceStub(SecureBulkLoadProtos.SecureBulkLoadService.class, channel); - - ServerRpcController controller = new ServerRpcController(); - - BlockingRpcCallback<SecureBulkLoadProtos.PrepareBulkLoadResponse> rpcCallback = - new BlockingRpcCallback<SecureBulkLoadProtos.PrepareBulkLoadResponse>(); - - SecureBulkLoadProtos.PrepareBulkLoadRequest request = - SecureBulkLoadProtos.PrepareBulkLoadRequest.newBuilder() - .setTableName(ProtobufUtil.toProtoTableName(tableName)).build(); - - instance.prepareBulkLoad(controller, - request, - rpcCallback); - - SecureBulkLoadProtos.PrepareBulkLoadResponse response = rpcCallback.get(); - if (controller.failedOnException()) { - throw controller.getFailedOn(); - } - - return response.getBulkToken(); - } catch (Throwable throwable) { - throw new IOException(throwable); - } - } - - public void cleanupBulkLoad(final String bulkToken) throws IOException { - try { - CoprocessorRpcChannel channel = table.coprocessorService(HConstants.EMPTY_START_ROW); - SecureBulkLoadProtos.SecureBulkLoadService instance = - ProtobufUtil.newServiceStub(SecureBulkLoadProtos.SecureBulkLoadService.class, channel); - - ServerRpcController controller = new ServerRpcController(); - - BlockingRpcCallback<SecureBulkLoadProtos.CleanupBulkLoadResponse> rpcCallback = - new BlockingRpcCallback<SecureBulkLoadProtos.CleanupBulkLoadResponse>(); - - SecureBulkLoadProtos.CleanupBulkLoadRequest request = - SecureBulkLoadProtos.CleanupBulkLoadRequest.newBuilder() - .setBulkToken(bulkToken).build(); - - instance.cleanupBulkLoad(controller, - request, - rpcCallback); - - if (controller.failedOnException()) { - throw controller.getFailedOn(); - } - } catch (Throwable throwable) { - throw new IOException(throwable); - } - } - - public boolean bulkLoadHFiles(final List<Pair<byte[], String>> familyPaths, - final Token<?> userToken, - final String bulkToken, - final byte[] startRow) throws IOException { - // we never want to send a batch of HFiles to all regions, thus cannot call - // HTable#coprocessorService methods that take start and end rowkeys; see HBASE-9639 - try { - CoprocessorRpcChannel channel = table.coprocessorService(startRow); - SecureBulkLoadProtos.SecureBulkLoadService instance = - ProtobufUtil.newServiceStub(SecureBulkLoadProtos.SecureBulkLoadService.class, channel); - - SecureBulkLoadProtos.DelegationToken protoDT = - SecureBulkLoadProtos.DelegationToken.newBuilder().build(); - if(userToken != null) { - protoDT = - SecureBulkLoadProtos.DelegationToken.newBuilder() - .setIdentifier(ByteStringer.wrap(userToken.getIdentifier())) - .setPassword(ByteStringer.wrap(userToken.getPassword())) - .setKind(userToken.getKind().toString()) - .setService(userToken.getService().toString()).build(); - } - - List<ClientProtos.BulkLoadHFileRequest.FamilyPath> protoFamilyPaths = - new ArrayList<ClientProtos.BulkLoadHFileRequest.FamilyPath>(); - for(Pair<byte[], String> el: familyPaths) { - protoFamilyPaths.add(ClientProtos.BulkLoadHFileRequest.FamilyPath.newBuilder() - .setFamily(ByteStringer.wrap(el.getFirst())) - .setPath(el.getSecond()).build()); - } - - SecureBulkLoadProtos.SecureBulkLoadHFilesRequest request = - SecureBulkLoadProtos.SecureBulkLoadHFilesRequest.newBuilder() - .setFsToken(protoDT) - .addAllFamilyPath(protoFamilyPaths) - .setBulkToken(bulkToken).build(); - - ServerRpcController controller = new ServerRpcController(); - BlockingRpcCallback<SecureBulkLoadProtos.SecureBulkLoadHFilesResponse> rpcCallback = - new BlockingRpcCallback<SecureBulkLoadProtos.SecureBulkLoadHFilesResponse>(); - instance.secureBulkLoadHFiles(controller, - request, - rpcCallback); - - SecureBulkLoadProtos.SecureBulkLoadHFilesResponse response = rpcCallback.get(); - if (controller.failedOnException()) { - throw controller.getFailedOn(); - } - return response.getLoaded(); - } catch (Throwable throwable) { - throw new IOException(throwable); - } - } - - public Path getStagingPath(String bulkToken, byte[] family) throws IOException { - return SecureBulkLoadUtil.getStagingPath(table.getConfiguration(), bulkToken, family); - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/70f330dc/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java index c477063..08c18c6 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java @@ -108,8 +108,6 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WarmupRegionReques import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos; import org.apache.hadoop.hbase.protobuf.generated.CellProtos; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileResponse; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Column; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall; @@ -1578,30 +1576,6 @@ public final class ProtobufUtil { // Start helpers for Client - /** - * A helper to bulk load a list of HFiles using client protocol. - * - * @param client - * @param familyPaths - * @param regionName - * @param assignSeqNum - * @return true if all are loaded - * @throws IOException - */ - public static boolean bulkLoadHFile(final ClientService.BlockingInterface client, - final List<Pair<byte[], String>> familyPaths, - final byte[] regionName, boolean assignSeqNum) throws IOException { - BulkLoadHFileRequest request = - RequestConverter.buildBulkLoadHFileRequest(familyPaths, regionName, assignSeqNum); - try { - BulkLoadHFileResponse response = - client.bulkLoadHFile(null, request); - return response.getLoaded(); - } catch (ServiceException se) { - throw getRemoteException(se); - } - } - public static CoprocessorServiceResponse execService(final RpcController controller, final ClientService.BlockingInterface client, final CoprocessorServiceCall call, final byte[] regionName) throws IOException { @@ -1618,8 +1592,8 @@ public final class ProtobufUtil { } public static CoprocessorServiceResponse execService(final RpcController controller, - final MasterService.BlockingInterface client, final CoprocessorServiceCall call) - throws IOException { + final MasterService.BlockingInterface client, final CoprocessorServiceCall call) + throws IOException { CoprocessorServiceRequest request = CoprocessorServiceRequest.newBuilder() .setCall(call).setRegion( RequestConverter.buildRegionSpecifier(REGION_NAME, HConstants.EMPTY_BYTE_ARRAY)).build(); http://git-wip-us.apache.org/repos/asf/hbase/blob/70f330dc/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java index c5fe988..5fe2016 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.protobuf; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.regex.Pattern; @@ -64,7 +65,6 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodes import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WarmupRegionRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest; @@ -103,8 +103,8 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyTableReques import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MoveRegionRequest; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.NormalizeRequest; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.OfflineRegionRequest; -import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RunCatalogScanRequest; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ReleaseSplitOrMergeLockAndRollbackRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RunCatalogScanRequest; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetBalancerRunningRequest; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetNormalizerRunningRequest; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledRequest; @@ -115,6 +115,7 @@ import org.apache.hadoop.hbase.util.ByteStringer; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.security.token.Token; import com.google.protobuf.ByteString; @@ -526,19 +527,41 @@ public final class RequestConverter { */ public static BulkLoadHFileRequest buildBulkLoadHFileRequest( final List<Pair<byte[], String>> familyPaths, - final byte[] regionName, boolean assignSeqNum) { - BulkLoadHFileRequest.Builder builder = BulkLoadHFileRequest.newBuilder(); - RegionSpecifier region = buildRegionSpecifier( + final byte[] regionName, boolean assignSeqNum, + final Token<?> userToken, final String bulkToken) { + RegionSpecifier region = RequestConverter.buildRegionSpecifier( RegionSpecifierType.REGION_NAME, regionName); - builder.setRegion(region); - FamilyPath.Builder familyPathBuilder = FamilyPath.newBuilder(); - for (Pair<byte[], String> familyPath: familyPaths) { - familyPathBuilder.setFamily(ByteStringer.wrap(familyPath.getFirst())); - familyPathBuilder.setPath(familyPath.getSecond()); - builder.addFamilyPath(familyPathBuilder.build()); + + ClientProtos.DelegationToken protoDT = null; + if (userToken != null) { + protoDT = + ClientProtos.DelegationToken.newBuilder() + .setIdentifier(ByteStringer.wrap(userToken.getIdentifier())) + .setPassword(ByteStringer.wrap(userToken.getPassword())) + .setKind(userToken.getKind().toString()) + .setService(userToken.getService().toString()).build(); } - builder.setAssignSeqNum(assignSeqNum); - return builder.build(); + + List<ClientProtos.BulkLoadHFileRequest.FamilyPath> protoFamilyPaths = + new ArrayList<ClientProtos.BulkLoadHFileRequest.FamilyPath>(familyPaths.size()); + for(Pair<byte[], String> el: familyPaths) { + protoFamilyPaths.add(ClientProtos.BulkLoadHFileRequest.FamilyPath.newBuilder() + .setFamily(ByteStringer.wrap(el.getFirst())) + .setPath(el.getSecond()).build()); + } + + BulkLoadHFileRequest.Builder request = + ClientProtos.BulkLoadHFileRequest.newBuilder() + .setRegion(region) + .setAssignSeqNum(assignSeqNum) + .addAllFamilyPath(protoFamilyPaths); + if (userToken != null) { + request.setFsToken(protoDT); + } + if (bulkToken != null) { + request.setBulkToken(bulkToken); + } + return request.build(); } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/70f330dc/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java index e8135a8..1ece448 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java @@ -53,6 +53,8 @@ import org.apache.hadoop.hbase.protobuf.generated.CellProtos; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileResponse; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadRequest; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadResponse; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService.BlockingInterface; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest; @@ -63,6 +65,8 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadRequest; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadResponse; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionActionResult; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ResultOrException; @@ -478,6 +482,18 @@ public class TestClientNoCluster extends Configured implements Tool { CoprocessorServiceRequest request) throws ServiceException { throw new NotImplementedException(); } + + @Override + public PrepareBulkLoadResponse prepareBulkLoad(RpcController controller, + PrepareBulkLoadRequest request) throws ServiceException { + throw new NotImplementedException(); + } + + @Override + public CleanupBulkLoadResponse cleanupBulkLoad(RpcController controller, + CleanupBulkLoadRequest request) throws ServiceException { + throw new NotImplementedException(); + } } static ScanResponse doMetaScanResponse(final SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta, http://git-wip-us.apache.org/repos/asf/hbase/blob/70f330dc/hbase-common/src/test/java/org/apache/hadoop/hbase/HBaseCommonTestingUtility.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/HBaseCommonTestingUtility.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/HBaseCommonTestingUtility.java index 3cae4d2..4e7c8d2 100644 --- a/hbase-common/src/test/java/org/apache/hadoop/hbase/HBaseCommonTestingUtility.java +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/HBaseCommonTestingUtility.java @@ -118,6 +118,7 @@ public class HBaseCommonTestingUtility { if (deleteOnExit()) this.dataTestDir.deleteOnExit(); createSubDir("hbase.local.dir", testPath, "hbase-local-dir"); + createSubDir("hbase.bulkload.staging.dir", testPath, "staging"); return testPath; }