Consolidate SecureBulkLoadEndpoint into HBase core as default for bulk load


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/70f330dc
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/70f330dc
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/70f330dc

Branch: refs/heads/master
Commit: 70f330dc844af88b5bd3de69e271181e40bc58be
Parents: 5051ab4
Author: Jerry He <jerry...@apache.org>
Authored: Mon Jul 18 20:58:28 2016 -0700
Committer: Jerry He <jerry...@apache.org>
Committed: Tue Jul 19 19:53:10 2016 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/client/MetricsConnection.java  |   12 +-
 .../hbase/client/SecureBulkLoadClient.java      |  141 +
 .../coprocessor/SecureBulkLoadClient.java       |  166 -
 .../hadoop/hbase/protobuf/ProtobufUtil.java     |   30 +-
 .../hadoop/hbase/protobuf/RequestConverter.java |   49 +-
 .../hbase/client/TestClientNoCluster.java       |   16 +
 .../hadoop/hbase/HBaseCommonTestingUtility.java |    1 +
 .../hbase/protobuf/generated/ClientProtos.java  | 4187 +++++++++++++++++-
 .../generated/SecureBulkLoadProtos.java         | 3015 +------------
 hbase-protocol/src/main/protobuf/Client.proto   |   32 +
 .../src/main/protobuf/SecureBulkLoad.proto      |   23 -
 .../hbase/coprocessor/BulkLoadObserver.java     |    4 +-
 .../hbase/coprocessor/CoprocessorHost.java      |   20 +
 .../hbase/mapreduce/LoadIncrementalHFiles.java  |   31 +-
 .../hbase/regionserver/HRegionServer.java       |   10 +
 .../hbase/regionserver/RSRpcServices.java       |   74 +-
 .../regionserver/RegionServerServices.java      |    5 +
 .../regionserver/SecureBulkLoadManager.java     |  419 ++
 .../hbase/security/access/AccessController.java |    8 +-
 .../security/access/SecureBulkLoadEndpoint.java |  436 +-
 .../hadoop/hbase/HBaseTestingUtility.java       |    9 +-
 .../hadoop/hbase/MockRegionServerServices.java  |    7 +-
 .../hbase/client/TestReplicaWithCluster.java    |   13 +-
 ...oadIncrementalHFilesUseSecurityEndPoint.java |   45 -
 .../hadoop/hbase/master/MockRegionServer.java   |   25 +-
 .../SecureBulkLoadEndpointClient.java           |  172 +
 .../regionserver/TestHRegionServerBulkLoad.java |   26 +-
 .../TestHRegionServerBulkLoadWithOldClient.java |  162 +
 ...gionServerBulkLoadWithOldSecureEndpoint.java |  177 +
 .../hbase/regionserver/TestPriorityRpc.java     |    4 +-
 .../hbase/security/access/SecureTestUtil.java   |    3 +-
 31 files changed, 5468 insertions(+), 3854 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/70f330dc/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java
index a5dc7fb..4fa20e6 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java
@@ -444,14 +444,22 @@ public class MetricsConnection implements StatisticTrackable {
         // use generic implementation
         break;
       case 4:
-        assert "ExecService".equals(method.getName());
+        assert "PrepareBulkLoad".equals(method.getName());
         // use generic implementation
         break;
       case 5:
-        assert "ExecRegionServerService".equals(method.getName());
+        assert "CleanupBulkLoad".equals(method.getName());
         // use generic implementation
         break;
       case 6:
+        assert "ExecService".equals(method.getName());
+        // use generic implementation
+        break;
+      case 7:
+        assert "ExecRegionServerService".equals(method.getName());
+        // use generic implementation
+        break;
+      case 8:
         assert "Multi".equals(method.getName());
         multiTracker.updateRpc(stats);
         return;
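
A note on the renumbering above: protobuf assigns each service method an index in declaration
order, so inserting the new PrepareBulkLoad and CleanupBulkLoad RPCs into ClientService in
Client.proto shifts ExecService, ExecRegionServerService and Multi from cases 4/5/6 to 6/7/8.
A standalone Java sketch (not part of this commit) to verify the mapping against the generated
code:

    import java.util.List;

    import com.google.protobuf.Descriptors.MethodDescriptor;
    import com.google.protobuf.Descriptors.ServiceDescriptor;
    import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;

    // Prints each ClientService method with its descriptor index; after this
    // change, PrepareBulkLoad and CleanupBulkLoad should land at 4 and 5.
    public class PrintClientServiceIndices {
      public static void main(String[] args) {
        ServiceDescriptor service = ClientProtos.ClientService.getDescriptor();
        List<MethodDescriptor> methods = service.getMethods();
        for (MethodDescriptor method : methods) {
          System.out.println(method.getIndex() + " -> " + method.getName());
        }
      }
    }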

http://git-wip-us.apache.org/repos/asf/hbase/blob/70f330dc/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
new file mode 100644
index 0000000..7b1547d
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadResponse;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
+import org.apache.hadoop.hbase.security.SecureBulkLoadUtil;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.security.token.Token;
+
+import com.google.protobuf.ServiceException;
+
+/**
+ * Client proxy for SecureBulkLoadProtocol
+ */
+@InterfaceAudience.Private
+public class SecureBulkLoadClient {
+  private Table table;
+
+  public SecureBulkLoadClient(Table table) {
+    this.table = table;
+  }
+
+  public String prepareBulkLoad(final Connection conn) throws IOException {
+    try {
+      RegionServerCallable<String> callable =
+          new RegionServerCallable<String>(conn, table.getName(), HConstants.EMPTY_START_ROW) {
+            @Override
+            public String call(int callTimeout) throws IOException {
+              byte[] regionName = getLocation().getRegionInfo().getRegionName();
+              RegionSpecifier region =
+                  RequestConverter
+                      .buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName);
+              try {
+                PrepareBulkLoadRequest request =
+                    PrepareBulkLoadRequest.newBuilder()
+                        .setTableName(ProtobufUtil.toProtoTableName(table.getName()))
+                        .setRegion(region).build();
+                PrepareBulkLoadResponse response = getStub().prepareBulkLoad(null, request);
+                return response.getBulkToken();
+              } catch (ServiceException se) {
+                throw ProtobufUtil.getRemoteException(se);
+              }
+            }
+          };
+      return RpcRetryingCallerFactory.instantiate(conn.getConfiguration(), null)
+          .<String> newCaller().callWithRetries(callable, Integer.MAX_VALUE);
+    } catch (Throwable throwable) {
+      throw new IOException(throwable);
+    }
+  }
+
+  public void cleanupBulkLoad(final Connection conn, final String bulkToken) throws IOException {
+    try {
+      RegionServerCallable<Void> callable =
+          new RegionServerCallable<Void>(conn, table.getName(), HConstants.EMPTY_START_ROW) {
+            @Override
+            public Void call(int callTimeout) throws IOException {
+              byte[] regionName = getLocation().getRegionInfo().getRegionName();
+              RegionSpecifier region = RequestConverter.buildRegionSpecifier(
+                RegionSpecifierType.REGION_NAME, regionName);
+              try {
+                CleanupBulkLoadRequest request =
+                    CleanupBulkLoadRequest.newBuilder().setRegion(region)
+                        .setBulkToken(bulkToken).build();
+                getStub().cleanupBulkLoad(null, request);
+              } catch (ServiceException se) {
+                throw ProtobufUtil.getRemoteException(se);
+              }
+              return null;
+            }
+          };
+      RpcRetryingCallerFactory.instantiate(conn.getConfiguration(), null)
+          .<Void> newCaller().callWithRetries(callable, Integer.MAX_VALUE);
+    } catch (Throwable throwable) {
+      throw new IOException(throwable);
+    }
+  }
+
+  /**
+   * Securely bulk load a list of HFiles using client protocol.
+   *
+   * @param client
+   * @param familyPaths
+   * @param regionName
+   * @param assignSeqNum
+   * @param userToken
+   * @param bulkToken
+   * @return true if all are loaded
+   * @throws IOException
+   */
+  public boolean secureBulkLoadHFiles(final ClientService.BlockingInterface client,
+      final List<Pair<byte[], String>> familyPaths,
+      final byte[] regionName, boolean assignSeqNum,
+      final Token<?> userToken, final String bulkToken) throws IOException {
+    BulkLoadHFileRequest request =
+        RequestConverter.buildBulkLoadHFileRequest(familyPaths, regionName, assignSeqNum,
+          userToken, bulkToken);
+
+    try {
+      BulkLoadHFileResponse response = client.bulkLoadHFile(null, request);
+      return response.getLoaded();
+    } catch (ServiceException se) {
+      throw ProtobufUtil.getRemoteException(se);
+    }
+  }
+
+  public Path getStagingPath(String bulkToken, byte[] family) throws IOException {
+    return SecureBulkLoadUtil.getStagingPath(table.getConfiguration(), bulkToken, family);
+  }
+}
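
For reference, a minimal usage sketch of the new client (not code from this commit). It assumes
the caller already holds a Connection, the target Table, a ClientService stub for the region
server (e.g. from a RegionServerCallable's getStub()), the target region name, and the
family-to-HFile-path pairs; userToken is null here, as it would be on an unsecured cluster:

    import java.io.IOException;
    import java.util.List;

    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.SecureBulkLoadClient;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
    import org.apache.hadoop.hbase.util.Pair;

    final class BulkLoadSketch {
      // One full prepare/load/cleanup cycle over the new ClientService RPCs.
      static boolean bulkLoadOnce(Connection conn, Table table,
          ClientService.BlockingInterface stub, byte[] regionName,
          List<Pair<byte[], String>> familyPaths) throws IOException {
        SecureBulkLoadClient loader = new SecureBulkLoadClient(table);
        String bulkToken = loader.prepareBulkLoad(conn);   // PrepareBulkLoad RPC
        try {
          return loader.secureBulkLoadHFiles(stub, familyPaths, regionName,
              true /* assignSeqNum */, null /* userToken */, bulkToken);
        } finally {
          loader.cleanupBulkLoad(conn, bulkToken);         // CleanupBulkLoad RPC
        }
      }
    }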

http://git-wip-us.apache.org/repos/asf/hbase/blob/70f330dc/hbase-client/src/main/java/org/apache/hadoop/hbase/client/coprocessor/SecureBulkLoadClient.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/coprocessor/SecureBulkLoadClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/coprocessor/SecureBulkLoadClient.java
deleted file mode 100644
index c27322a..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/coprocessor/SecureBulkLoadClient.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.client.coprocessor;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
-import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
-import org.apache.hadoop.hbase.ipc.ServerRpcController;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
-import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos;
-import org.apache.hadoop.hbase.security.SecureBulkLoadUtil;
-import org.apache.hadoop.hbase.util.ByteStringer;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.security.token.Token;
-
-/**
- * Client proxy for SecureBulkLoadProtocol
- * used in conjunction with SecureBulkLoadEndpoint
- */
-@InterfaceAudience.Private
-public class SecureBulkLoadClient {
-  private Table table;
-
-  public SecureBulkLoadClient(Table table) {
-    this.table = table;
-  }
-
-  public String prepareBulkLoad(final TableName tableName) throws IOException {
-    try {
-      CoprocessorRpcChannel channel = table.coprocessorService(HConstants.EMPTY_START_ROW);
-      SecureBulkLoadProtos.SecureBulkLoadService instance =
-          ProtobufUtil.newServiceStub(SecureBulkLoadProtos.SecureBulkLoadService.class, channel);
-
-      ServerRpcController controller = new ServerRpcController();
-
-      BlockingRpcCallback<SecureBulkLoadProtos.PrepareBulkLoadResponse> rpcCallback =
-          new BlockingRpcCallback<SecureBulkLoadProtos.PrepareBulkLoadResponse>();
-
-      SecureBulkLoadProtos.PrepareBulkLoadRequest request =
-          SecureBulkLoadProtos.PrepareBulkLoadRequest.newBuilder()
-          .setTableName(ProtobufUtil.toProtoTableName(tableName)).build();
-
-      instance.prepareBulkLoad(controller,
-          request,
-          rpcCallback);
-
-      SecureBulkLoadProtos.PrepareBulkLoadResponse response = rpcCallback.get();
-      if (controller.failedOnException()) {
-        throw controller.getFailedOn();
-      }
-      
-      return response.getBulkToken();
-    } catch (Throwable throwable) {
-      throw new IOException(throwable);
-    }
-  }
-
-  public void cleanupBulkLoad(final String bulkToken) throws IOException {
-    try {
-      CoprocessorRpcChannel channel = table.coprocessorService(HConstants.EMPTY_START_ROW);
-      SecureBulkLoadProtos.SecureBulkLoadService instance =
-          ProtobufUtil.newServiceStub(SecureBulkLoadProtos.SecureBulkLoadService.class, channel);
-
-      ServerRpcController controller = new ServerRpcController();
-
-      BlockingRpcCallback<SecureBulkLoadProtos.CleanupBulkLoadResponse> rpcCallback =
-          new BlockingRpcCallback<SecureBulkLoadProtos.CleanupBulkLoadResponse>();
-
-      SecureBulkLoadProtos.CleanupBulkLoadRequest request =
-          SecureBulkLoadProtos.CleanupBulkLoadRequest.newBuilder()
-              .setBulkToken(bulkToken).build();
-
-      instance.cleanupBulkLoad(controller,
-          request,
-          rpcCallback);
-
-      if (controller.failedOnException()) {
-        throw controller.getFailedOn();
-      }
-    } catch (Throwable throwable) {
-      throw new IOException(throwable);
-    }
-  }
-
-  public boolean bulkLoadHFiles(final List<Pair<byte[], String>> familyPaths,
-                         final Token<?> userToken,
-                         final String bulkToken,
-                         final byte[] startRow) throws IOException {
-    // we never want to send a batch of HFiles to all regions, thus cannot call
-    // HTable#coprocessorService methods that take start and end rowkeys; see HBASE-9639
-    try {
-      CoprocessorRpcChannel channel = table.coprocessorService(startRow);
-      SecureBulkLoadProtos.SecureBulkLoadService instance =
-          ProtobufUtil.newServiceStub(SecureBulkLoadProtos.SecureBulkLoadService.class, channel);
-
-      SecureBulkLoadProtos.DelegationToken protoDT =
-          SecureBulkLoadProtos.DelegationToken.newBuilder().build();
-      if(userToken != null) {
-        protoDT =
-            SecureBulkLoadProtos.DelegationToken.newBuilder()
-              .setIdentifier(ByteStringer.wrap(userToken.getIdentifier()))
-              .setPassword(ByteStringer.wrap(userToken.getPassword()))
-              .setKind(userToken.getKind().toString())
-              .setService(userToken.getService().toString()).build();
-      }
-
-      List<ClientProtos.BulkLoadHFileRequest.FamilyPath> protoFamilyPaths =
-          new ArrayList<ClientProtos.BulkLoadHFileRequest.FamilyPath>();
-      for(Pair<byte[], String> el: familyPaths) {
-        protoFamilyPaths.add(ClientProtos.BulkLoadHFileRequest.FamilyPath.newBuilder()
-          .setFamily(ByteStringer.wrap(el.getFirst()))
-          .setPath(el.getSecond()).build());
-      }
-
-      SecureBulkLoadProtos.SecureBulkLoadHFilesRequest request =
-          SecureBulkLoadProtos.SecureBulkLoadHFilesRequest.newBuilder()
-            .setFsToken(protoDT)
-            .addAllFamilyPath(protoFamilyPaths)
-            .setBulkToken(bulkToken).build();
-
-      ServerRpcController controller = new ServerRpcController();
-      BlockingRpcCallback<SecureBulkLoadProtos.SecureBulkLoadHFilesResponse> rpcCallback =
-          new BlockingRpcCallback<SecureBulkLoadProtos.SecureBulkLoadHFilesResponse>();
-      instance.secureBulkLoadHFiles(controller,
-        request,
-        rpcCallback);
-
-      SecureBulkLoadProtos.SecureBulkLoadHFilesResponse response = rpcCallback.get();
-      if (controller.failedOnException()) {
-        throw controller.getFailedOn();
-      }
-      return response.getLoaded();
-    } catch (Throwable throwable) {
-      throw new IOException(throwable);
-    }
-  }
-
-  public Path getStagingPath(String bulkToken, byte[] family) throws IOException {
-    return SecureBulkLoadUtil.getStagingPath(table.getConfiguration(), bulkToken, family);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/70f330dc/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
index c477063..08c18c6 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
@@ -108,8 +108,6 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WarmupRegionReques
 import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
 import org.apache.hadoop.hbase.protobuf.generated.CellProtos;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Column;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall;
@@ -1578,30 +1576,6 @@ public final class ProtobufUtil {
 
 // Start helpers for Client
 
-  /**
-   * A helper to bulk load a list of HFiles using client protocol.
-   *
-   * @param client
-   * @param familyPaths
-   * @param regionName
-   * @param assignSeqNum
-   * @return true if all are loaded
-   * @throws IOException
-   */
-  public static boolean bulkLoadHFile(final ClientService.BlockingInterface client,
-      final List<Pair<byte[], String>> familyPaths,
-      final byte[] regionName, boolean assignSeqNum) throws IOException {
-    BulkLoadHFileRequest request =
-      RequestConverter.buildBulkLoadHFileRequest(familyPaths, regionName, assignSeqNum);
-    try {
-      BulkLoadHFileResponse response =
-        client.bulkLoadHFile(null, request);
-      return response.getLoaded();
-    } catch (ServiceException se) {
-      throw getRemoteException(se);
-    }
-  }
-
   public static CoprocessorServiceResponse execService(final RpcController controller,
       final ClientService.BlockingInterface client, final CoprocessorServiceCall call,
       final byte[] regionName) throws IOException {
@@ -1618,8 +1592,8 @@ public final class ProtobufUtil {
   }
 
   public static CoprocessorServiceResponse execService(final RpcController controller,
-    final MasterService.BlockingInterface client, final CoprocessorServiceCall call)
-  throws IOException {
+      final MasterService.BlockingInterface client, final CoprocessorServiceCall call)
+      throws IOException {
     CoprocessorServiceRequest request = CoprocessorServiceRequest.newBuilder()
         .setCall(call).setRegion(
           RequestConverter.buildRegionSpecifier(REGION_NAME, HConstants.EMPTY_BYTE_ARRAY)).build();
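
Callers of the removed static helper move to the new SecureBulkLoadClient, whose request builder
treats the security fields as optional. A hedged migration sketch (table, stub, familyPaths,
regionName and assignSeqNum as in the old call; whether a token-less request is appropriate
depends on the cluster's security configuration):

    // Old: ProtobufUtil.bulkLoadHFile(stub, familyPaths, regionName, assignSeqNum);
    boolean loaded = new SecureBulkLoadClient(table)
        .secureBulkLoadHFiles(stub, familyPaths, regionName, assignSeqNum,
            null /* userToken */, null /* bulkToken */);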

http://git-wip-us.apache.org/repos/asf/hbase/blob/70f330dc/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
index c5fe988..5fe2016 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.protobuf;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.regex.Pattern;
 
@@ -64,7 +65,6 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodes
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WarmupRegionRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
@@ -103,8 +103,8 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyTableReques
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MoveRegionRequest;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.NormalizeRequest;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.OfflineRegionRequest;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RunCatalogScanRequest;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ReleaseSplitOrMergeLockAndRollbackRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RunCatalogScanRequest;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetBalancerRunningRequest;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetNormalizerRunningRequest;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetSplitOrMergeEnabledRequest;
@@ -115,6 +115,7 @@ import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.security.token.Token;
 
 import com.google.protobuf.ByteString;
 
@@ -526,19 +527,41 @@ public final class RequestConverter {
    */
   public static BulkLoadHFileRequest buildBulkLoadHFileRequest(
       final List<Pair<byte[], String>> familyPaths,
-      final byte[] regionName, boolean assignSeqNum) {
-    BulkLoadHFileRequest.Builder builder = BulkLoadHFileRequest.newBuilder();
-    RegionSpecifier region = buildRegionSpecifier(
+      final byte[] regionName, boolean assignSeqNum,
+      final Token<?> userToken, final String bulkToken) {
+    RegionSpecifier region = RequestConverter.buildRegionSpecifier(
       RegionSpecifierType.REGION_NAME, regionName);
-    builder.setRegion(region);
-    FamilyPath.Builder familyPathBuilder = FamilyPath.newBuilder();
-    for (Pair<byte[], String> familyPath: familyPaths) {
-      familyPathBuilder.setFamily(ByteStringer.wrap(familyPath.getFirst()));
-      familyPathBuilder.setPath(familyPath.getSecond());
-      builder.addFamilyPath(familyPathBuilder.build());
+
+    ClientProtos.DelegationToken protoDT = null;
+    if (userToken != null) {
+      protoDT =
+          ClientProtos.DelegationToken.newBuilder()
+            .setIdentifier(ByteStringer.wrap(userToken.getIdentifier()))
+            .setPassword(ByteStringer.wrap(userToken.getPassword()))
+            .setKind(userToken.getKind().toString())
+            .setService(userToken.getService().toString()).build();
     }
-    builder.setAssignSeqNum(assignSeqNum);
-    return builder.build();
+
+    List<ClientProtos.BulkLoadHFileRequest.FamilyPath> protoFamilyPaths =
+        new ArrayList<ClientProtos.BulkLoadHFileRequest.FamilyPath>(familyPaths.size());
+    for(Pair<byte[], String> el: familyPaths) {
+      protoFamilyPaths.add(ClientProtos.BulkLoadHFileRequest.FamilyPath.newBuilder()
+        .setFamily(ByteStringer.wrap(el.getFirst()))
+        .setPath(el.getSecond()).build());
+    }
+
+    BulkLoadHFileRequest.Builder request =
+        ClientProtos.BulkLoadHFileRequest.newBuilder()
+          .setRegion(region)
+          .setAssignSeqNum(assignSeqNum)
+          .addAllFamilyPath(protoFamilyPaths);
+    if (userToken != null) {
+      request.setFsToken(protoDT);
+    }
+    if (bulkToken != null) {
+      request.setBulkToken(bulkToken);
+    }
+    return request.build();
   }
 
   /**
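
A short sketch of the widened builder (familyPaths, regionName and fsDelegationToken are assumed
to be in scope; not code from this commit). Because fsToken and bulkToken are only set when
non-null, old-style callers can simply pass nulls:

    BulkLoadHFileRequest secure = RequestConverter.buildBulkLoadHFileRequest(
        familyPaths, regionName, true /* assignSeqNum */, fsDelegationToken, bulkToken);
    BulkLoadHFileRequest plain = RequestConverter.buildBulkLoadHFileRequest(
        familyPaths, regionName, true /* assignSeqNum */, null, null);
    assert !plain.hasFsToken() && !plain.hasBulkToken();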

http://git-wip-us.apache.org/repos/asf/hbase/blob/70f330dc/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java
index e8135a8..1ece448 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java
@@ -53,6 +53,8 @@ import org.apache.hadoop.hbase.protobuf.generated.CellProtos;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadResponse;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService.BlockingInterface;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
@@ -63,6 +65,8 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadResponse;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionActionResult;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ResultOrException;
@@ -478,6 +482,18 @@ public class TestClientNoCluster extends Configured implements Tool {
         CoprocessorServiceRequest request) throws ServiceException {
       throw new NotImplementedException();
     }
+
+    @Override
+    public PrepareBulkLoadResponse prepareBulkLoad(RpcController controller,
+        PrepareBulkLoadRequest request) throws ServiceException {
+      throw new NotImplementedException();
+    }
+
+    @Override
+    public CleanupBulkLoadResponse cleanupBulkLoad(RpcController controller,
+        CleanupBulkLoadRequest request) throws ServiceException {
+      throw new NotImplementedException();
+    }
   }
 
   static ScanResponse doMetaScanResponse(final SortedMap<byte [], Pair<HRegionInfo, ServerName>> meta,

http://git-wip-us.apache.org/repos/asf/hbase/blob/70f330dc/hbase-common/src/test/java/org/apache/hadoop/hbase/HBaseCommonTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/HBaseCommonTestingUtility.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/HBaseCommonTestingUtility.java
index 3cae4d2..4e7c8d2 100644
--- a/hbase-common/src/test/java/org/apache/hadoop/hbase/HBaseCommonTestingUtility.java
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/HBaseCommonTestingUtility.java
@@ -118,6 +118,7 @@ public class HBaseCommonTestingUtility {
     if (deleteOnExit()) this.dataTestDir.deleteOnExit();
 
     createSubDir("hbase.local.dir", testPath, "hbase-local-dir");
+    createSubDir("hbase.bulkload.staging.dir", testPath, "staging");
 
     return testPath;
   }
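
The new subdirectory backs hbase.bulkload.staging.dir, which the consolidated bulk load path
reads from the configuration. Outside the test utility the property would be set the usual way;
a fragment with an illustrative path (not a default asserted by this commit):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    Configuration conf = HBaseConfiguration.create();
    // Cluster-visible directory where bulk load files are staged before the final rename.
    conf.set("hbase.bulkload.staging.dir", "/hbase/staging");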
