http://git-wip-us.apache.org/repos/asf/hbase/blob/70f330dc/hbase-protocol/src/main/protobuf/Client.proto ---------------------------------------------------------------------- diff --git a/hbase-protocol/src/main/protobuf/Client.proto b/hbase-protocol/src/main/protobuf/Client.proto index 8a4d459..adb66f7 100644 --- a/hbase-protocol/src/main/protobuf/Client.proto +++ b/hbase-protocol/src/main/protobuf/Client.proto @@ -337,6 +337,8 @@ message BulkLoadHFileRequest { required RegionSpecifier region = 1; repeated FamilyPath family_path = 2; optional bool assign_seq_num = 3; + optional DelegationToken fs_token = 4; + optional string bulk_token = 5; message FamilyPath { required bytes family = 1; @@ -348,6 +350,30 @@ message BulkLoadHFileResponse { required bool loaded = 1; } +message DelegationToken { + optional bytes identifier = 1; + optional bytes password = 2; + optional string kind = 3; + optional string service = 4; +} + +message PrepareBulkLoadRequest { + required TableName table_name = 1; + optional RegionSpecifier region = 2; +} + +message PrepareBulkLoadResponse { + required string bulk_token = 1; +} + +message CleanupBulkLoadRequest { + required string bulk_token = 1; + optional RegionSpecifier region = 2; +} + +message CleanupBulkLoadResponse { +} + message CoprocessorServiceCall { required bytes row = 1; required string service_name = 2; @@ -467,6 +493,12 @@ service ClientService { rpc BulkLoadHFile(BulkLoadHFileRequest) returns(BulkLoadHFileResponse); + rpc PrepareBulkLoad(PrepareBulkLoadRequest) + returns (PrepareBulkLoadResponse); + + rpc CleanupBulkLoad(CleanupBulkLoadRequest) + returns (CleanupBulkLoadResponse); + rpc ExecService(CoprocessorServiceRequest) returns(CoprocessorServiceResponse);
http://git-wip-us.apache.org/repos/asf/hbase/blob/70f330dc/hbase-protocol/src/main/protobuf/SecureBulkLoad.proto ---------------------------------------------------------------------- diff --git a/hbase-protocol/src/main/protobuf/SecureBulkLoad.proto b/hbase-protocol/src/main/protobuf/SecureBulkLoad.proto index 814735b..290355e 100644 --- a/hbase-protocol/src/main/protobuf/SecureBulkLoad.proto +++ b/hbase-protocol/src/main/protobuf/SecureBulkLoad.proto @@ -37,29 +37,6 @@ message SecureBulkLoadHFilesResponse { required bool loaded = 1; } -message DelegationToken { - optional bytes identifier = 1; - optional bytes password = 2; - optional string kind = 3; - optional string service = 4; -} - -message PrepareBulkLoadRequest { - required TableName table_name = 1; -} - -message PrepareBulkLoadResponse { - required string bulk_token = 1; -} - -message CleanupBulkLoadRequest { - required string bulk_token = 1; - -} - -message CleanupBulkLoadResponse { -} - service SecureBulkLoadService { rpc PrepareBulkLoad(PrepareBulkLoadRequest) returns (PrepareBulkLoadResponse); http://git-wip-us.apache.org/repos/asf/hbase/blob/70f330dc/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BulkLoadObserver.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BulkLoadObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BulkLoadObserver.java index c7f0b90..1095d6c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BulkLoadObserver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BulkLoadObserver.java @@ -25,8 +25,8 @@ import org.apache.hadoop.hbase.Coprocessor; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; -import 
org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.CleanupBulkLoadRequest; -import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.PrepareBulkLoadRequest; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadRequest; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadRequest; /** * Coprocessors implement this interface to observe and mediate bulk load operations. http://git-wip-us.apache.org/repos/asf/hbase/blob/70f330dc/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java index e937569..37c344b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/CoprocessorHost.java @@ -334,6 +334,26 @@ public abstract class CoprocessorHost<E extends CoprocessorEnvironment> { } /** + * Find the environments whose coprocessor extends/implements the given class/interface + * @param cls the class/interface to look for + * @return the list of matching CoprocessorEnvironment; empty (never null) if none match + */ + public List<CoprocessorEnvironment> findCoprocessorEnvironment(Class<?> cls) { + ArrayList<CoprocessorEnvironment> ret = new ArrayList<CoprocessorEnvironment>(); + + for (E env: coprocessors) { + Coprocessor cp = env.getInstance(); + + if(cp != null) { + if (cls.isAssignableFrom(cp.getClass())) { + ret.add(env); + } + } + } + return ret; + } + + /** * Find a coprocessor environment by class name * @param className the class name * @return the coprocessor, or null if not found http://git-wip-us.apache.org/repos/asf/hbase/blob/70f330dc/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java 
---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java index a23d739..c04794b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java @@ -74,8 +74,8 @@ import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.client.RegionServerCallable; import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory; +import org.apache.hadoop.hbase.client.SecureBulkLoadClient; import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.client.coprocessor.SecureBulkLoadClient; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.io.HFileLink; import org.apache.hadoop.hbase.io.HalfStoreFileReader; @@ -87,7 +87,6 @@ import org.apache.hadoop.hbase.io.hfile.HFileContext; import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder; import org.apache.hadoop.hbase.io.hfile.HFileScanner; -import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.regionserver.BloomType; import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.StoreFileInfo; @@ -323,6 +322,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool { // LQI queue does not need to be threadsafe -- all operations on this queue // happen in this thread Deque<LoadQueueItem> queue = new LinkedList<>(); + SecureBulkLoadClient secureClient = new SecureBulkLoadClient(table); + try { /* * Checking hfile format is a time-consuming operation, we should have an option to skip @@ -346,13 +347,16 @@ public class 
LoadIncrementalHFiles extends Configured implements Tool { return; } + if(isSecureBulkLoadEndpointAvailable()) { + LOG.warn("SecureBulkLoadEndpoint is deprecated. It will be removed in future releases."); + LOG.warn("Secure bulk load has been integrated into HBase core."); + } + //If using secure bulk load, get source delegation token, and //prepare staging directory and token // fs is the source filesystem fsDelegationToken.acquireDelegationToken(fs); - if(isSecureBulkLoadEndpointAvailable()) { - bulkToken = new SecureBulkLoadClient(table).prepareBulkLoad(table.getName()); - } + bulkToken = secureClient.prepareBulkLoad(admin.getConnection()); // Assumes that region splits can happen while this occurs. while (!queue.isEmpty()) { @@ -391,7 +395,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool { } finally { fsDelegationToken.releaseDelegationToken(); if(bulkToken != null) { - new SecureBulkLoadClient(table).cleanupBulkLoad(bulkToken); + secureClient.cleanupBulkLoad(admin.getConnection(), bulkToken); } pool.shutdown(); if (queue != null && !queue.isEmpty()) { @@ -789,21 +793,18 @@ public class LoadIncrementalHFiles extends Configured implements Tool { LOG.debug("Going to connect to server " + getLocation() + " for row " + Bytes.toStringBinary(getRow()) + " with hfile group " + famPaths); byte[] regionName = getLocation().getRegionInfo().getRegionName(); - if (!isSecureBulkLoadEndpointAvailable()) { - success = ProtobufUtil.bulkLoadHFile(getStub(), famPaths, regionName, assignSeqIds); - } else { - try (Table table = conn.getTable(getTableName())) { - secureClient = new SecureBulkLoadClient(table); - success = secureClient.bulkLoadHFiles(famPaths, fsDelegationToken.getUserToken(), - bulkToken, getLocation().getRegionInfo().getStartKey()); - } + try (Table table = conn.getTable(getTableName())) { + secureClient = new SecureBulkLoadClient(table); + success = + secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName, + assignSeqIds, 
fsDelegationToken.getUserToken(), bulkToken); } return success; } finally { //Best effort copying of files that might not have been imported //from the staging directory back to original location //in user directory - if(secureClient != null && !success) { + if (secureClient != null && !success) { FileSystem targetFs = FileSystem.get(getConf()); // fs is the source filesystem if(fs == null) { http://git-wip-us.apache.org/repos/asf/hbase/blob/70f330dc/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 9567602..e03993f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -498,6 +498,8 @@ public class HRegionServer extends HasThread implements private volatile ThroughputController flushThroughputController; + protected final SecureBulkLoadManager secureBulkLoadManager; + /** * Starts a HRegionServer at the default location. 
* @param conf @@ -618,6 +620,9 @@ public class HRegionServer extends HasThread implements } this.configurationManager = new ConfigurationManager(); + this.secureBulkLoadManager = new SecureBulkLoadManager(this.conf); + this.secureBulkLoadManager.start(); + rpcServices.start(); putUpWebUI(); this.walRoller = new LogRoller(this, this); @@ -3431,4 +3436,9 @@ public class HRegionServer extends HasThread implements public MetricsRegionServer getMetrics() { return metricsRegionServer; } + + @Override + public SecureBulkLoadManager getSecureBulkLoadManager() { + return this.secureBulkLoadManager; + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/70f330dc/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index d9ea186..9cfc5df 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -134,6 +134,8 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Action; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileResponse; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadRequest; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadResponse; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest; @@ -147,6 +149,8 
@@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadRequest; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadResponse; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionActionResult; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ResultOrException; @@ -2042,21 +2046,29 @@ public class RSRpcServices implements HBaseRPCErrorHandler, checkOpen(); requestCount.increment(); Region region = getRegion(request.getRegion()); - List<Pair<byte[], String>> familyPaths = new ArrayList<Pair<byte[], String>>(); - for (FamilyPath familyPath: request.getFamilyPathList()) { - familyPaths.add(new Pair<byte[], String>(familyPath.getFamily().toByteArray(), - familyPath.getPath())); - } boolean bypass = false; - if (region.getCoprocessorHost() != null) { - bypass = region.getCoprocessorHost().preBulkLoadHFile(familyPaths); - } boolean loaded = false; - if (!bypass) { - loaded = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null); - } - if (region.getCoprocessorHost() != null) { - loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths, loaded); + + if (!request.hasBulkToken()) { + // Old style bulk load. 
This will not be supported in future releases + List<Pair<byte[], String>> familyPaths = + new ArrayList<Pair<byte[], String>>(request.getFamilyPathCount()); + for (FamilyPath familyPath : request.getFamilyPathList()) { + familyPaths.add(new Pair<byte[], String>(familyPath.getFamily().toByteArray(), familyPath + .getPath())); + } + if (region.getCoprocessorHost() != null) { + bypass = region.getCoprocessorHost().preBulkLoadHFile(familyPaths); + } + if (!bypass) { + loaded = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null); + } + if (region.getCoprocessorHost() != null) { + loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths, loaded); + } + } else { + // secure bulk load + loaded = regionServer.secureBulkLoadManager.secureBulkLoadHFiles(region, request); } BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder(); builder.setLoaded(loaded); @@ -2067,6 +2079,41 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } @Override + public PrepareBulkLoadResponse prepareBulkLoad(RpcController controller, + PrepareBulkLoadRequest request) throws ServiceException { + try { + checkOpen(); + requestCount.increment(); + + Region region = getRegion(request.getRegion()); + + String bulkToken = regionServer.secureBulkLoadManager.prepareBulkLoad(region, request); + PrepareBulkLoadResponse.Builder builder = PrepareBulkLoadResponse.newBuilder(); + builder.setBulkToken(bulkToken); + return builder.build(); + } catch (IOException ie) { + throw new ServiceException(ie); + } + } + + @Override + public CleanupBulkLoadResponse cleanupBulkLoad(RpcController controller, + CleanupBulkLoadRequest request) throws ServiceException { + try { + checkOpen(); + requestCount.increment(); + + Region region = getRegion(request.getRegion()); + + regionServer.secureBulkLoadManager.cleanupBulkLoad(region, request); + CleanupBulkLoadResponse response = CleanupBulkLoadResponse.newBuilder().build(); + return response; + } catch (IOException ie) { + 
throw new ServiceException(ie); + } + } + + @Override public CoprocessorServiceResponse execService(final RpcController controller, final CoprocessorServiceRequest request) throws ServiceException { try { @@ -2930,4 +2977,5 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } return UpdateConfigurationResponse.getDefaultInstance(); } + } http://git-wip-us.apache.org/repos/asf/hbase/blob/70f330dc/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java index c6689a9..356a88b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java @@ -81,6 +81,11 @@ public interface RegionServerServices extends OnlineRegions, FavoredNodesForRegi RegionServerQuotaManager getRegionServerQuotaManager(); /** + * @return RegionServer's instance of {@link SecureBulkLoadManager} + */ + SecureBulkLoadManager getSecureBulkLoadManager(); + + /** * Context for postOpenDeployTasks(). 
*/ class PostOpenDeployContext { http://git-wip-us.apache.org/repos/asf/hbase/blob/70f330dc/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java new file mode 100644 index 0000000..b47b31d --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java @@ -0,0 +1,419 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.regionserver; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.coprocessor.BulkLoadObserver; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.ipc.RpcServer; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadRequest; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadRequest; +import org.apache.hadoop.hbase.regionserver.Region.BulkLoadListener; +import org.apache.hadoop.hbase.security.SecureBulkLoadUtil; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.security.UserProvider; +import org.apache.hadoop.hbase.security.token.FsDelegationToken; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FSHDFSUtils; +import org.apache.hadoop.hbase.util.Methods; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; + +import java.io.IOException; +import java.math.BigInteger; +import java.security.PrivilegedAction; +import java.security.SecureRandom; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + 
+/** + * Bulk loads in secure mode. + * + * This service addresses two issues: + * <ol> + * <li>Moving files in a secure filesystem wherein the HBase Client + * and HBase Server are different filesystem users.</li> + * <li>Moving files in a secure manner, assuming that the filesystem + * is POSIX compliant.</li> + * </ol> + * + * The algorithm is as follows: + * <ol> + * <li>Create an hbase owned staging directory which is + * world traversable (711): {@code /hbase/staging}</li> + * <li>A user writes out data to their secure output directory: {@code /user/foo/data}</li> + * <li>A call is made to hbase to create a secret staging directory + * which is globally rwx (777): {@code /hbase/staging/averylongandrandomdirectoryname}</li> + * <li>The user moves the data into the random staging directory, + * then calls bulkLoadHFiles()</li> + * </ol> + * + * Like delegation tokens, the strength of the security lies in the length + * and randomness of the secret directory. + * + */ +@InterfaceAudience.Private +public class SecureBulkLoadManager { + + public static final long VERSION = 0L; + + //320/5 = 64 characters + private static final int RANDOM_WIDTH = 320; + private static final int RANDOM_RADIX = 32; + + private static final Log LOG = LogFactory.getLog(SecureBulkLoadManager.class); + + private final static FsPermission PERM_ALL_ACCESS = FsPermission.valueOf("-rwxrwxrwx"); + private final static FsPermission PERM_HIDDEN = FsPermission.valueOf("-rwx--x--x"); + + private SecureRandom random; + private FileSystem fs; + private Configuration conf; + + //two levels so it doesn't get deleted accidentally + //no sticky bit in Hadoop 1.0 + private Path baseStagingDir; + + private UserProvider userProvider; + + SecureBulkLoadManager(Configuration conf) { + this.conf = conf; + } + + public void start() { + random = new SecureRandom(); + baseStagingDir = SecureBulkLoadUtil.getBaseStagingDir(conf); + this.userProvider = UserProvider.instantiate(conf); + + try { + fs = FileSystem.get(conf); 
+ fs.mkdirs(baseStagingDir, PERM_HIDDEN); + fs.setPermission(baseStagingDir, PERM_HIDDEN); + FileStatus status = fs.getFileStatus(baseStagingDir); + //no sticky bit in hadoop-1.0, making directory nonempty so it never gets erased + fs.mkdirs(new Path(baseStagingDir,"DONOTERASE"), PERM_HIDDEN); + if (status == null) { + throw new IllegalStateException("Failed to create staging directory " + + baseStagingDir.toString()); + } + if (!status.getPermission().equals(PERM_HIDDEN)) { + throw new IllegalStateException( + "Staging directory already exists but permissions aren't set to '-rwx--x--x' " + + baseStagingDir.toString()); + } + } catch (IOException e) { + LOG.error("Failed to create or set permission on staging directory " + + baseStagingDir.toString(), e); + throw new IllegalStateException("Failed to create or set permission on staging directory " + + baseStagingDir.toString(), e); + } + } + + public void stop() throws IOException { + } + + public String prepareBulkLoad(final Region region, final PrepareBulkLoadRequest request) + throws IOException { + List<BulkLoadObserver> bulkLoadObservers = getBulkLoadObservers(region); + + if (bulkLoadObservers != null && bulkLoadObservers.size() != 0) { + ObserverContext<RegionCoprocessorEnvironment> ctx = + new ObserverContext<RegionCoprocessorEnvironment>(); + ctx.prepare((RegionCoprocessorEnvironment) region.getCoprocessorHost() + .findCoprocessorEnvironment(BulkLoadObserver.class).get(0)); + + for (BulkLoadObserver bulkLoadObserver : bulkLoadObservers) { + bulkLoadObserver.prePrepareBulkLoad(ctx, request); + } + } + + String bulkToken = + createStagingDir(baseStagingDir, getActiveUser(), region.getTableDesc().getTableName()) + .toString(); + + return bulkToken; + } + + public void cleanupBulkLoad(final Region region, final CleanupBulkLoadRequest request) + throws IOException { + List<BulkLoadObserver> bulkLoadObservers = getBulkLoadObservers(region); + + if (bulkLoadObservers != null && bulkLoadObservers.size() != 0) { + 
ObserverContext<RegionCoprocessorEnvironment> ctx = + new ObserverContext<RegionCoprocessorEnvironment>(); + ctx.prepare((RegionCoprocessorEnvironment) region.getCoprocessorHost() + .findCoprocessorEnvironment(BulkLoadObserver.class).get(0)); + + for (BulkLoadObserver bulkLoadObserver : bulkLoadObservers) { + bulkLoadObserver.preCleanupBulkLoad(ctx, request); + } + } + + fs.delete(new Path(request.getBulkToken()), true); + } + + public boolean secureBulkLoadHFiles(final Region region, + final BulkLoadHFileRequest request) throws IOException { + final List<Pair<byte[], String>> familyPaths = new ArrayList<Pair<byte[], String>>(request.getFamilyPathCount()); + for(ClientProtos.BulkLoadHFileRequest.FamilyPath el : request.getFamilyPathList()) { + familyPaths.add(new Pair<byte[], String>(el.getFamily().toByteArray(), el.getPath())); + } + + Token userToken = null; + if (userProvider.isHadoopSecurityEnabled()) { + userToken = new Token(request.getFsToken().getIdentifier().toByteArray(), request.getFsToken() + .getPassword().toByteArray(), new Text(request.getFsToken().getKind()), new Text( + request.getFsToken().getService())); + } + final String bulkToken = request.getBulkToken(); + User user = getActiveUser(); + final UserGroupInformation ugi = user.getUGI(); + if(userToken != null) { + ugi.addToken(userToken); + } else if (userProvider.isHadoopSecurityEnabled()) { + //we allow this to pass through in "simple" security mode + //for mini cluster testing + throw new DoNotRetryIOException("User token cannot be null"); + } + + boolean bypass = false; + if (region.getCoprocessorHost() != null) { + bypass = region.getCoprocessorHost().preBulkLoadHFile(familyPaths); + } + boolean loaded = false; + if (!bypass) { + // Get the target fs (HBase region server fs) delegation token + // Since we have checked the permission via 'preBulkLoadHFile', now let's give + // the 'request user' necessary token to operate on the target fs. 
+ // After this point the 'doAs' user will hold two tokens, one for the source fs + // ('request user'), another for the target fs (HBase region server principal). + if (userProvider.isHadoopSecurityEnabled()) { + FsDelegationToken targetfsDelegationToken = new FsDelegationToken(userProvider, "renewer"); + targetfsDelegationToken.acquireDelegationToken(fs); + + Token<?> targetFsToken = targetfsDelegationToken.getUserToken(); + if (targetFsToken != null + && (userToken == null || !targetFsToken.getService().equals(userToken.getService()))) { + ugi.addToken(targetFsToken); + } + } + + loaded = ugi.doAs(new PrivilegedAction<Boolean>() { + @Override + public Boolean run() { + FileSystem fs = null; + try { + fs = FileSystem.get(conf); + for(Pair<byte[], String> el: familyPaths) { + Path stageFamily = new Path(bulkToken, Bytes.toString(el.getFirst())); + if(!fs.exists(stageFamily)) { + fs.mkdirs(stageFamily); + fs.setPermission(stageFamily, PERM_ALL_ACCESS); + } + } + //We call bulkLoadHFiles as requesting user + //To enable access prior to staging + return region.bulkLoadHFiles(familyPaths, true, + new SecureBulkLoadListener(fs, bulkToken, conf)); + } catch (Exception e) { + LOG.error("Failed to complete bulk load", e); + } finally { + if (fs != null) { + try { + if (!UserGroupInformation.getLoginUser().equals(ugi)) { + FileSystem.closeAllForUGI(ugi); + } + } catch (IOException e) { + LOG.error("Failed to close FileSystem for " + ugi.getUserName(), e); + } + } + } + return false; + } + }); + } + if (region.getCoprocessorHost() != null) { + loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths, loaded); + } + return loaded; + } + + private List<BulkLoadObserver> getBulkLoadObservers(Region region) { + List<BulkLoadObserver> coprocessorList = + region.getCoprocessorHost().findCoprocessors(BulkLoadObserver.class); + + return coprocessorList; + } + + private Path createStagingDir(Path baseDir, + User user, + TableName tableName) throws IOException { + String 
tblName = tableName.getNameAsString().replace(":", "_"); + String randomDir = user.getShortName()+"__"+ tblName +"__"+ + (new BigInteger(RANDOM_WIDTH, random).toString(RANDOM_RADIX)); + return createStagingDir(baseDir, user, randomDir); + } + + private Path createStagingDir(Path baseDir, + User user, + String randomDir) throws IOException { + Path p = new Path(baseDir, randomDir); + fs.mkdirs(p, PERM_ALL_ACCESS); + fs.setPermission(p, PERM_ALL_ACCESS); + return p; + } + + private User getActiveUser() throws IOException { + User user = RpcServer.getRequestUser(); + if (user == null) { + // for non-rpc handling, fallback to system user + user = userProvider.getCurrent(); + } + + //this is for testing + if (userProvider.isHadoopSecurityEnabled() + && "simple".equalsIgnoreCase(conf.get(User.HBASE_SECURITY_CONF_KEY))) { + return User.createUserForTesting(conf, user.getShortName(), new String[]{}); + } + + return user; + } + + private static class SecureBulkLoadListener implements BulkLoadListener { + // Target filesystem + private final FileSystem fs; + private final String stagingDir; + private final Configuration conf; + // Source filesystem + private FileSystem srcFs = null; + private Map<String, FsPermission> origPermissions = null; + + public SecureBulkLoadListener(FileSystem fs, String stagingDir, Configuration conf) { + this.fs = fs; + this.stagingDir = stagingDir; + this.conf = conf; + this.origPermissions = new HashMap<String, FsPermission>(); + } + + @Override + public String prepareBulkLoad(final byte[] family, final String srcPath) throws IOException { + Path p = new Path(srcPath); + Path stageP = new Path(stagingDir, new Path(Bytes.toString(family), p.getName())); + + // In case of Replication for bulk load files, hfiles are already copied in staging directory + if (p.equals(stageP)) { + LOG.debug(p.getName() + + " is already available in staging directory. 
Skipping copy or rename."); + return stageP.toString(); + } + + if (srcFs == null) { + srcFs = FileSystem.get(p.toUri(), conf); + } + + if(!isFile(p)) { + throw new IOException("Path does not reference a file: " + p); + } + + // Check to see if the source and target filesystems are the same + if (!FSHDFSUtils.isSameHdfs(conf, srcFs, fs)) { + LOG.debug("Bulk-load file " + srcPath + " is on different filesystem than " + + "the destination filesystem. Copying file over to destination staging dir."); + FileUtil.copy(srcFs, p, fs, stageP, false, conf); + } else { + LOG.debug("Moving " + p + " to " + stageP); + FileStatus origFileStatus = fs.getFileStatus(p); + origPermissions.put(srcPath, origFileStatus.getPermission()); + if(!fs.rename(p, stageP)) { + throw new IOException("Failed to move HFile: " + p + " to " + stageP); + } + } + fs.setPermission(stageP, PERM_ALL_ACCESS); + return stageP.toString(); + } + + @Override + public void doneBulkLoad(byte[] family, String srcPath) throws IOException { + LOG.debug("Bulk Load done for: " + srcPath); + } + + @Override + public void failedBulkLoad(final byte[] family, final String srcPath) throws IOException { + if (!FSHDFSUtils.isSameHdfs(conf, srcFs, fs)) { + // files are copied so no need to move them back + return; + } + Path p = new Path(srcPath); + Path stageP = new Path(stagingDir, + new Path(Bytes.toString(family), p.getName())); + + // In case of Replication for bulk load files, hfiles are not renamed by end point during + // prepare stage, so no need of rename here again + if (p.equals(stageP)) { + LOG.debug(p.getName() + " is already available in source directory. 
Skipping rename."); + return; + } + + LOG.debug("Moving " + stageP + " back to " + p); + if(!fs.rename(stageP, p)) + throw new IOException("Failed to move HFile: " + stageP + " to " + p); + + // restore original permission + if (origPermissions.containsKey(srcPath)) { + fs.setPermission(p, origPermissions.get(srcPath)); + } else { + LOG.warn("Can't find previous permission for path=" + srcPath); + } + } + + /** + * Check if the path is referencing a file. + * This is mainly needed to avoid symlinks. + * @param p + * @return true if the p is a file + * @throws IOException + */ + private boolean isFile(Path p) throws IOException { + FileStatus status = srcFs.getFileStatus(p); + boolean isFile = !status.isDirectory(); + try { + isFile = isFile && !(Boolean)Methods.call(FileStatus.class, status, "isSymlink", null, null); + } catch (Exception e) { + } + return isFile; + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/70f330dc/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java index f21d8e2..7d5fc32 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java @@ -95,10 +95,10 @@ import org.apache.hadoop.hbase.protobuf.ResponseConverter; import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos; import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadRequest; +import 
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadRequest; import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription; import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Quotas; -import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.CleanupBulkLoadRequest; -import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.PrepareBulkLoadRequest; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; import org.apache.hadoop.hbase.regionserver.Region; @@ -2145,7 +2145,7 @@ public class AccessController extends BaseMasterAndRegionObserver */ @Override public void prePrepareBulkLoad(ObserverContext<RegionCoprocessorEnvironment> ctx, - PrepareBulkLoadRequest request) throws IOException { + PrepareBulkLoadRequest request) throws IOException { requireAccess("prePareBulkLoad", ctx.getEnvironment().getRegion().getTableDesc().getTableName(), Action.CREATE); } @@ -2159,7 +2159,7 @@ public class AccessController extends BaseMasterAndRegionObserver */ @Override public void preCleanupBulkLoad(ObserverContext<RegionCoprocessorEnvironment> ctx, - CleanupBulkLoadRequest request) throws IOException { + CleanupBulkLoadRequest request) throws IOException { requireAccess("preCleanupBulkLoad", ctx.getEnvironment().getRegion().getTableDesc().getTableName(), Action.CREATE); } http://git-wip-us.apache.org/repos/asf/hbase/blob/70f330dc/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java index c1f9251..cb143b7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java +++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java @@ -18,140 +18,52 @@ package org.apache.hadoop.hbase.security.access; -import com.google.protobuf.RpcCallback; -import com.google.protobuf.RpcController; -import com.google.protobuf.Service; +import java.io.IOException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.FileUtil; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hbase.Coprocessor; import org.apache.hadoop.hbase.CoprocessorEnvironment; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.DoNotRetryIOException; -import org.apache.hadoop.hbase.coprocessor.BulkLoadObserver; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.coprocessor.CoprocessorService; -import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; -import org.apache.hadoop.hbase.ipc.RpcServer; -import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.RequestConverter; import org.apache.hadoop.hbase.protobuf.ResponseConverter; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; -import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.SecureBulkLoadService; -import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.PrepareBulkLoadRequest; -import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.PrepareBulkLoadResponse; -import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.CleanupBulkLoadRequest; -import 
org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.CleanupBulkLoadResponse; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadRequest; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadResponse; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadRequest; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadResponse; +import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier; +import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.SecureBulkLoadHFilesRequest; import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.SecureBulkLoadHFilesResponse; -import org.apache.hadoop.hbase.regionserver.Region; -import org.apache.hadoop.hbase.regionserver.Region.BulkLoadListener; -import org.apache.hadoop.hbase.security.SecureBulkLoadUtil; -import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.hbase.security.UserProvider; -import org.apache.hadoop.hbase.security.token.FsDelegationToken; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.FSHDFSUtils; -import org.apache.hadoop.hbase.util.Methods; -import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.Token; - -import java.io.IOException; -import java.math.BigInteger; -import java.security.PrivilegedAction; -import java.security.SecureRandom; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.SecureBulkLoadService; +import org.apache.hadoop.hbase.regionserver.SecureBulkLoadManager; +import 
com.google.protobuf.RpcCallback; +import com.google.protobuf.RpcController; +import com.google.protobuf.Service; /** * Coprocessor service for bulk loads in secure mode. - * This coprocessor has to be installed as part of enabling - * security in HBase. - * - * This service addresses two issues: - * <ol> - * <li>Moving files in a secure filesystem wherein the HBase Client - * and HBase Server are different filesystem users.</li> - * <li>Does moving in a secure manner. Assuming that the filesystem - * is POSIX compliant.</li> - * </ol> - * - * The algorithm is as follows: - * <ol> - * <li>Create an hbase owned staging directory which is - * world traversable (711): {@code /hbase/staging}</li> - * <li>A user writes out data to his secure output directory: {@code /user/foo/data}</li> - * <li>A call is made to hbase to create a secret staging directory - * which globally rwx (777): {@code /user/staging/averylongandrandomdirectoryname}</li> - * <li>The user moves the data into the random staging directory, - * then calls bulkLoadHFiles()</li> - * </ol> - * Like delegation tokens the strength of the security lies in the length - * and randomness of the secret directory. 
- * + * @deprecated As of release 2.0.0, this will be removed in HBase 3.0.0 */ @InterfaceAudience.Private +@Deprecated public class SecureBulkLoadEndpoint extends SecureBulkLoadService implements CoprocessorService, Coprocessor { public static final long VERSION = 0L; - //320/5 = 64 characters - private static final int RANDOM_WIDTH = 320; - private static final int RANDOM_RADIX = 32; - private static final Log LOG = LogFactory.getLog(SecureBulkLoadEndpoint.class); - private final static FsPermission PERM_ALL_ACCESS = FsPermission.valueOf("-rwxrwxrwx"); - private final static FsPermission PERM_HIDDEN = FsPermission.valueOf("-rwx--x--x"); - - private SecureRandom random; - private FileSystem fs; - private Configuration conf; - - //two levels so it doesn't get deleted accidentally - //no sticky bit in Hadoop 1.0 - private Path baseStagingDir; - private RegionCoprocessorEnvironment env; - private UserProvider userProvider; - @Override public void start(CoprocessorEnvironment env) { this.env = (RegionCoprocessorEnvironment)env; - random = new SecureRandom(); - conf = env.getConfiguration(); - baseStagingDir = SecureBulkLoadUtil.getBaseStagingDir(conf); - this.userProvider = UserProvider.instantiate(conf); - - try { - fs = FileSystem.get(conf); - fs.mkdirs(baseStagingDir, PERM_HIDDEN); - fs.setPermission(baseStagingDir, PERM_HIDDEN); - //no sticky bit in hadoop-1.0, making directory nonempty so it never gets erased - fs.mkdirs(new Path(baseStagingDir,"DONOTERASE"), PERM_HIDDEN); - FileStatus status = fs.getFileStatus(baseStagingDir); - if(status == null) { - throw new IllegalStateException("Failed to create staging directory"); - } - if(!status.getPermission().equals(PERM_HIDDEN)) { - throw new IllegalStateException( - "Directory already exists but permissions aren't set to '-rwx--x--x' "); - } - } catch (IOException e) { - throw new IllegalStateException("Failed to get FileSystem instance",e); - } + LOG.warn("SecureBulkLoadEndpoint is deprecated. 
It will be removed in future releases."); + LOG.warn("Secure bulk load has been integrated into HBase core."); } @Override @@ -159,24 +71,12 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService } @Override - public void prepareBulkLoad(RpcController controller, - PrepareBulkLoadRequest request, - RpcCallback<PrepareBulkLoadResponse> done){ + public void prepareBulkLoad(RpcController controller, PrepareBulkLoadRequest request, + RpcCallback<PrepareBulkLoadResponse> done) { try { - List<BulkLoadObserver> bulkLoadObservers = getBulkLoadObservers(); - - if(bulkLoadObservers != null) { - ObserverContext<RegionCoprocessorEnvironment> ctx = - new ObserverContext<RegionCoprocessorEnvironment>(); - ctx.prepare(env); - - for(BulkLoadObserver bulkLoadObserver : bulkLoadObservers) { - bulkLoadObserver.prePrepareBulkLoad(ctx, request); - } - } - - String bulkToken = createStagingDir(baseStagingDir, - getActiveUser(), ProtobufUtil.toTableName(request.getTableName())).toString(); + SecureBulkLoadManager secureBulkLoadManager = + this.env.getRegionServerServices().getSecureBulkLoadManager(); + String bulkToken = secureBulkLoadManager.prepareBulkLoad(this.env.getRegion(), request); done.run(PrepareBulkLoadResponse.newBuilder().setBulkToken(bulkToken).build()); } catch (IOException e) { ResponseConverter.setControllerException(controller, e); @@ -185,23 +85,12 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService } @Override - public void cleanupBulkLoad(RpcController controller, - CleanupBulkLoadRequest request, - RpcCallback<CleanupBulkLoadResponse> done) { + public void cleanupBulkLoad(RpcController controller, CleanupBulkLoadRequest request, + RpcCallback<CleanupBulkLoadResponse> done) { try { - List<BulkLoadObserver> bulkLoadObservers = getBulkLoadObservers(); - - if(bulkLoadObservers != null) { - ObserverContext<RegionCoprocessorEnvironment> ctx = - new ObserverContext<RegionCoprocessorEnvironment>(); - ctx.prepare(env); - - for(BulkLoadObserver 
bulkLoadObserver : bulkLoadObservers) { - bulkLoadObserver.preCleanupBulkLoad(ctx, request); - } - } - - fs.delete(new Path(request.getBulkToken()), true); + SecureBulkLoadManager secureBulkLoadManager = + this.env.getRegionServerServices().getSecureBulkLoadManager(); + secureBulkLoadManager.cleanupBulkLoad(this.env.getRegion(), request); done.run(CleanupBulkLoadResponse.newBuilder().build()); } catch (IOException e) { ResponseConverter.setControllerException(controller, e); @@ -210,262 +99,35 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService } @Override - public void secureBulkLoadHFiles(RpcController controller, - SecureBulkLoadHFilesRequest request, - RpcCallback<SecureBulkLoadHFilesResponse> done) { - final List<Pair<byte[], String>> familyPaths = new ArrayList<Pair<byte[], String>>(); - for(ClientProtos.BulkLoadHFileRequest.FamilyPath el : request.getFamilyPathList()) { - familyPaths.add(new Pair(el.getFamily().toByteArray(),el.getPath())); - } - - Token userToken = null; - if (userProvider.isHadoopSecurityEnabled()) { - userToken = new Token(request.getFsToken().getIdentifier().toByteArray(), request.getFsToken() - .getPassword().toByteArray(), new Text(request.getFsToken().getKind()), new Text( - request.getFsToken().getService())); - } - final String bulkToken = request.getBulkToken(); - User user = getActiveUser(); - final UserGroupInformation ugi = user.getUGI(); - if(userToken != null) { - ugi.addToken(userToken); - } else if (userProvider.isHadoopSecurityEnabled()) { - //we allow this to pass through in "simple" security mode - //for mini cluster testing - ResponseConverter.setControllerException(controller, - new DoNotRetryIOException("User token cannot be null")); - done.run(SecureBulkLoadHFilesResponse.newBuilder().setLoaded(false).build()); - return; - } - - Region region = env.getRegion(); - boolean bypass = false; - if (region.getCoprocessorHost() != null) { - try { - bypass = 
region.getCoprocessorHost().preBulkLoadHFile(familyPaths); - } catch (IOException e) { - ResponseConverter.setControllerException(controller, e); - done.run(SecureBulkLoadHFilesResponse.newBuilder().setLoaded(false).build()); - return; - } - } + public void secureBulkLoadHFiles(RpcController controller, SecureBulkLoadHFilesRequest request, + RpcCallback<SecureBulkLoadHFilesResponse> done) { boolean loaded = false; - if (!bypass) { - // Get the target fs (HBase region server fs) delegation token - // Since we have checked the permission via 'preBulkLoadHFile', now let's give - // the 'request user' necessary token to operate on the target fs. - // After this point the 'doAs' user will hold two tokens, one for the source fs - // ('request user'), another for the target fs (HBase region server principal). - if (userProvider.isHadoopSecurityEnabled()) { - FsDelegationToken targetfsDelegationToken = new FsDelegationToken(userProvider, "renewer"); - try { - targetfsDelegationToken.acquireDelegationToken(fs); - } catch (IOException e) { - ResponseConverter.setControllerException(controller, e); - done.run(SecureBulkLoadHFilesResponse.newBuilder().setLoaded(false).build()); - return; - } - Token<?> targetFsToken = targetfsDelegationToken.getUserToken(); - if (targetFsToken != null - && (userToken == null || !targetFsToken.getService().equals(userToken.getService()))) { - ugi.addToken(targetFsToken); - } - } - - loaded = ugi.doAs(new PrivilegedAction<Boolean>() { - @Override - public Boolean run() { - FileSystem fs = null; - try { - Configuration conf = env.getConfiguration(); - fs = FileSystem.get(conf); - for(Pair<byte[], String> el: familyPaths) { - Path stageFamily = new Path(bulkToken, Bytes.toString(el.getFirst())); - if(!fs.exists(stageFamily)) { - fs.mkdirs(stageFamily); - fs.setPermission(stageFamily, PERM_ALL_ACCESS); - } - } - //We call bulkLoadHFiles as requesting user - //To enable access prior to staging - return env.getRegion().bulkLoadHFiles(familyPaths, 
true, - new SecureBulkLoadListener(fs, bulkToken, conf)); - } catch (Exception e) { - LOG.error("Failed to complete bulk load", e); - } finally { - if (fs != null) { - try { - if (!UserGroupInformation.getLoginUser().equals(ugi)) { - FileSystem.closeAllForUGI(ugi); - } - } catch (IOException e) { - LOG.error("Failed to close FileSystem for " + ugi.getUserName(), e); - } - } - } - return false; - } - }); - } - if (region.getCoprocessorHost() != null) { - try { - loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths, loaded); - } catch (IOException e) { - ResponseConverter.setControllerException(controller, e); - done.run(SecureBulkLoadHFilesResponse.newBuilder().setLoaded(false).build()); - return; - } + try { + SecureBulkLoadManager secureBulkLoadManager = + this.env.getRegionServerServices().getSecureBulkLoadManager(); + BulkLoadHFileRequest bulkLoadHFileRequest = ConvertSecureBulkLoadHFilesRequest(request); + loaded = secureBulkLoadManager.secureBulkLoadHFiles(this.env.getRegion(), bulkLoadHFileRequest); + } catch (IOException e) { + ResponseConverter.setControllerException(controller, e); } done.run(SecureBulkLoadHFilesResponse.newBuilder().setLoaded(loaded).build()); } - private List<BulkLoadObserver> getBulkLoadObservers() { - List<BulkLoadObserver> coprocessorList = - this.env.getRegion().getCoprocessorHost().findCoprocessors(BulkLoadObserver.class); - - return coprocessorList; - } - - private Path createStagingDir(Path baseDir, - User user, - TableName tableName) throws IOException { - String tblName = tableName.getNameAsString().replace(":", "_"); - String randomDir = user.getShortName()+"__"+ tblName +"__"+ - (new BigInteger(RANDOM_WIDTH, random).toString(RANDOM_RADIX)); - return createStagingDir(baseDir, user, randomDir); - } - - private Path createStagingDir(Path baseDir, - User user, - String randomDir) throws IOException { - Path p = new Path(baseDir, randomDir); - fs.mkdirs(p, PERM_ALL_ACCESS); - fs.setPermission(p, PERM_ALL_ACCESS); - 
return p; - } - - private User getActiveUser() { - User user = RpcServer.getRequestUser(); - if (user == null) { - return null; - } - - //this is for testing - if (userProvider.isHadoopSecurityEnabled() - && "simple".equalsIgnoreCase(conf.get(User.HBASE_SECURITY_CONF_KEY))) { - return User.createUserForTesting(conf, user.getShortName(), new String[]{}); - } - - return user; + private BulkLoadHFileRequest ConvertSecureBulkLoadHFilesRequest( + SecureBulkLoadHFilesRequest request) { + BulkLoadHFileRequest.Builder bulkLoadHFileRequest = BulkLoadHFileRequest.newBuilder(); + RegionSpecifier region = + RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, this.env + .getRegionInfo().getRegionName()); + + bulkLoadHFileRequest.setRegion(region).setFsToken(request.getFsToken()) + .setBulkToken(request.getBulkToken()).setAssignSeqNum(request.getAssignSeqNum()) + .addAllFamilyPath(request.getFamilyPathList()); + return bulkLoadHFileRequest.build(); } @Override public Service getService() { return this; } - - private static class SecureBulkLoadListener implements BulkLoadListener { - // Target filesystem - private FileSystem fs; - private String stagingDir; - private Configuration conf; - // Source filesystem - private FileSystem srcFs = null; - private Map<String, FsPermission> origPermissions = null; - - public SecureBulkLoadListener(FileSystem fs, String stagingDir, Configuration conf) { - this.fs = fs; - this.stagingDir = stagingDir; - this.conf = conf; - this.origPermissions = new HashMap<String, FsPermission>(); - } - - @Override - public String prepareBulkLoad(final byte[] family, final String srcPath) throws IOException { - Path p = new Path(srcPath); - Path stageP = new Path(stagingDir, new Path(Bytes.toString(family), p.getName())); - - // In case of Replication for bulk load files, hfiles are already copied in staging directory - if (p.equals(stageP)) { - LOG.debug(p.getName() - + " is already available in staging directory. 
Skipping copy or rename."); - return stageP.toString(); - } - - if (srcFs == null) { - srcFs = FileSystem.get(p.toUri(), conf); - } - - if(!isFile(p)) { - throw new IOException("Path does not reference a file: " + p); - } - - // Check to see if the source and target filesystems are the same - if (!FSHDFSUtils.isSameHdfs(conf, srcFs, fs)) { - LOG.debug("Bulk-load file " + srcPath + " is on different filesystem than " + - "the destination filesystem. Copying file over to destination staging dir."); - FileUtil.copy(srcFs, p, fs, stageP, false, conf); - } else { - LOG.debug("Moving " + p + " to " + stageP); - FileStatus origFileStatus = fs.getFileStatus(p); - origPermissions.put(srcPath, origFileStatus.getPermission()); - if(!fs.rename(p, stageP)) { - throw new IOException("Failed to move HFile: " + p + " to " + stageP); - } - } - fs.setPermission(stageP, PERM_ALL_ACCESS); - return stageP.toString(); - } - - @Override - public void doneBulkLoad(byte[] family, String srcPath) throws IOException { - LOG.debug("Bulk Load done for: " + srcPath); - } - - @Override - public void failedBulkLoad(final byte[] family, final String srcPath) throws IOException { - if (!FSHDFSUtils.isSameHdfs(conf, srcFs, fs)) { - // files are copied so no need to move them back - return; - } - Path p = new Path(srcPath); - Path stageP = new Path(stagingDir, - new Path(Bytes.toString(family), p.getName())); - - // In case of Replication for bulk load files, hfiles are not renamed by end point during - // prepare stage, so no need of rename here again - if (p.equals(stageP)) { - LOG.debug(p.getName() + " is already available in source directory. 
Skipping rename."); - return; - } - - LOG.debug("Moving " + stageP + " back to " + p); - if(!fs.rename(stageP, p)) - throw new IOException("Failed to move HFile: " + stageP + " to " + p); - - // restore original permission - if (origPermissions.containsKey(srcPath)) { - fs.setPermission(p, origPermissions.get(srcPath)); - } else { - LOG.warn("Can't find previous permission for path=" + srcPath); - } - } - - /** - * Check if the path is referencing a file. - * This is mainly needed to avoid symlinks. - * @param p - * @return true if the p is a file - * @throws IOException - */ - private boolean isFile(Path p) throws IOException { - FileStatus status = srcFs.getFileStatus(p); - boolean isFile = !status.isDirectory(); - try { - isFile = isFile && !(Boolean)Methods.call(FileStatus.class, status, "isSymlink", null, null); - } catch (Exception e) { - } - return isFile; - } - } } http://git-wip-us.apache.org/repos/asf/hbase/blob/70f330dc/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index bfa14cb..a6dc59f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -1080,6 +1080,10 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { // Now do the mini hbase cluster. Set the hbase.rootdir in config. createRootDir(create); + // Set the hbase.fs.tmp.dir config to make sure that we have some default value. This is + // for tests that do not read hbase-defaults.xml + setHBaseFsTmpDir(); + // These settings will make the server waits until this exact number of // regions servers are connected. 
if (conf.getInt(ServerManager.WAIT_ON_REGIONSERVERS_MINTOSTART, -1) == -1) { @@ -1104,10 +1108,6 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { getAdmin(); // create immediately the hbaseAdmin LOG.info("Minicluster is up"); - // Set the hbase.fs.tmp.dir config to make sure that we have some default value. This is - // for tests that do not read hbase-defaults.xml - setHBaseFsTmpDir(); - return (MiniHBaseCluster)this.hbaseCluster; } @@ -1278,6 +1278,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { } else { LOG.info("The hbase.fs.tmp.dir is set to " + hbaseFsTmpDirInString); } + this.conf.set("hbase.bulkload.staging.dir", this.conf.get("hbase.fs.tmp.dir")); } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/70f330dc/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java index 6cd1963..6f225d6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java @@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.regionserver.MetricsRegionServer; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionServerAccounting; import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.hbase.regionserver.SecureBulkLoadManager; import org.apache.hadoop.hbase.regionserver.ServerNonceManager; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.util.Bytes; @@ -316,7 +317,6 @@ public class MockRegionServerServices implements RegionServerServices { @Override public ClusterConnection getClusterConnection() { - // TODO 
Auto-generated method stub return null; } @@ -334,4 +334,9 @@ public class MockRegionServerServices implements RegionServerServices { public MetricsRegionServer getMetrics() { return null; } + + @Override + public SecureBulkLoadManager getSecureBulkLoadManager() { + return null; + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/70f330dc/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java index 48d7efc..354f0a8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java @@ -35,8 +35,6 @@ import org.apache.hadoop.hbase.client.replication.ReplicationAdmin; import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest; -import org.apache.hadoop.hbase.protobuf.RequestConverter; import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore; import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; @@ -336,16 +334,21 @@ public class TestReplicaWithCluster { LOG.debug("Loading test data"); @SuppressWarnings("deprecation") final ClusterConnection conn = (ClusterConnection) HTU.getAdmin().getConnection(); + table = conn.getTable(hdt.getTableName()); + final String bulkToken = new SecureBulkLoadClient(table).prepareBulkLoad(conn); RegionServerCallable<Void> callable = new RegionServerCallable<Void>( conn, hdt.getTableName(), 
TestHRegionServerBulkLoad.rowkey(0)) { @Override public Void call(int timeout) throws Exception { LOG.debug("Going to connect to server " + getLocation() + " for row " + Bytes.toStringBinary(getRow())); + SecureBulkLoadClient secureClient = null; byte[] regionName = getLocation().getRegionInfo().getRegionName(); - BulkLoadHFileRequest request = - RequestConverter.buildBulkLoadHFileRequest(famPaths, regionName, true); - getStub().bulkLoadHFile(null, request); + try (Table table = conn.getTable(getTableName())) { + secureClient = new SecureBulkLoadClient(table); + secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName, + true, null, bulkToken); + } return null; } }; http://git-wip-us.apache.org/repos/asf/hbase/blob/70f330dc/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesUseSecurityEndPoint.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesUseSecurityEndPoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesUseSecurityEndPoint.java deleted file mode 100644 index 11627a1..0000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesUseSecurityEndPoint.java +++ /dev/null @@ -1,45 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.mapreduce; - -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags; -import org.apache.hadoop.hbase.testclassification.LargeTests; -import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; -import org.junit.BeforeClass; -import org.junit.experimental.categories.Category; - -@Category(LargeTests.class) -public class TestLoadIncrementalHFilesUseSecurityEndPoint extends TestLoadIncrementalHFiles { - - @BeforeClass - public static void setUpBeforeClass() throws Exception { - util.getConfiguration().setInt(LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY, - MAX_FILES_PER_REGION_PER_FAMILY); - util.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, - "org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint"); - // change default behavior so that tag values are returned with normal rpcs - util.getConfiguration().set(HConstants.RPC_CODEC_CONF_KEY, - KeyValueCodecWithTags.class.getCanonicalName()); - - util.startMiniCluster(); - setupNamespace(); - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/70f330dc/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java index 69f2e35..2927023 100644 --- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java @@ -83,6 +83,8 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodes import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileResponse; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadRequest; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadResponse; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest; @@ -90,6 +92,8 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadRequest; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadResponse; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse; import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode; @@ -103,6 +107,7 @@ import org.apache.hadoop.hbase.regionserver.MetricsRegionServer; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionServerAccounting; import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import 
org.apache.hadoop.hbase.regionserver.SecureBulkLoadManager; import org.apache.hadoop.hbase.regionserver.ServerNonceManager; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.util.Bytes; @@ -661,7 +666,6 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices { @Override public ClusterConnection getClusterConnection() { - // TODO Auto-generated method stub return null; } @@ -679,4 +683,21 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices { public MetricsRegionServer getMetrics() { return null; } -} \ No newline at end of file + + @Override + public PrepareBulkLoadResponse prepareBulkLoad(RpcController controller, + PrepareBulkLoadRequest request) throws ServiceException { + return null; + } + + @Override + public CleanupBulkLoadResponse cleanupBulkLoad(RpcController controller, + CleanupBulkLoadRequest request) throws ServiceException { + return null; + } + + @Override + public SecureBulkLoadManager getSecureBulkLoadManager() { + return null; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/70f330dc/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadEndpointClient.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadEndpointClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadEndpointClient.java new file mode 100644 index 0000000..9ecc5d6 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadEndpointClient.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.ipc.BlockingRpcCallback; +import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; +import org.apache.hadoop.hbase.ipc.ServerRpcController; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadRequest; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CleanupBulkLoadResponse; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.DelegationToken; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadRequest; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.PrepareBulkLoadResponse; +import org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos; +import org.apache.hadoop.hbase.security.SecureBulkLoadUtil; +import org.apache.hadoop.hbase.util.ByteStringer; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.security.token.Token; + +/** + * Client proxy for 
SecureBulkLoadProtocol used in conjunction with SecureBulkLoadEndpoint + * @deprecated Use for backward compatibility testing only. Will be removed when + * SecureBulkLoadEndpoint is not supported. + */ +@InterfaceAudience.Private +public class SecureBulkLoadEndpointClient { + private Table table; + + public SecureBulkLoadEndpointClient(Table table) { + this.table = table; + } + + public String prepareBulkLoad(final TableName tableName) throws IOException { + try { + CoprocessorRpcChannel channel = table.coprocessorService(HConstants.EMPTY_START_ROW); + SecureBulkLoadProtos.SecureBulkLoadService instance = + ProtobufUtil.newServiceStub(SecureBulkLoadProtos.SecureBulkLoadService.class, channel); + + ServerRpcController controller = new ServerRpcController(); + + BlockingRpcCallback<PrepareBulkLoadResponse> rpcCallback = + new BlockingRpcCallback<PrepareBulkLoadResponse>(); + + PrepareBulkLoadRequest request = + PrepareBulkLoadRequest.newBuilder() + .setTableName(ProtobufUtil.toProtoTableName(tableName)).build(); + + instance.prepareBulkLoad(controller, + request, + rpcCallback); + + PrepareBulkLoadResponse response = rpcCallback.get(); + if (controller.failedOnException()) { + throw controller.getFailedOn(); + } + + return response.getBulkToken(); + } catch (Throwable throwable) { + throw new IOException(throwable); + } + } + + public void cleanupBulkLoad(final String bulkToken) throws IOException { + try { + CoprocessorRpcChannel channel = table.coprocessorService(HConstants.EMPTY_START_ROW); + SecureBulkLoadProtos.SecureBulkLoadService instance = + ProtobufUtil.newServiceStub(SecureBulkLoadProtos.SecureBulkLoadService.class, channel); + + ServerRpcController controller = new ServerRpcController(); + + BlockingRpcCallback<CleanupBulkLoadResponse> rpcCallback = + new BlockingRpcCallback<CleanupBulkLoadResponse>(); + + CleanupBulkLoadRequest request = + CleanupBulkLoadRequest.newBuilder() + .setBulkToken(bulkToken).build(); + + instance.cleanupBulkLoad(controller, + 
request, + rpcCallback); + + if (controller.failedOnException()) { + throw controller.getFailedOn(); + } + } catch (Throwable throwable) { + throw new IOException(throwable); + } + } + + public boolean bulkLoadHFiles(final List<Pair<byte[], String>> familyPaths, + final Token<?> userToken, + final String bulkToken, + final byte[] startRow) throws IOException { + // we never want to send a batch of HFiles to all regions, thus cannot call + // HTable#coprocessorService methods that take start and end rowkeys; see HBASE-9639 + try { + CoprocessorRpcChannel channel = table.coprocessorService(startRow); + SecureBulkLoadProtos.SecureBulkLoadService instance = + ProtobufUtil.newServiceStub(SecureBulkLoadProtos.SecureBulkLoadService.class, channel); + + DelegationToken protoDT = + DelegationToken.newBuilder().build(); + if(userToken != null) { + protoDT = + DelegationToken.newBuilder() + .setIdentifier(ByteStringer.wrap(userToken.getIdentifier())) + .setPassword(ByteStringer.wrap(userToken.getPassword())) + .setKind(userToken.getKind().toString()) + .setService(userToken.getService().toString()).build(); + } + + List<ClientProtos.BulkLoadHFileRequest.FamilyPath> protoFamilyPaths = + new ArrayList<ClientProtos.BulkLoadHFileRequest.FamilyPath>(); + for(Pair<byte[], String> el: familyPaths) { + protoFamilyPaths.add(ClientProtos.BulkLoadHFileRequest.FamilyPath.newBuilder() + .setFamily(ByteStringer.wrap(el.getFirst())) + .setPath(el.getSecond()).build()); + } + + SecureBulkLoadProtos.SecureBulkLoadHFilesRequest request = + SecureBulkLoadProtos.SecureBulkLoadHFilesRequest.newBuilder() + .setFsToken(protoDT) + .addAllFamilyPath(protoFamilyPaths) + .setBulkToken(bulkToken).build(); + + ServerRpcController controller = new ServerRpcController(); + BlockingRpcCallback<SecureBulkLoadProtos.SecureBulkLoadHFilesResponse> rpcCallback = + new BlockingRpcCallback<SecureBulkLoadProtos.SecureBulkLoadHFilesResponse>(); + instance.secureBulkLoadHFiles(controller, + request, + rpcCallback); + 
+ SecureBulkLoadProtos.SecureBulkLoadHFilesResponse response = rpcCallback.get(); + if (controller.failedOnException()) { + throw controller.getFailedOn(); + } + return response.getLoaded(); + } catch (Throwable throwable) { + throw new IOException(throwable); + } + } + + public Path getStagingPath(String bulkToken, byte[] family) throws IOException { + return SecureBulkLoadUtil.getStagingPath(table.getConfiguration(), bulkToken, family); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/70f330dc/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java index bd5c91e..6e68201 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java @@ -26,7 +26,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; -import java.util.NavigableMap; import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; @@ -52,6 +51,7 @@ import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.RpcRetryingCaller; import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.SecureBulkLoadClient; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; import org.apache.hadoop.hbase.coprocessor.ObserverContext; @@ -65,7 +65,6 @@ import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; import org.apache.hadoop.hbase.protobuf.RequestConverter; import 
org.apache.hadoop.hbase.protobuf.generated.AdminProtos; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest; import org.apache.hadoop.hbase.regionserver.wal.TestWALActionsListener; import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.testclassification.LargeTests; @@ -91,15 +90,15 @@ import com.google.common.collect.Lists; @Category({RegionServerTests.class, LargeTests.class}) public class TestHRegionServerBulkLoad { private static final Log LOG = LogFactory.getLog(TestHRegionServerBulkLoad.class); - private static HBaseTestingUtility UTIL = new HBaseTestingUtility(); - private final static Configuration conf = UTIL.getConfiguration(); - private final static byte[] QUAL = Bytes.toBytes("qual"); - private final static int NUM_CFS = 10; + protected static HBaseTestingUtility UTIL = new HBaseTestingUtility(); + protected final static Configuration conf = UTIL.getConfiguration(); + protected final static byte[] QUAL = Bytes.toBytes("qual"); + protected final static int NUM_CFS = 10; private int sleepDuration; public static int BLOCKSIZE = 64 * 1024; public static Algorithm COMPRESSION = Compression.Algorithm.NONE; - private final static byte[][] families = new byte[NUM_CFS][]; + protected final static byte[][] families = new byte[NUM_CFS][]; static { for (int i = 0; i < NUM_CFS; i++) { families[i] = Bytes.toBytes(family(i)); @@ -200,16 +199,21 @@ public class TestHRegionServerBulkLoad { // bulk load HFiles final ClusterConnection conn = (ClusterConnection) UTIL.getAdmin().getConnection(); + Table table = conn.getTable(tableName); + final String bulkToken = new SecureBulkLoadClient(table).prepareBulkLoad(conn); RegionServerCallable<Void> callable = new RegionServerCallable<Void>(conn, tableName, Bytes.toBytes("aaa")) { @Override public Void call(int callTimeout) throws Exception { LOG.debug("Going to connect to server " 
+ getLocation() + " for row " + Bytes.toStringBinary(getRow())); + SecureBulkLoadClient secureClient = null; byte[] regionName = getLocation().getRegionInfo().getRegionName(); - BulkLoadHFileRequest request = - RequestConverter.buildBulkLoadHFileRequest(famPaths, regionName, true); - getStub().bulkLoadHFile(null, request); + try (Table table = conn.getTable(getTableName())) { + secureClient = new SecureBulkLoadClient(table); + secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName, + true, null, bulkToken); + } return null; } }; @@ -320,7 +324,7 @@ public class TestHRegionServerBulkLoad { * Creates a table with given table name and specified number of column * families if the table does not already exist. */ - private void setupTable(TableName table, int cfs) throws IOException { + public void setupTable(TableName table, int cfs) throws IOException { try { LOG.info("Creating table " + table); HTableDescriptor htd = new HTableDescriptor(table);