Repository: hbase Updated Branches: refs/heads/master e450d94a2 -> 6ea499456
HBASE-15407 Add SASL support for fan out OutputStream Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/6ea49945 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/6ea49945 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/6ea49945 Branch: refs/heads/master Commit: 6ea4994569e05ff44e0fa571e053cef828ab57ed Parents: e450d94 Author: zhangduo <zhang...@apache.org> Authored: Sun Mar 27 19:01:05 2016 +0800 Committer: zhangduo <zhang...@apache.org> Committed: Fri Apr 8 21:46:47 2016 +0800 ---------------------------------------------------------------------- .../util/FanOutOneBlockAsyncDFSOutput.java | 38 +- .../FanOutOneBlockAsyncDFSOutputHelper.java | 230 ++-- .../FanOutOneBlockAsyncDFSOutputSaslHelper.java | 1032 ++++++++++++++++++ .../util/TestFanOutOneBlockAsyncDFSOutput.java | 13 +- .../TestSaslFanOutOneBlockAsyncDFSOutput.java | 192 ++++ 5 files changed, 1385 insertions(+), 120 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/6ea49945/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutput.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutput.java index b10f180..bdbf865 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutput.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutput.java @@ -17,11 +17,26 @@ */ package org.apache.hadoop.hbase.util; +import static io.netty.handler.timeout.IdleState.READER_IDLE; +import static io.netty.handler.timeout.IdleState.WRITER_IDLE; import static org.apache.hadoop.hbase.util.FanOutOneBlockAsyncDFSOutputHelper.HEART_BEAT_SEQNO; import static 
org.apache.hadoop.hbase.util.FanOutOneBlockAsyncDFSOutputHelper.completeFile; import static org.apache.hadoop.hbase.util.FanOutOneBlockAsyncDFSOutputHelper.endFileLease; import static org.apache.hadoop.hbase.util.FanOutOneBlockAsyncDFSOutputHelper.getStatus; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.EventLoop; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.handler.codec.protobuf.ProtobufDecoder; +import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; +import io.netty.handler.timeout.IdleStateEvent; +import io.netty.handler.timeout.IdleStateHandler; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; +import io.netty.util.concurrent.Promise; import java.io.Closeable; import java.io.IOException; @@ -36,6 +51,8 @@ import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; +import com.google.common.base.Supplier; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -52,23 +69,6 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.util.DataChecksum; -import com.google.common.base.Supplier; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; -import io.netty.channel.Channel; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.EventLoop; -import io.netty.channel.SimpleChannelInboundHandler; -import io.netty.handler.codec.protobuf.ProtobufDecoder; -import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; -import io.netty.handler.timeout.IdleState; -import 
io.netty.handler.timeout.IdleStateEvent; -import io.netty.handler.timeout.IdleStateHandler; -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.FutureListener; -import io.netty.util.concurrent.Promise; - /** * An asynchronous HDFS output stream implementation which fans out data to datanode and only * supports writing file with only one block. @@ -278,7 +278,7 @@ public class FanOutOneBlockAsyncDFSOutput implements Closeable { public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleStateEvent e = (IdleStateEvent) evt; - if (e.state() == IdleState.READER_IDLE) { + if (e.state() == READER_IDLE) { failed(ctx.channel(), new Supplier<Throwable>() { @Override @@ -286,7 +286,7 @@ public class FanOutOneBlockAsyncDFSOutput implements Closeable { return new IOException("Timeout(" + timeoutMs + "ms) waiting for response"); } }); - } else if (e.state() == IdleState.WRITER_IDLE) { + } else if (e.state() == WRITER_IDLE) { PacketHeader heartbeat = new PacketHeader(4, 0, HEART_BEAT_SEQNO, false, 0, false); int len = heartbeat.getSerializedSize(); ByteBuf buf = alloc.buffer(len); http://git-wip-us.apache.org/repos/asf/hbase/blob/6ea49945/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutputHelper.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutputHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutputHelper.java index 32fe48b..2225191 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutputHelper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutputHelper.java @@ -18,12 +18,36 @@ package org.apache.hadoop.hbase.util; import static io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS; +import static 
io.netty.handler.timeout.IdleState.READER_IDLE; import static org.apache.hadoop.fs.CreateFlag.CREATE; import static org.apache.hadoop.fs.CreateFlag.OVERWRITE; +import static org.apache.hadoop.hbase.util.FanOutOneBlockAsyncDFSOutputSaslHelper.trySaslNegotiate; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT; import static org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage.PIPELINE_SETUP_CREATE; +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.ByteBufOutputStream; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.EventLoop; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.protobuf.ProtobufDecoder; +import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; +import io.netty.handler.timeout.IdleStateEvent; +import io.netty.handler.timeout.IdleStateHandler; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; +import io.netty.util.concurrent.Promise; import java.io.IOException; import java.lang.reflect.InvocationTargetException; @@ -33,6 +57,10 @@ import java.util.EnumSet; import java.util.List; import java.util.concurrent.TimeUnit; +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableMap; +import com.google.protobuf.CodedOutputStream; + import org.apache.commons.logging.Log; import 
org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -68,46 +96,21 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypeProto; import org.apache.hadoop.hdfs.protocolPB.PBHelper; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException; import org.apache.hadoop.io.EnumSetWritable; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.DataChecksum; -import com.google.common.base.Throwables; -import com.google.common.collect.ImmutableMap; -import com.google.protobuf.CodedOutputStream; - -import io.netty.bootstrap.Bootstrap; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; -import io.netty.buffer.ByteBufOutputStream; -import io.netty.buffer.PooledByteBufAllocator; -import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.ChannelPipeline; -import io.netty.channel.EventLoop; -import io.netty.channel.SimpleChannelInboundHandler; -import io.netty.channel.socket.nio.NioSocketChannel; -import io.netty.handler.codec.protobuf.ProtobufDecoder; -import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; -import io.netty.handler.timeout.IdleState; -import io.netty.handler.timeout.IdleStateEvent; -import io.netty.handler.timeout.IdleStateHandler; -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.FutureListener; 
-import io.netty.util.concurrent.Promise; - /** * Helper class for implementing {@link FanOutOneBlockAsyncDFSOutput}. */ @InterfaceAudience.Private -public class FanOutOneBlockAsyncDFSOutputHelper { +public final class FanOutOneBlockAsyncDFSOutputHelper { private static final Log LOG = LogFactory.getLog(FanOutOneBlockAsyncDFSOutputHelper.class); @@ -167,6 +170,7 @@ public class FanOutOneBlockAsyncDFSOutputHelper { // This is used to terminate a recoverFileLease call when FileSystem is already closed. // isClientRunning is not public so we need to use reflection. private interface DFSClientAdaptor { + boolean isClientRunning(DFSClient client); } @@ -174,14 +178,14 @@ public class FanOutOneBlockAsyncDFSOutputHelper { private static DFSClientAdaptor createDFSClientAdaptor() { try { - final Method method = DFSClient.class.getDeclaredMethod("isClientRunning"); - method.setAccessible(true); + final Method isClientRunningMethod = DFSClient.class.getDeclaredMethod("isClientRunning"); + isClientRunningMethod.setAccessible(true); return new DFSClientAdaptor() { @Override public boolean isClientRunning(DFSClient client) { try { - return (Boolean) method.invoke(client); + return (Boolean) isClientRunningMethod.invoke(client); } catch (IllegalAccessException | InvocationTargetException e) { throw new RuntimeException(e); } @@ -194,11 +198,11 @@ public class FanOutOneBlockAsyncDFSOutputHelper { private static LeaseManager createLeaseManager() { try { - final Method beginFileLeaseMethod = DFSClient.class.getDeclaredMethod("beginFileLease", - long.class, DFSOutputStream.class); + final Method beginFileLeaseMethod = + DFSClient.class.getDeclaredMethod("beginFileLease", long.class, DFSOutputStream.class); beginFileLeaseMethod.setAccessible(true); - final Method endFileLeaseMethod = DFSClient.class.getDeclaredMethod("endFileLease", - long.class); + final Method endFileLeaseMethod = + DFSClient.class.getDeclaredMethod("endFileLease", long.class); 
endFileLeaseMethod.setAccessible(true); return new LeaseManager() { @@ -224,11 +228,11 @@ public class FanOutOneBlockAsyncDFSOutputHelper { LOG.warn("No inodeId related lease methods found, should be hadoop 2.4-", e); } try { - final Method beginFileLeaseMethod = DFSClient.class.getDeclaredMethod("beginFileLease", - String.class, DFSOutputStream.class); + final Method beginFileLeaseMethod = + DFSClient.class.getDeclaredMethod("beginFileLease", String.class, DFSOutputStream.class); beginFileLeaseMethod.setAccessible(true); - final Method endFileLeaseMethod = DFSClient.class.getDeclaredMethod("endFileLease", - String.class); + final Method endFileLeaseMethod = + DFSClient.class.getDeclaredMethod("endFileLease", String.class); endFileLeaseMethod.setAccessible(true); return new LeaseManager() { @@ -261,18 +265,19 @@ public class FanOutOneBlockAsyncDFSOutputHelper { @SuppressWarnings("rawtypes") Class<? extends Enum> ecnClass; try { - ecnClass = Class.forName("org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck$ECN") - .asSubclass(Enum.class); + ecnClass = + Class.forName("org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck$ECN") + .asSubclass(Enum.class); } catch (ClassNotFoundException e) { throw new Error(e); } @SuppressWarnings("unchecked") final Enum<?> disabledECN = Enum.valueOf(ecnClass, "DISABLED"); final Method getReplyMethod = PipelineAckProto.class.getMethod("getReply", int.class); - final Method combineHeaderMethod = PipelineAck.class.getMethod("combineHeader", ecnClass, - Status.class); - final Method getStatusFromHeaderMethod = PipelineAck.class.getMethod("getStatusFromHeader", - int.class); + final Method combineHeaderMethod = + PipelineAck.class.getMethod("combineHeader", ecnClass, Status.class); + final Method getStatusFromHeaderMethod = + PipelineAck.class.getMethod("getStatusFromHeader", int.class); return new PipelineAckStatusGetter() { @Override @@ -317,8 +322,8 @@ public class FanOutOneBlockAsyncDFSOutputHelper { private static 
StorageTypeSetter createStorageTypeSetter() { final Method setStorageTypeMethod; try { - setStorageTypeMethod = OpWriteBlockProto.Builder.class.getMethod("setStorageType", - StorageTypeProto.class); + setStorageTypeMethod = + OpWriteBlockProto.Builder.class.getMethod("setStorageType", StorageTypeProto.class); } catch (NoSuchMethodException e) { LOG.warn("noSetStorageType method found, should be hadoop 2.5-", e); return new StorageTypeSetter() { @@ -362,8 +367,8 @@ public class FanOutOneBlockAsyncDFSOutputHelper { String clientName, EnumSetWritable<CreateFlag> flag, boolean createParent, short replication, long blockSize) throws IOException { try { - return (HdfsFileStatus) createMethod.invoke(namenode, src, masked, clientName, flag, - createParent, replication, blockSize); + return (HdfsFileStatus) createMethod.invoke(namenode, src, masked, clientName, + flag, createParent, replication, blockSize); } catch (IllegalAccessException e) { throw new RuntimeException(e); } catch (InvocationTargetException e) { @@ -374,16 +379,16 @@ public class FanOutOneBlockAsyncDFSOutputHelper { }; } else { try { - Class<?> cryptoProtocolVersionClass = Class - .forName("org.apache.hadoop.crypto.CryptoProtocolVersion"); + Class<?> cryptoProtocolVersionClass = + Class.forName("org.apache.hadoop.crypto.CryptoProtocolVersion"); Method supportedMethod = cryptoProtocolVersionClass.getMethod("supported"); final Object supported = supportedMethod.invoke(null); return new FileCreater() { @Override - public HdfsFileStatus create(ClientProtocol namenode, String src, FsPermission masked, - String clientName, EnumSetWritable<CreateFlag> flag, boolean createParent, - short replication, long blockSize) throws IOException { + public HdfsFileStatus create(ClientProtocol namenode, String src, + FsPermission masked, String clientName, EnumSetWritable<CreateFlag> flag, + boolean createParent, short replication, long blockSize) throws IOException { try { return (HdfsFileStatus) 
createMethod.invoke(namenode, src, masked, clientName, flag, createParent, replication, blockSize, supported); @@ -481,8 +486,12 @@ public class FanOutOneBlockAsyncDFSOutputHelper { } // success ChannelPipeline p = ctx.pipeline(); - while (p.first() != null) { - p.removeFirst(); + for (ChannelHandler handler; (handler = p.removeLast()) != null;) { + // do not remove all handlers because we may have wrap or unwrap handlers at the header + // of pipeline. + if (handler instanceof IdleStateHandler) { + break; + } } // Disable auto read here. Enable it after we setup the streaming pipeline in // FanOutOneBLockAsyncDFSOutput. @@ -497,8 +506,7 @@ public class FanOutOneBlockAsyncDFSOutputHelper { @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { - if (evt instanceof IdleStateEvent - && ((IdleStateEvent) evt).state() == IdleState.READER_IDLE) { + if (evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == READER_IDLE) { promise .tryFailure(new IOException("Timeout(" + timeoutMs + "ms) waiting for response")); } else { @@ -515,39 +523,64 @@ public class FanOutOneBlockAsyncDFSOutputHelper { private static void requestWriteBlock(Channel channel, Enum<?> storageType, OpWriteBlockProto.Builder writeBlockProtoBuilder) throws IOException { - // TODO: SASL negotiation. should be done using a netty Handler. 
OpWriteBlockProto proto = STORAGE_TYPE_SETTER.set(writeBlockProtoBuilder, storageType).build(); int protoLen = proto.getSerializedSize(); - ByteBuf buffer = channel.alloc() - .buffer(3 + CodedOutputStream.computeRawVarint32Size(protoLen) + protoLen); + ByteBuf buffer = + channel.alloc().buffer(3 + CodedOutputStream.computeRawVarint32Size(protoLen) + protoLen); buffer.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION); buffer.writeByte(Op.WRITE_BLOCK.code); proto.writeDelimitedTo(new ByteBufOutputStream(buffer)); channel.writeAndFlush(buffer); } - private static List<Future<Channel>> connectToDataNodes(Configuration conf, String clientName, - LocatedBlock locatedBlock, long maxBytesRcvd, long latestGS, BlockConstructionStage stage, - DataChecksum summer, EventLoop eventLoop) { + private static void initialize(Configuration conf, final Channel channel, + final DatanodeInfo dnInfo, final Enum<?> storageType, + final OpWriteBlockProto.Builder writeBlockProtoBuilder, final int timeoutMs, + DFSClient client, Token<BlockTokenIdentifier> accessToken, final Promise<Channel> promise) { + Promise<Void> saslPromise = channel.eventLoop().newPromise(); + trySaslNegotiate(conf, channel, dnInfo, timeoutMs, client, accessToken, saslPromise); + saslPromise.addListener(new FutureListener<Void>() { + + @Override + public void operationComplete(Future<Void> future) throws Exception { + if (future.isSuccess()) { + // setup response processing pipeline first, then send request. 
+ processWriteBlockResponse(channel, dnInfo, promise, timeoutMs); + requestWriteBlock(channel, storageType, writeBlockProtoBuilder); + } else { + promise.tryFailure(future.cause()); + } + } + }); + } + + private static List<Future<Channel>> connectToDataNodes(final Configuration conf, + final DFSClient client, String clientName, final LocatedBlock locatedBlock, + long maxBytesRcvd, long latestGS, BlockConstructionStage stage, DataChecksum summer, + EventLoop eventLoop) { Enum<?>[] storageTypes = locatedBlock.getStorageTypes(); DatanodeInfo[] datanodeInfos = locatedBlock.getLocations(); - boolean connectToDnViaHostname = conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME, - DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT); - final int timeoutMs = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, - HdfsServerConstants.READ_TIMEOUT); + boolean connectToDnViaHostname = + conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME, DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT); + final int timeoutMs = + conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, HdfsServerConstants.READ_TIMEOUT); ExtendedBlock blockCopy = new ExtendedBlock(locatedBlock.getBlock()); blockCopy.setNumBytes(locatedBlock.getBlockSize()); - ClientOperationHeaderProto header = ClientOperationHeaderProto.newBuilder() - .setBaseHeader(BaseHeaderProto.newBuilder().setBlock(PBHelper.convert(blockCopy)) - .setToken(PBHelper.convert(locatedBlock.getBlockToken()))) - .setClientName(clientName).build(); + ClientOperationHeaderProto header = + ClientOperationHeaderProto + .newBuilder() + .setBaseHeader( + BaseHeaderProto.newBuilder().setBlock(PBHelper.convert(blockCopy)) + .setToken(PBHelper.convert(locatedBlock.getBlockToken()))) + .setClientName(clientName).build(); ChecksumProto checksumProto = DataTransferProtoUtil.toProto(summer); - final OpWriteBlockProto.Builder writeBlockProtoBuilder = OpWriteBlockProto.newBuilder() - .setHeader(header).setStage(OpWriteBlockProto.BlockConstructionStage.valueOf(stage.name())) - 
.setPipelineSize(1).setMinBytesRcvd(locatedBlock.getBlock().getNumBytes()) - .setMaxBytesRcvd(maxBytesRcvd).setLatestGenerationStamp(latestGS) - .setRequestedChecksum(checksumProto) - .setCachingStrategy(CachingStrategyProto.newBuilder().setDropBehind(true).build()); + final OpWriteBlockProto.Builder writeBlockProtoBuilder = + OpWriteBlockProto.newBuilder().setHeader(header) + .setStage(OpWriteBlockProto.BlockConstructionStage.valueOf(stage.name())) + .setPipelineSize(1).setMinBytesRcvd(locatedBlock.getBlock().getNumBytes()) + .setMaxBytesRcvd(maxBytesRcvd).setLatestGenerationStamp(latestGS) + .setRequestedChecksum(checksumProto) + .setCachingStrategy(CachingStrategyProto.newBuilder().setDropBehind(true).build()); List<Future<Channel>> futureList = new ArrayList<>(datanodeInfos.length); for (int i = 0; i < datanodeInfos.length; i++) { final DatanodeInfo dnInfo = datanodeInfos[i]; @@ -562,14 +595,17 @@ public class FanOutOneBlockAsyncDFSOutputHelper { @Override protected void initChannel(Channel ch) throws Exception { - processWriteBlockResponse(ch, dnInfo, promise, timeoutMs); + // we need to get the remote address of the channel so we can only move on after + // channel connected. Leave an empty implementation here because netty does not allow + // a null handler. 
} }).connect(NetUtils.createSocketAddr(dnAddr)).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()) { - requestWriteBlock(future.channel(), storageType, writeBlockProtoBuilder); + initialize(conf, future.channel(), dnInfo, storageType, writeBlockProtoBuilder, + timeoutMs, client, locatedBlock.getBlockToken(), promise); } else { promise.tryFailure(future.cause()); } @@ -601,11 +637,14 @@ public class FanOutOneBlockAsyncDFSOutputHelper { ClientProtocol namenode = client.getNamenode(); HdfsFileStatus stat; try { - stat = FILE_CREATER.create(namenode, src, - FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)), clientName, - new EnumSetWritable<CreateFlag>( - overwrite ? EnumSet.of(CREATE, OVERWRITE) : EnumSet.of(CREATE)), - createParent, replication, blockSize); + stat = + FILE_CREATER.create( + namenode, + src, + FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)), + clientName, + new EnumSetWritable<CreateFlag>(overwrite ? 
EnumSet.of(CREATE, OVERWRITE) : EnumSet + .of(CREATE)), createParent, replication, blockSize); } catch (Exception e) { if (e instanceof RemoteException) { throw (RemoteException) e; @@ -619,19 +658,20 @@ public class FanOutOneBlockAsyncDFSOutputHelper { List<Future<Channel>> futureList = null; try { DataChecksum summer = createChecksum(client); - locatedBlock = namenode.addBlock(src, client.getClientName(), null, null, stat.getFileId(), - null); + locatedBlock = + namenode.addBlock(src, client.getClientName(), null, null, stat.getFileId(), null); List<Channel> datanodeList = new ArrayList<>(); - futureList = connectToDataNodes(conf, clientName, locatedBlock, 0L, 0L, PIPELINE_SETUP_CREATE, - summer, eventLoop); + futureList = + connectToDataNodes(conf, client, clientName, locatedBlock, 0L, 0L, PIPELINE_SETUP_CREATE, + summer, eventLoop); for (Future<Channel> future : futureList) { // fail the creation if there are connection failures since we are fail-fast. The upper // layer should retry itself if needed. 
datanodeList.add(future.syncUninterruptibly().getNow()); } succ = true; - return new FanOutOneBlockAsyncDFSOutput(conf, fsUtils, dfs, client, namenode, clientName, src, - stat.getFileId(), locatedBlock, eventLoop, datanodeList, summer, ALLOC); + return new FanOutOneBlockAsyncDFSOutput(conf, fsUtils, dfs, client, namenode, clientName, + src, stat.getFileId(), locatedBlock, eventLoop, datanodeList, summer, ALLOC); } finally { if (!succ) { if (futureList != null) { @@ -664,8 +704,8 @@ public class FanOutOneBlockAsyncDFSOutputHelper { return new FileSystemLinkResolver<FanOutOneBlockAsyncDFSOutput>() { @Override - public FanOutOneBlockAsyncDFSOutput doCall(Path p) - throws IOException, UnresolvedLinkException { + public FanOutOneBlockAsyncDFSOutput doCall(Path p) throws IOException, + UnresolvedLinkException { return createOutput(dfs, p.toUri().getPath(), overwrite, createParent, replication, blockSize, eventLoop); } @@ -684,8 +724,8 @@ public class FanOutOneBlockAsyncDFSOutputHelper { return e.getClassName().endsWith("RetryStartFileException"); } - static void completeFile(DFSClient client, ClientProtocol namenode, String src, String clientName, - ExtendedBlock block, long fileId) { + static void completeFile(DFSClient client, ClientProtocol namenode, String src, + String clientName, ExtendedBlock block, long fileId) { for (int retry = 0;; retry++) { try { if (namenode.complete(src, clientName, block, fileId)) { http://git-wip-us.apache.org/repos/asf/hbase/blob/6ea49945/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutputSaslHelper.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutputSaslHelper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutputSaslHelper.java new file mode 100644 index 0000000..341d4ec --- /dev/null +++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FanOutOneBlockAsyncDFSOutputSaslHelper.java @@ -0,0 +1,1032 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +import static io.netty.handler.timeout.IdleState.READER_IDLE; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufOutputStream; +import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelOutboundHandlerAdapter; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.ChannelPromise; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.handler.codec.LengthFieldBasedFrameDecoder; +import io.netty.handler.codec.MessageToByteEncoder; +import io.netty.handler.codec.protobuf.ProtobufDecoder; +import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; +import io.netty.handler.timeout.IdleStateEvent; +import io.netty.handler.timeout.IdleStateHandler; +import io.netty.util.concurrent.Promise; + +import java.io.IOException; +import java.lang.reflect.Constructor; +import 
java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.security.auth.callback.Callback; +import javax.security.auth.callback.CallbackHandler; +import javax.security.auth.callback.NameCallback; +import javax.security.auth.callback.PasswordCallback; +import javax.security.auth.callback.UnsupportedCallbackException; +import javax.security.sasl.RealmCallback; +import javax.security.sasl.RealmChoiceCallback; +import javax.security.sasl.Sasl; +import javax.security.sasl.SaslClient; +import javax.security.sasl.SaslException; + +import com.google.common.base.Charsets; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.protobuf.ByteString; +import com.google.protobuf.CodedOutputStream; + +import org.apache.commons.codec.binary.Base64; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hdfs.DFSClient; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException; +import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.Builder; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.DataTransferEncryptorStatus; +import 
org.apache.hadoop.hdfs.protocolPB.PBHelper; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; +import org.apache.hadoop.security.SaslPropertiesResolver; +import org.apache.hadoop.security.SaslRpcServer.QualityOfProtection; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; + +/** + * Helper class for adding sasl support for {@link FanOutOneBlockAsyncDFSOutput}. + * <p> + * Several Hadoop classes used here are looked up by name and accessed through reflection, with a + * fallback implementation for the case where the class cannot be loaded. + */ +@InterfaceAudience.Private +public final class FanOutOneBlockAsyncDFSOutputSaslHelper { + + private static final Log LOG = LogFactory.getLog(FanOutOneBlockAsyncDFSOutputSaslHelper.class); + + private FanOutOneBlockAsyncDFSOutputSaslHelper() { + } + + private static final String SERVER_NAME = "0"; + private static final String PROTOCOL = "hdfs"; + private static final String MECHANISM = "DIGEST-MD5"; + private static final int SASL_TRANSFER_MAGIC_NUMBER = 0xDEADBEEF; + private static final String NAME_DELIMITER = " "; + private static final String DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY = + "dfs.encrypt.data.transfer.cipher.suites"; + private static final String AES_CTR_NOPADDING = "AES/CTR/NoPadding"; + + /** + * Version-independent view of the SASL related state of a {@link DFSClient}. The implementations + * in this class obtain that state through reflection. + */ + private interface SaslAdaptor { + + SaslPropertiesResolver getSaslPropsResolver(DFSClient client); + + TrustedChannelResolver getTrustedChannelResolver(DFSClient client); + + AtomicBoolean getFallbackToSimpleAuth(DFSClient client); + + DataEncryptionKey createDataEncryptionKey(DFSClient client); + } + + private static final SaslAdaptor SASL_ADAPTOR; + + /** + * Version-independent access to cipher options. Options are passed around as plain + * {@code Object}s because the CipherOption class is only looked up by name at runtime. + */ + private interface CipherHelper { + + List<Object> getCipherOptions(Configuration conf) throws IOException; + + void addCipherOptions(DataTransferEncryptorMessageProto.Builder builder, + List<Object> cipherOptions); + + Object getCipherOption(DataTransferEncryptorMessageProto proto, boolean isNegotiatedQopPrivacy, + SaslClient saslClient) throws IOException; + + Object getCipherSuite(Object
cipherOption); + + byte[] getInKey(Object cipherOption); + + byte[] getInIv(Object cipherOption); + + byte[] getOutKey(Object cipherOption); + + byte[] getOutIv(Object cipherOption); + } + + private static final CipherHelper CIPHER_HELPER; + + /** + * Reflection based wrapper around org.apache.hadoop.crypto.CryptoCodec and the Encryptor and + * Decryptor it creates. All Method handles are null when the CryptoCodec class cannot be loaded. + */ + private static final class CryptoCodec { + + private static final Method CREATE_CODEC; + + private static final Method CREATE_ENCRYPTOR; + + private static final Method CREATE_DECRYPTOR; + + private static final Method INIT_ENCRYPTOR; + + private static final Method INIT_DECRYPTOR; + + private static final Method ENCRYPT; + + private static final Method DECRYPT; + + static { + Class<?> cryptoCodecClass = null; + try { + cryptoCodecClass = Class.forName("org.apache.hadoop.crypto.CryptoCodec"); + } catch (ClassNotFoundException e) { + LOG.warn("No CryptoCodec class found, should be hadoop 2.5-", e); + } + if (cryptoCodecClass != null) { + /* The getInstance overload is selected by arity (two parameters), not by parameter types. */ + Method getInstanceMethod = null; + for (Method method : cryptoCodecClass.getMethods()) { + if (method.getName().equals("getInstance") && method.getParameterTypes().length == 2) { + getInstanceMethod = method; + break; + } + } + CREATE_CODEC = getInstanceMethod; + try { + CREATE_ENCRYPTOR = cryptoCodecClass.getMethod("createEncryptor"); + CREATE_DECRYPTOR = cryptoCodecClass.getMethod("createDecryptor"); + + Class<?> encryptorClass = Class.forName("org.apache.hadoop.crypto.Encryptor"); + /* + * NOTE(review): "init" is looked up here (and for the decryptor below) without parameter + * types, yet the constructor invokes it with a key and an iv. Confirm that this lookup + * matches the real method signature; a mismatch ends up in the Error thrown below. + */ + INIT_ENCRYPTOR = encryptorClass.getMethod("init"); + ENCRYPT = encryptorClass.getMethod("encrypt", ByteBuffer.class, ByteBuffer.class); + + Class<?> decryptorClass = Class.forName("org.apache.hadoop.crypto.Decryptor"); + INIT_DECRYPTOR = decryptorClass.getMethod("init"); + DECRYPT = decryptorClass.getMethod("decrypt", ByteBuffer.class, ByteBuffer.class); + } catch (NoSuchMethodException | ClassNotFoundException e) { + throw new Error(e); + } + } else { + LOG.warn("Can not initialize CryptoCodec, should be hadoop 2.5-"); + CREATE_CODEC = null; + CREATE_ENCRYPTOR = null; + CREATE_DECRYPTOR = null; + INIT_ENCRYPTOR
= null; + INIT_DECRYPTOR = null; + ENCRYPT = null; + DECRYPT = null; + } + } + + private final Object encryptor; + + private final Object decryptor; + + public CryptoCodec(Configuration conf, Object cipherOption) { + Object codec; + try { + codec = CREATE_CODEC.invoke(null, conf, CIPHER_HELPER.getCipherSuite(cipherOption)); + encryptor = CREATE_ENCRYPTOR.invoke(codec); + /* + * The encryptor uses the option's "in" key/iv and the decryptor its "out" key/iv; + * presumably in/out are named from the datanode's point of view - TODO confirm. + * The ivs are copied before being handed to init. + */ + byte[] encKey = CIPHER_HELPER.getInKey(cipherOption); + byte[] encIv = CIPHER_HELPER.getInIv(cipherOption); + INIT_ENCRYPTOR.invoke(encryptor, encKey, Arrays.copyOf(encIv, encIv.length)); + + decryptor = CREATE_DECRYPTOR.invoke(codec); + byte[] decKey = CIPHER_HELPER.getOutKey(cipherOption); + byte[] decIv = CIPHER_HELPER.getOutIv(cipherOption); + INIT_DECRYPTOR.invoke(decryptor, decKey, Arrays.copyOf(decIv, decIv.length)); + } catch (IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException(e); + } + } + + public void encrypt(ByteBuffer inBuffer, ByteBuffer outBuffer) { + try { + ENCRYPT.invoke(encryptor, inBuffer, outBuffer); + } catch (IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException(e); + } + } + + public void decrypt(ByteBuffer inBuffer, ByteBuffer outBuffer) { + try { + DECRYPT.invoke(decryptor, inBuffer, outBuffer); + } catch (IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException(e); + } + } + } + + /** + * Creates the adaptor that reads the resolvers and the fallback flag from private fields of the + * object returned by {@code DFSClient.getSaslDataTransferClient()}, and creates encryption keys + * with {@code DFSClient.newDataEncryptionKey()}. + */ + private static SaslAdaptor createSaslAdaptor27(Class<?> saslDataTransferClientClass) + throws NoSuchFieldException, NoSuchMethodException { + final Field saslPropsResolverField = + saslDataTransferClientClass.getDeclaredField("saslPropsResolver"); + saslPropsResolverField.setAccessible(true); + final Field trustedChannelResolverField = + saslDataTransferClientClass.getDeclaredField("trustedChannelResolver"); + trustedChannelResolverField.setAccessible(true); + final Field fallbackToSimpleAuthField = + saslDataTransferClientClass.getDeclaredField("fallbackToSimpleAuth"); +
fallbackToSimpleAuthField.setAccessible(true); + final Method getSaslDataTransferClientMethod = + DFSClient.class.getMethod("getSaslDataTransferClient"); + final Method newDataEncryptionKeyMethod = DFSClient.class.getMethod("newDataEncryptionKey"); + return new SaslAdaptor() { + + @Override + public TrustedChannelResolver getTrustedChannelResolver(DFSClient client) { + try { + return (TrustedChannelResolver) trustedChannelResolverField + .get(getSaslDataTransferClientMethod.invoke(client)); + } catch (IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException(e); + } + } + + @Override + public SaslPropertiesResolver getSaslPropsResolver(DFSClient client) { + try { + return (SaslPropertiesResolver) saslPropsResolverField + .get(getSaslDataTransferClientMethod.invoke(client)); + } catch (IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException(e); + } + } + + @Override + public AtomicBoolean getFallbackToSimpleAuth(DFSClient client) { + try { + return (AtomicBoolean) fallbackToSimpleAuthField.get(getSaslDataTransferClientMethod + .invoke(client)); + } catch (IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException(e); + } + } + + @Override + public DataEncryptionKey createDataEncryptionKey(DFSClient client) { + try { + return (DataEncryptionKey) newDataEncryptionKeyMethod.invoke(client); + } catch (IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException(e); + } + } + }; + } + + /** + * Creates the adaptor used when SaslDataTransferClient cannot be loaded. It reads the trusted + * channel resolver from a private field of {@link DFSClient} itself and returns null for the + * sasl properties resolver and the fallback flag. + */ + private static SaslAdaptor createSaslAdaptor25() { + try { + final Field trustedChannelResolverField = + DFSClient.class.getDeclaredField("trustedChannelResolver"); + trustedChannelResolverField.setAccessible(true); + final Method getDataEncryptionKeyMethod = DFSClient.class.getMethod("getDataEncryptionKey"); + return new SaslAdaptor() { + + @Override + public TrustedChannelResolver getTrustedChannelResolver(DFSClient client) { + try { + return (TrustedChannelResolver)
trustedChannelResolverField.get(client); + } catch (IllegalAccessException e) { + throw new RuntimeException(e); + } + } + + @Override + public SaslPropertiesResolver getSaslPropsResolver(DFSClient client) { + return null; + } + + @Override + public AtomicBoolean getFallbackToSimpleAuth(DFSClient client) { + return null; + } + + @Override + public DataEncryptionKey createDataEncryptionKey(DFSClient client) { + try { + return (DataEncryptionKey) getDataEncryptionKeyMethod.invoke(client); + } catch (IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException(e); + } + } + }; + } catch (NoSuchFieldException | NoSuchMethodException e) { + throw new Error(e); + } + + } + + /** + * Picks the adaptor implementation depending on whether the SaslDataTransferClient class can be + * loaded. Missing fields or methods on the chosen path are reported as an {@link Error}. + */ + private static SaslAdaptor createSaslAdaptor() { + Class<?> saslDataTransferClientClass = null; + try { + saslDataTransferClientClass = + Class.forName("org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient"); + } catch (ClassNotFoundException e) { + LOG.warn("No SaslDataTransferClient class found, should be hadoop 2.5-"); + } + try { + return saslDataTransferClientClass != null ?
createSaslAdaptor27(saslDataTransferClientClass) + : createSaslAdaptor25(); + } catch (NoSuchFieldException | NoSuchMethodException e) { + throw new Error(e); + } + } + + /** + * Creates the cipher helper used when the CipherOption class cannot be loaded. It reports that + * there are no cipher options (null) and rejects every other operation with + * {@link UnsupportedOperationException}. + */ + private static CipherHelper createCipherHelper25() { + return new CipherHelper() { + + @Override + public byte[] getOutKey(Object cipherOption) { + throw new UnsupportedOperationException(); + } + + @Override + public byte[] getOutIv(Object cipherOption) { + throw new UnsupportedOperationException(); + } + + @Override + public byte[] getInKey(Object cipherOption) { + throw new UnsupportedOperationException(); + } + + @Override + public byte[] getInIv(Object cipherOption) { + throw new UnsupportedOperationException(); + } + + @Override + public Object getCipherSuite(Object cipherOption) { + throw new UnsupportedOperationException(); + } + + @Override + public List<Object> getCipherOptions(Configuration conf) { + return null; + } + + @Override + public Object getCipherOption(DataTransferEncryptorMessageProto proto, + boolean isNegotiatedQopPrivacy, SaslClient saslClient) { + return null; + } + + @Override + public void addCipherOptions(Builder builder, List<Object> cipherOptions) { + throw new UnsupportedOperationException(); + } + }; + } + + /** + * Creates the cipher helper that works on real CipherOption instances. All constructors and + * methods it needs are resolved once, up front, and then invoked reflectively. + */ + private static CipherHelper createCipherHelper27(Class<?> cipherOptionClass) + throws ClassNotFoundException, NoSuchMethodException { + @SuppressWarnings("rawtypes") + Class<?
extends Enum> cipherSuiteClass = + Class.forName("org.apache.hadoop.crypto.CipherSuite").asSubclass(Enum.class); + @SuppressWarnings("unchecked") + final Enum<?> aesCipherSuite = Enum.valueOf(cipherSuiteClass, "AES_CTR_NOPADDING"); + final Constructor<?> cipherOptionConstructor = + cipherOptionClass.getConstructor(cipherSuiteClass); + final Constructor<?> cipherOptionWithKeyAndIvConstructor = + cipherOptionClass.getConstructor(cipherSuiteClass, byte[].class, byte[].class, + byte[].class, byte[].class); + + final Method getCipherSuiteMethod = cipherOptionClass.getMethod("getCipherSuite"); + final Method getInKeyMethod = cipherOptionClass.getMethod("getInKey"); + final Method getInIvMethod = cipherOptionClass.getMethod("getInIv"); + final Method getOutKeyMethod = cipherOptionClass.getMethod("getOutKey"); + final Method getOutIvMethod = cipherOptionClass.getMethod("getOutIv"); + + final Method convertCipherOptionsMethod = + PBHelper.class.getMethod("convertCipherOptions", List.class); + final Method convertCipherOptionProtosMethod = + PBHelper.class.getMethod("convertCipherOptionProtos", List.class); + final Method addAllCipherOptionMethod = + DataTransferEncryptorMessageProto.Builder.class.getMethod("addAllCipherOption", + Iterable.class); + final Method getCipherOptionListMethod = + DataTransferEncryptorMessageProto.class.getMethod("getCipherOptionList"); + return new CipherHelper() { + + @Override + public byte[] getOutKey(Object cipherOption) { + try { + return (byte[]) getOutKeyMethod.invoke(cipherOption); + } catch (IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException(e); + } + } + + @Override + public byte[] getOutIv(Object cipherOption) { + try { + return (byte[]) getOutIvMethod.invoke(cipherOption); + } catch (IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException(e); + } + } + + @Override + public byte[] getInKey(Object cipherOption) { + try { + return (byte[]) getInKeyMethod.invoke(cipherOption);
+ } catch (IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException(e); + } + } + + @Override + public byte[] getInIv(Object cipherOption) { + try { + return (byte[]) getInIvMethod.invoke(cipherOption); + } catch (IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException(e); + } + } + + @Override + public Object getCipherSuite(Object cipherOption) { + try { + return getCipherSuiteMethod.invoke(cipherOption); + } catch (IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException(e); + } + } + + @Override + public List<Object> getCipherOptions(Configuration conf) throws IOException { + // Negotiate cipher suites if configured. Currently, the only supported + // cipher suite is AES/CTR/NoPadding, but the protocol allows multiple + // values for future expansion. + String cipherSuites = conf.get(DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY); + if (cipherSuites == null || cipherSuites.isEmpty()) { + return null; + } + if (!cipherSuites.equals(AES_CTR_NOPADDING)) { + throw new IOException(String.format("Invalid cipher suite, %s=%s", + DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY, cipherSuites)); + } + Object option; + try { + option = cipherOptionConstructor.newInstance(aesCipherSuite); + } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException(e); + } + List<Object> cipherOptions = Lists.newArrayListWithCapacity(1); + cipherOptions.add(option); + return cipherOptions; + } + + /** + * Returns a new cipher option whose in and out keys (when present) have been unwrapped by + * the given sasl client; the cipher suite and both ivs are carried over unchanged. + */ + private Object unwrap(Object option, SaslClient saslClient) throws IOException { + byte[] inKey = getInKey(option); + if (inKey != null) { + inKey = saslClient.unwrap(inKey, 0, inKey.length); + } + byte[] outKey = getOutKey(option); + if (outKey != null) { + outKey = saslClient.unwrap(outKey, 0, outKey.length); + } + try { + return cipherOptionWithKeyAndIvConstructor.newInstance(getCipherSuite(option), inKey, + getInIv(option), outKey, getOutIv(option)); + }
catch (InstantiationException | IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException(e); + } + } + + @SuppressWarnings("unchecked") + @Override + public Object getCipherOption(DataTransferEncryptorMessageProto proto, + boolean isNegotiatedQopPrivacy, SaslClient saslClient) throws IOException { + List<Object> cipherOptions; + try { + cipherOptions = + (List<Object>) convertCipherOptionProtosMethod.invoke(null, + getCipherOptionListMethod.invoke(proto)); + } catch (IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException(e); + } + if (cipherOptions == null || cipherOptions.isEmpty()) { + return null; + } + Object cipherOption = cipherOptions.get(0); + return isNegotiatedQopPrivacy ? unwrap(cipherOption, saslClient) : cipherOption; + } + + @Override + public void addCipherOptions(Builder builder, List<Object> cipherOptions) { + try { + addAllCipherOptionMethod.invoke(builder, + convertCipherOptionsMethod.invoke(null, cipherOptions)); + } catch (IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException(e); + } + } + }; + } + + /** + * Picks the cipher helper implementation depending on whether the CipherOption class can be + * loaded. Missing classes or methods on the reflective path are reported as an {@link Error}. + */ + private static CipherHelper createCipherHelper() { + Class<?> cipherOptionClass; + try { + cipherOptionClass = Class.forName("org.apache.hadoop.crypto.CipherOption"); + } catch (ClassNotFoundException e) { + LOG.warn("No CipherOption class found, should be hadoop 2.5-"); + return createCipherHelper25(); + } + try { + return createCipherHelper27(cipherOptionClass); + } catch (NoSuchMethodException | ClassNotFoundException e) { + throw new Error(e); + } + } + + static { + SASL_ADAPTOR = createSaslAdaptor(); + CIPHER_HELPER = createCipherHelper(); + } + + /** + * Sets user name and password when asked by the client-side SASL object. + */ + private static final class SaslClientCallbackHandler implements CallbackHandler { + + private final char[] password; + private final String userName; + + /** + * Creates a new SaslClientCallbackHandler.
+ * @param userName SASL user name + * @param password SASL password + */ + public SaslClientCallbackHandler(String userName, char[] password) { + this.password = password; + this.userName = userName; + } + + @Override + public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException { + NameCallback nc = null; + PasswordCallback pc = null; + RealmCallback rc = null; + for (Callback callback : callbacks) { + if (callback instanceof RealmChoiceCallback) { + continue; + } else if (callback instanceof NameCallback) { + nc = (NameCallback) callback; + } else if (callback instanceof PasswordCallback) { + pc = (PasswordCallback) callback; + } else if (callback instanceof RealmCallback) { + rc = (RealmCallback) callback; + } else { + throw new UnsupportedCallbackException(callback, "Unrecognized SASL client callback"); + } + } + if (nc != null) { + nc.setName(userName); + } + if (pc != null) { + pc.setPassword(password); + } + if (rc != null) { + rc.setText(rc.getDefaultText()); + } + } + } + + /** + * Drives the client side of the SASL handshake. Once the handshake is complete it removes every + * handler from the pipeline, installs the encryption or wrap/unwrap handlers if any are needed, + * and completes the promise. Failures and read timeouts fail the promise instead. + */ + private static final class SaslNegotiateHandler extends ChannelDuplexHandler { + + private final Configuration conf; + + private final Map<String, String> saslProps; + + private final SaslClient saslClient; + + private final int timeoutMs; + + private final Promise<Void> promise; + + private int step = 0; + + public SaslNegotiateHandler(Configuration conf, String username, char[] password, + Map<String, String> saslProps, int timeoutMs, Promise<Void> promise) throws SaslException { + this.conf = conf; + this.saslProps = saslProps; + this.saslClient = + Sasl.createSaslClient(new String[] { MECHANISM }, username, PROTOCOL, SERVER_NAME, + saslProps, new SaslClientCallbackHandler(username, password)); + this.timeoutMs = timeoutMs; + this.promise = promise; + } + + private void sendSaslMessage(ChannelHandlerContext ctx, byte[] payload) throws IOException { + sendSaslMessage(ctx, payload, null); + } + + private void sendSaslMessage(ChannelHandlerContext ctx,
byte[] payload, List<Object> options) + throws IOException { + DataTransferEncryptorMessageProto.Builder builder = + DataTransferEncryptorMessageProto.newBuilder(); + builder.setStatus(DataTransferEncryptorStatus.SUCCESS); + if (payload != null) { + builder.setPayload(ByteString.copyFrom(payload)); + } + if (options != null) { + CIPHER_HELPER.addCipherOptions(builder, options); + } + DataTransferEncryptorMessageProto proto = builder.build(); + /* Size the buffer for the message plus its varint length prefix. */ + int size = proto.getSerializedSize(); + size += CodedOutputStream.computeRawVarint32Size(size); + ByteBuf buf = ctx.alloc().buffer(size); + proto.writeDelimitedTo(new ByteBufOutputStream(buf)); + ctx.write(buf); + } + + /** + * Starts the handshake as soon as the handler is added: writes the magic number followed by + * an empty SASL message, flushes, and moves to step 1. + */ + @Override + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { + ctx.write(ctx.alloc().buffer(4).writeInt(SASL_TRANSFER_MAGIC_NUMBER)); + sendSaslMessage(ctx, new byte[0]); + ctx.flush(); + step++; + } + + /* NOTE(review): the inactive event is not passed on with fireChannelInactive; confirm. */ + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + saslClient.dispose(); + } + + private void check(DataTransferEncryptorMessageProto proto) throws IOException { + if (proto.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) { + throw new InvalidEncryptionKeyException(proto.getMessage()); + } else if (proto.getStatus() == DataTransferEncryptorStatus.ERROR) { + throw new IOException(proto.getMessage()); + } + } + + private String getNegotiatedQop() { + return (String) saslClient.getNegotiatedProperty(Sasl.QOP); + } + + private boolean isNegotiatedQopPrivacy() { + String qop = getNegotiatedQop(); + return qop != null && "auth-conf".equalsIgnoreCase(qop); + } + + /* NOTE(review): assumes saslProps always contains Sasl.QOP; a missing entry would NPE here. */ + private boolean requestedQopContainsPrivacy() { + Set<String> requestedQop = + ImmutableSet.copyOf(Arrays.asList(saslProps.get(Sasl.QOP).split(","))); + return requestedQop.contains("auth-conf"); + } + + private void checkSaslComplete() throws IOException { + if (!saslClient.isComplete()) { + throw new IOException("Failed to complete SASL handshake"); + } + Set<String> requestedQop = +
ImmutableSet.copyOf(Arrays.asList(saslProps.get(Sasl.QOP).split(","))); + String negotiatedQop = getNegotiatedQop(); + LOG.debug("Verifying QOP, requested QOP = " + requestedQop + ", negotiated QOP = " + + negotiatedQop); + if (!requestedQop.contains(negotiatedQop)) { + throw new IOException(String.format("SASL handshake completed, but " + + "channel does not have acceptable quality of protection, " + + "requested = %s, negotiated = %s", requestedQop, negotiatedQop)); + } + } + + private boolean useWrap() { + String qop = (String) saslClient.getNegotiatedProperty(Sasl.QOP); + return qop != null && !"auth".equalsIgnoreCase(qop); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws IOException { + if (msg instanceof DataTransferEncryptorMessageProto) { + DataTransferEncryptorMessageProto proto = (DataTransferEncryptorMessageProto) msg; + check(proto); + byte[] challenge = proto.getPayload().toByteArray(); + byte[] response = saslClient.evaluateChallenge(challenge); + switch (step) { + case 1: { + List<Object> cipherOptions = null; + if (requestedQopContainsPrivacy()) { + cipherOptions = CIPHER_HELPER.getCipherOptions(conf); + } + sendSaslMessage(ctx, response, cipherOptions); + ctx.flush(); + step++; + break; + } + case 2: { + assert response == null; + checkSaslComplete(); + Object cipherOption = + CIPHER_HELPER.getCipherOption(proto, isNegotiatedQopPrivacy(), saslClient); + /* + * Negotiation is finished: remove every handler currently in the pipeline (this one + * included) before installing the handlers for the data path, if any are needed. + */ + ChannelPipeline p = ctx.pipeline(); + while (p.first() != null) { + p.removeFirst(); + } + if (cipherOption != null) { + CryptoCodec codec = new CryptoCodec(conf, cipherOption); + p.addLast(new EncryptHandler(codec), new DecryptHandler(codec)); + } else { + if (useWrap()) { + p.addLast(new SaslWrapHandler(saslClient), new LengthFieldBasedFrameDecoder( + Integer.MAX_VALUE, 0, 4), new SaslUnwrapHandler(saslClient)); + } + } + promise.trySuccess(null); + break; + } + default: + throw new IllegalArgumentException("Unrecognized negotiation step: " + step); + } + }
else { + ctx.fireChannelRead(msg); + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + promise.tryFailure(cause); + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if (evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == READER_IDLE) { + promise.tryFailure(new IOException("Timeout(" + timeoutMs + "ms) waiting for response")); + } else { + super.userEventTriggered(ctx, evt); + } + } + } + + /** + * Inbound handler that unwraps each received message with the sasl client. The first four bytes + * of every message are skipped; presumably that is the length field left in place by the frame + * decoder installed in front of this handler - TODO confirm. + */ + private static final class SaslUnwrapHandler extends SimpleChannelInboundHandler<ByteBuf> { + + private final SaslClient saslClient; + + public SaslUnwrapHandler(SaslClient saslClient) { + this.saslClient = saslClient; + } + + /* NOTE(review): the inactive event is not passed on with fireChannelInactive; confirm. */ + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + saslClient.dispose(); + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { + msg.skipBytes(4); + byte[] b = new byte[msg.readableBytes()]; + msg.readBytes(b); + ctx.fireChannelRead(Unpooled.wrappedBuffer(saslClient.unwrap(b, 0, b.length))); + } + } + + /** + * Outbound handler that collects written buffers in a composite buffer and, on flush, wraps the + * collected bytes with the sasl client and writes them out prefixed with a 4 byte length. + */ + private static final class SaslWrapHandler extends ChannelOutboundHandlerAdapter { + + private final SaslClient saslClient; + + private CompositeByteBuf cBuf; + + public SaslWrapHandler(SaslClient saslClient) { + this.saslClient = saslClient; + } + + @Override + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { + cBuf = new CompositeByteBuf(ctx.alloc(), false, Integer.MAX_VALUE); + } + + /* + * NOTE(review): the promise passed in is neither completed for buffered ByteBufs nor handed + * to ctx.write for other messages; confirm that no caller waits on it. + */ + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) + throws Exception { + if (msg instanceof ByteBuf) { + ByteBuf buf = (ByteBuf) msg; + cBuf.addComponent(buf); + /* addComponent is followed by a manual writerIndex advance so the new bytes are readable. */ + cBuf.writerIndex(cBuf.writerIndex() + buf.readableBytes()); + } else { + ctx.write(msg); + } + } + + @Override + public void flush(ChannelHandlerContext ctx) throws Exception { + if (cBuf.isReadable()) { + byte[] b = new
byte[cBuf.readableBytes()]; + cBuf.readBytes(b); + cBuf.discardReadComponents(); + byte[] wrapped = saslClient.wrap(b, 0, b.length); + ByteBuf buf = ctx.alloc().ioBuffer(4 + wrapped.length); + buf.writeInt(wrapped.length); + buf.writeBytes(wrapped); + ctx.write(buf); + } + ctx.flush(); + } + + /* + * NOTE(review): only the composite buffer is released here; the close request itself is not + * forwarded (no ctx.close(promise)). Confirm that this is intended. + */ + @Override + public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { + cBuf.release(); + cBuf = null; + } + } + + /** + * Inbound handler that decrypts each received buffer into a newly allocated direct buffer and + * passes that buffer on. Input that is not backed by exactly one NIO buffer is first copied into + * a temporary direct buffer, which is released after decryption. + */ + private static final class DecryptHandler extends SimpleChannelInboundHandler<ByteBuf> { + + private final CryptoCodec codec; + + public DecryptHandler(CryptoCodec codec) { + this.codec = codec; + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { + ByteBuf inBuf; + boolean release = false; + if (msg.nioBufferCount() == 1) { + inBuf = msg; + } else { + inBuf = ctx.alloc().directBuffer(msg.readableBytes()); + msg.readBytes(inBuf); + release = true; + } + ByteBuffer inBuffer = inBuf.nioBuffer(); + ByteBuf outBuf = ctx.alloc().directBuffer(inBuf.readableBytes()); + ByteBuffer outBuffer = outBuf.nioBuffer(); + codec.decrypt(inBuffer, outBuffer); + outBuf.writerIndex(inBuf.readableBytes()); + if (release) { + inBuf.release(); + } + ctx.fireChannelRead(outBuf); + } + } + + /** + * Outbound encoder that encrypts each written buffer into an output buffer of the same size. + * Input that is not backed by exactly one NIO buffer is first copied into a temporary direct + * buffer, which is released after encryption. + */ + private static final class EncryptHandler extends MessageToByteEncoder<ByteBuf> { + + private final CryptoCodec codec; + + public EncryptHandler(CryptoCodec codec) { + super(false); + this.codec = codec; + } + + @Override + protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, ByteBuf msg, boolean preferDirect) + throws Exception { + if (preferDirect) { + return ctx.alloc().directBuffer(msg.readableBytes()); + } else { + return ctx.alloc().buffer(msg.readableBytes()); + } + } + + @Override + protected void encode(ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception { + ByteBuf inBuf; + boolean release = false; + if (msg.nioBufferCount() == 1) { + inBuf = msg; + } else { + inBuf =
ctx.alloc().directBuffer(msg.readableBytes()); + msg.readBytes(inBuf); + release = true; + } + ByteBuffer inBuffer = inBuf.nioBuffer(); + ByteBuffer outBuffer = out.nioBuffer(); + codec.encrypt(inBuffer, outBuffer); + out.writerIndex(inBuf.readableBytes()); + if (release) { + inBuf.release(); + } + } + } + + /* User name for the encrypted handshake: key id, block pool id and base64 nonce, space separated. */ + private static String getUserNameFromEncryptionKey(DataEncryptionKey encryptionKey) { + return encryptionKey.keyId + NAME_DELIMITER + encryptionKey.blockPoolId + NAME_DELIMITER + + new String(Base64.encodeBase64(encryptionKey.nonce, false), Charsets.UTF_8); + } + + private static char[] encryptionKeyToPassword(byte[] encryptionKey) { + return new String(Base64.encodeBase64(encryptionKey, false), Charsets.UTF_8).toCharArray(); + } + + /* User name for the general handshake: the base64 encoded block token identifier. */ + private static String buildUsername(Token<BlockTokenIdentifier> blockToken) { + return new String(Base64.encodeBase64(blockToken.getIdentifier(), false), Charsets.UTF_8); + } + + private static char[] buildClientPassword(Token<BlockTokenIdentifier> blockToken) { + return new String(Base64.encodeBase64(blockToken.getPassword(), false), Charsets.UTF_8) + .toCharArray(); + } + + private static Map<String, String> createSaslPropertiesForEncryption(String encryptionAlgorithm) { + Map<String, String> saslProps = Maps.newHashMapWithExpectedSize(3); + saslProps.put(Sasl.QOP, QualityOfProtection.PRIVACY.getSaslQop()); + saslProps.put(Sasl.SERVER_AUTH, "true"); + saslProps.put("com.sun.security.sasl.digest.cipher", encryptionAlgorithm); + return saslProps; + } + + /** + * Appends the handlers needed for the handshake to the channel pipeline: a read idle timeout, + * the protobuf frame and message decoders, and the negotiation handler itself. If the sasl + * client cannot be created the promise is failed. + */ + private static void doSaslNegotiation(Configuration conf, Channel channel, int timeoutMs, + String username, char[] password, Map<String, String> saslProps, Promise<Void> saslPromise) { + try { + channel.pipeline().addLast(new IdleStateHandler(timeoutMs, 0, 0, TimeUnit.MILLISECONDS), + new ProtobufVarint32FrameDecoder(), + new ProtobufDecoder(DataTransferEncryptorMessageProto.getDefaultInstance()), + new SaslNegotiateHandler(conf, username, password, saslProps, timeoutMs,
saslPromise)); + } catch (SaslException e) { + saslPromise.tryFailure(e); + } + } + + /** + * Decides whether a SASL handshake is required for the given channel and either starts it or + * completes {@code saslPromise} right away. The cases are checked in this order: trusted + * channel, data encryption key available, security disabled, privileged datanode port, + * fallback to simple auth, sasl properties resolver available, and finally no SASL at all. + */ + static void trySaslNegotiate(Configuration conf, Channel channel, DatanodeInfo dnInfo, + int timeoutMs, DFSClient client, Token<BlockTokenIdentifier> accessToken, + Promise<Void> saslPromise) { + SaslPropertiesResolver saslPropsResolver = SASL_ADAPTOR.getSaslPropsResolver(client); + TrustedChannelResolver trustedChannelResolver = SASL_ADAPTOR.getTrustedChannelResolver(client); + AtomicBoolean fallbackToSimpleAuth = SASL_ADAPTOR.getFallbackToSimpleAuth(client); + InetAddress addr = ((InetSocketAddress) channel.remoteAddress()).getAddress(); + if (trustedChannelResolver.isTrusted() || trustedChannelResolver.isTrusted(addr)) { + saslPromise.trySuccess(null); + return; + } + DataEncryptionKey encryptionKey; + try { + encryptionKey = SASL_ADAPTOR.createDataEncryptionKey(client); + } catch (Exception e) { + saslPromise.tryFailure(e); + return; + } + if (encryptionKey != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("SASL client doing encrypted handshake for addr = " + addr + ", datanodeId = " + + dnInfo); + } + doSaslNegotiation(conf, channel, timeoutMs, getUserNameFromEncryptionKey(encryptionKey), + encryptionKeyToPassword(encryptionKey.encryptionKey), + createSaslPropertiesForEncryption(encryptionKey.encryptionAlgorithm), saslPromise); + } else if (!UserGroupInformation.isSecurityEnabled()) { + if (LOG.isDebugEnabled()) { + LOG.debug("SASL client skipping handshake in unsecured configuration for addr = " + addr + + ", datanodeId = " + dnInfo); + } + saslPromise.trySuccess(null); + } else if (dnInfo.getXferPort() < 1024) { + if (LOG.isDebugEnabled()) { + LOG.debug("SASL client skipping handshake in secured configuration with " + + "privileged port for addr = " + addr + ", datanodeId = " + dnInfo); + } + saslPromise.trySuccess(null); + } else if (fallbackToSimpleAuth != null && fallbackToSimpleAuth.get()) { + if (LOG.isDebugEnabled()) { + LOG.debug("SASL client skipping handshake in
secured configuration with " + + "unsecured cluster for addr = " + addr + ", datanodeId = " + dnInfo); + } + saslPromise.trySuccess(null); + } else if (saslPropsResolver != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("SASL client doing general handshake for addr = " + addr + ", datanodeId = " + + dnInfo); + } + doSaslNegotiation(conf, channel, timeoutMs, buildUsername(accessToken), + buildClientPassword(accessToken), saslPropsResolver.getClientProperties(addr), saslPromise); + } else { + // It's a secured cluster using non-privileged ports, but no SASL. The only way this can + // happen is if the DataNode has ignore.secure.ports.for.testing configured, so this is a rare + // edge case. + if (LOG.isDebugEnabled()) { + LOG.debug("SASL client skipping handshake in secured configuration with no SASL " + + "protection configured for addr = " + addr + ", datanodeId = " + dnInfo); + } + saslPromise.trySuccess(null); + } + } + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/6ea49945/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFanOutOneBlockAsyncDFSOutput.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFanOutOneBlockAsyncDFSOutput.java index 09cd61e..a10712e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFanOutOneBlockAsyncDFSOutput.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFanOutOneBlockAsyncDFSOutput.java @@ -102,8 +102,9 @@ public class TestFanOutOneBlockAsyncDFSOutput { } } - private void writeAndVerify(EventLoop eventLoop, Path f, final FanOutOneBlockAsyncDFSOutput out) - throws IOException, InterruptedException, ExecutionException { + static void writeAndVerify(EventLoop eventLoop, DistributedFileSystem dfs, Path f, + final FanOutOneBlockAsyncDFSOutput out) + throws IOException,
InterruptedException, ExecutionException { final byte[] b = new byte[10]; ThreadLocalRandom.current().nextBytes(b); final FanOutOneBlockAsyncDFSOutputFlushHandler handler = new FanOutOneBlockAsyncDFSOutputFlushHandler(); @@ -117,9 +118,9 @@ public class TestFanOutOneBlockAsyncDFSOutput { }); assertEquals(b.length, handler.get()); out.close(); - assertEquals(b.length, FS.getFileStatus(f).getLen()); + assertEquals(b.length, dfs.getFileStatus(f).getLen()); byte[] actual = new byte[b.length]; - try (FSDataInputStream in = FS.open(f)) { + try (FSDataInputStream in = dfs.open(f)) { in.readFully(actual); } assertArrayEquals(b, actual); @@ -131,7 +132,7 @@ public class TestFanOutOneBlockAsyncDFSOutput { EventLoop eventLoop = EVENT_LOOP_GROUP.next(); final FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop); - writeAndVerify(eventLoop, f, out); + writeAndVerify(eventLoop, FS, f, out); } @Test @@ -191,7 +192,7 @@ public class TestFanOutOneBlockAsyncDFSOutput { true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop); Thread.sleep(READ_TIMEOUT_MS * 2); // the connection to datanode should still alive. 
- writeAndVerify(eventLoop, f, out); + writeAndVerify(eventLoop, FS, f, out); } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/6ea49945/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestSaslFanOutOneBlockAsyncDFSOutput.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestSaslFanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestSaslFanOutOneBlockAsyncDFSOutput.java new file mode 100644 index 0000000..2f5e2ff --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestSaslFanOutOneBlockAsyncDFSOutput.java @@ -0,0 +1,192 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.util; + +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.ExecutionException; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.http.ssl.KeyStoreTestUtil; +import org.apache.hadoop.hbase.security.HBaseKerberosUtils; +import org.apache.hadoop.hbase.security.token.TestGenerateDelegationToken; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.MiscTests; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.http.HttpConfig; +import org.apache.hadoop.minikdc.MiniKdc; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; + +import io.netty.channel.EventLoop; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; + +@RunWith(Parameterized.class) +@Category({ MiscTests.class, MediumTests.class }) +public class 
TestSaslFanOutOneBlockAsyncDFSOutput { + + private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + private static DistributedFileSystem FS; + + private static EventLoopGroup EVENT_LOOP_GROUP; + + private static int READ_TIMEOUT_MS = 200000; + + private static final File KEYTAB_FILE = new File( + TEST_UTIL.getDataTestDir("keytab").toUri().getPath()); + + private static MiniKdc KDC; + + private static String HOST = "localhost"; + + private static String USERNAME; + + private static String PRINCIPAL; + + private static String HTTP_PRINCIPAL; + @Rule + public TestName name = new TestName(); + + @Parameter(0) + public String protection; + + @Parameter(1) + public String encryptionAlgorithm; + + @Parameters(name = "{index}: protection={0}, encryption={1}") + public static Iterable<Object[]> data() { + List<Object[]> params = new ArrayList<>(); + for (String protection : Arrays.asList("authentication", "integrity", "privacy")) { + for (String encryptionAlgorithm : Arrays.asList("", "3des", "rc4")) { + params.add(new Object[] { protection, encryptionAlgorithm }); + } + } + return params; + } + + private static void setHdfsSecuredConfiguration(Configuration conf) throws Exception { + // change XXX_USER_NAME_KEY to XXX_KERBEROS_PRINCIPAL_KEY after we drop support for hadoop-2.4.1 + conf.set(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY, PRINCIPAL + "@" + KDC.getRealm()); + conf.set(DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY, KEYTAB_FILE.getAbsolutePath()); + conf.set(DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY, PRINCIPAL + "@" + KDC.getRealm()); + conf.set(DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY, KEYTAB_FILE.getAbsolutePath()); + conf.set(DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY, + HTTP_PRINCIPAL + "@" + KDC.getRealm()); + conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true); + conf.set(DFSConfigKeys.DFS_HTTP_POLICY_KEY, HttpConfig.Policy.HTTPS_ONLY.name()); + conf.set(DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY, 
"localhost:0"); + conf.set(DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY, "localhost:0"); + + File keystoresDir = new File(TEST_UTIL.getDataTestDir("keystore").toUri().getPath()); + keystoresDir.mkdirs(); + String sslConfDir = KeyStoreTestUtil.getClasspathDir(TestGenerateDelegationToken.class); + KeyStoreTestUtil.setupSSLConfig(keystoresDir.getAbsolutePath(), sslConfDir, conf, false); + + conf.setBoolean("ignore.secure.ports.for.testing", true); + } + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + Logger.getLogger("org.apache.hadoop.hdfs.StateChange").setLevel(Level.DEBUG); + Logger.getLogger("BlockStateChange").setLevel(Level.DEBUG); + EVENT_LOOP_GROUP = new NioEventLoopGroup(); + TEST_UTIL.getConfiguration().setInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT_MS); + Properties conf = MiniKdc.createConf(); + conf.put(MiniKdc.DEBUG, true); + KDC = new MiniKdc(conf, new File(TEST_UTIL.getDataTestDir("kdc").toUri().getPath())); + KDC.start(); + USERNAME = UserGroupInformation.getLoginUser().getShortUserName(); + PRINCIPAL = USERNAME + "/" + HOST; + HTTP_PRINCIPAL = "HTTP/" + HOST; + KDC.createPrincipal(KEYTAB_FILE, PRINCIPAL, HTTP_PRINCIPAL); + setHdfsSecuredConfiguration(TEST_UTIL.getConfiguration()); + HBaseKerberosUtils.setKeytabFileForTesting(KEYTAB_FILE.getAbsolutePath()); + HBaseKerberosUtils.setPrincipalForTesting(PRINCIPAL + "@" + KDC.getRealm()); + HBaseKerberosUtils.setSecuredConfiguration(TEST_UTIL.getConfiguration()); + UserGroupInformation.setConfiguration(TEST_UTIL.getConfiguration()); + } + + @AfterClass + public static void tearDownAfterClass() throws IOException, InterruptedException { + if (EVENT_LOOP_GROUP != null) { + EVENT_LOOP_GROUP.shutdownGracefully().sync(); + } + if (KDC != null) { + KDC.stop(); + } + } + + @Before + public void setUp() throws Exception { + TEST_UTIL.getConfiguration().set("dfs.data.transfer.protection", protection); + if (StringUtils.isBlank(encryptionAlgorithm)) { + 
TEST_UTIL.getConfiguration().setBoolean(DFS_ENCRYPT_DATA_TRANSFER_KEY, false); + TEST_UTIL.getConfiguration().unset(DFS_DATA_ENCRYPTION_ALGORITHM_KEY); + } else { + TEST_UTIL.getConfiguration().setBoolean(DFS_ENCRYPT_DATA_TRANSFER_KEY, true); + TEST_UTIL.getConfiguration().set(DFS_DATA_ENCRYPTION_ALGORITHM_KEY, encryptionAlgorithm); + } + TEST_UTIL.startMiniDFSCluster(3); + FS = TEST_UTIL.getDFSCluster().getFileSystem(); + } + + @After + public void tearDown() throws IOException { + TEST_UTIL.shutdownMiniDFSCluster(); + } + + private Path getTestFile() { + return new Path("/" + name.getMethodName().replaceAll("[^0-9a-zA-Z]", "_")); + } + + @Test + public void test() throws IOException, InterruptedException, ExecutionException { + Path f = getTestFile(); + EventLoop eventLoop = EVENT_LOOP_GROUP.next(); + final FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, + true, false, (short) 1, FS.getDefaultBlockSize(), eventLoop); + TestFanOutOneBlockAsyncDFSOutput.writeAndVerify(eventLoop, FS, f, out); + } +}