HBASE-15743 Add Transparent Data Encryption support for 
FanOutOneBlockAsyncDFSOutput


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/0d252918
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/0d252918
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/0d252918

Branch: refs/heads/hbase-12439
Commit: 0d252918f65aca8dba070f2da1d32de8b2fd1c1c
Parents: 532b914
Author: zhangduo <zhang...@apache.org>
Authored: Thu May 5 10:06:55 2016 +0800
Committer: zhangduo <zhang...@apache.org>
Committed: Fri May 6 08:55:58 2016 +0800

----------------------------------------------------------------------
 .../asyncfs/FanOutOneBlockAsyncDFSOutput.java   |  24 +-
 .../FanOutOneBlockAsyncDFSOutputHelper.java     |   9 +-
 .../FanOutOneBlockAsyncDFSOutputSaslHelper.java | 239 +++++++++++++------
 .../TestSaslFanOutOneBlockAsyncDFSOutput.java   |  75 +++++-
 4 files changed, 264 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/0d252918/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
index 7d6a676..8dd7f5e 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
@@ -43,6 +43,7 @@ import io.netty.util.concurrent.FutureListener;
 import io.netty.util.concurrent.Promise;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.nio.channels.CompletionHandler;
 import java.util.ArrayDeque;
 import java.util.Collection;
@@ -58,6 +59,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import 
org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.CancelOnClose;
+import 
org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.CryptoCodec;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hdfs.DFSClient;
@@ -119,6 +121,8 @@ public class FanOutOneBlockAsyncDFSOutput implements 
AsyncFSOutput {
 
   private final LocatedBlock locatedBlock;
 
+  private final CryptoCodec cryptoCodec;
+
   private final EventLoop eventLoop;
 
   private final List<Channel> datanodeList;
@@ -317,8 +321,8 @@ public class FanOutOneBlockAsyncDFSOutput implements 
AsyncFSOutput {
 
   FanOutOneBlockAsyncDFSOutput(Configuration conf, FSUtils fsUtils, 
DistributedFileSystem dfs,
       DFSClient client, ClientProtocol namenode, String clientName, String 
src, long fileId,
-      LocatedBlock locatedBlock, EventLoop eventLoop, List<Channel> 
datanodeList,
-      DataChecksum summer, ByteBufAllocator alloc) {
+      LocatedBlock locatedBlock, CryptoCodec cryptoCodec, EventLoop eventLoop,
+      List<Channel> datanodeList, DataChecksum summer, ByteBufAllocator alloc) 
{
     this.conf = conf;
     this.fsUtils = fsUtils;
     this.dfs = dfs;
@@ -328,6 +332,7 @@ public class FanOutOneBlockAsyncDFSOutput implements 
AsyncFSOutput {
     this.clientName = clientName;
     this.src = src;
     this.locatedBlock = locatedBlock;
+    this.cryptoCodec = cryptoCodec;
     this.eventLoop = eventLoop;
     this.datanodeList = datanodeList;
     this.summer = summer;
@@ -342,16 +347,27 @@ public class FanOutOneBlockAsyncDFSOutput implements 
AsyncFSOutput {
     write(b, 0, b.length);
   }
 
+  private void write0(byte[] b, final int off, final int len) {
+    buf.ensureWritable(len);
+    if (cryptoCodec == null) {
+      buf.writeBytes(b, off, len);
+    } else {
+      ByteBuffer inBuffer = ByteBuffer.wrap(b, off, len);
+      cryptoCodec.encrypt(inBuffer, buf.nioBuffer(buf.writerIndex(), len));
+      buf.writerIndex(buf.writerIndex() + len);
+    }
+  }
+
   @Override
   public void write(final byte[] b, final int off, final int len) {
     if (eventLoop.inEventLoop()) {
-      buf.ensureWritable(len).writeBytes(b, off, len);
+      write0(b, off, len);
     } else {
       eventLoop.submit(new Runnable() {
 
         @Override
         public void run() {
-          buf.ensureWritable(len).writeBytes(b, off, len);
+          write0(b, off, len);
         }
       }).syncUninterruptibly();
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/0d252918/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
index 4f9058c..7b680e1 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
@@ -21,6 +21,7 @@ import static 
io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS;
 import static io.netty.handler.timeout.IdleState.READER_IDLE;
 import static org.apache.hadoop.fs.CreateFlag.CREATE;
 import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
+import static 
org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.createCryptoCodec;
 import static 
org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.trySaslNegotiate;
 import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
@@ -73,6 +74,7 @@ import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.ConnectionUtils;
+import 
org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.CryptoCodec;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hdfs.DFSClient;
@@ -672,9 +674,12 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
         // layer should retry itself if needed.
         datanodeList.add(future.syncUninterruptibly().getNow());
       }
+      CryptoCodec cryptocodec = createCryptoCodec(conf, stat, client);
+      FanOutOneBlockAsyncDFSOutput output = new 
FanOutOneBlockAsyncDFSOutput(conf, fsUtils, dfs,
+          client, namenode, clientName, src, stat.getFileId(), locatedBlock, 
cryptocodec, eventLoop,
+          datanodeList, summer, ALLOC);
       succ = true;
-      return new FanOutOneBlockAsyncDFSOutput(conf, fsUtils, dfs, client, 
namenode, clientName,
-          src, stat.getFileId(), locatedBlock, eventLoop, datanodeList, 
summer, ALLOC);
+      return output;
     } finally {
       if (!succ) {
         if (futureList != null) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/0d252918/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java
index 22c4e04..33e8841 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java
@@ -21,6 +21,7 @@ import static io.netty.handler.timeout.IdleState.READER_IDLE;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Charsets;
+import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -79,6 +80,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import 
org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
 import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
 import 
org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto;
@@ -111,7 +113,7 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
 
   @VisibleForTesting
   static final String DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY =
-      "dfs.encrypt.data.transfer.cipher.suites";
+    "dfs.encrypt.data.transfer.cipher.suites";
 
   @VisibleForTesting
   static final String AES_CTR_NOPADDING = "AES/CTR/NoPadding";
@@ -129,7 +131,7 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
 
   private static final SaslAdaptor SASL_ADAPTOR;
 
-  private interface CipherHelper {
+  private interface CipherOptionHelper {
 
     List<Object> getCipherOptions(Configuration conf) throws IOException;
 
@@ -150,9 +152,19 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
     byte[] getOutIv(Object cipherOption);
   }
 
-  private static final CipherHelper CIPHER_HELPER;
+  private static final CipherOptionHelper CIPHER_OPTION_HELPER;
 
-  private static final class CryptoCodec {
+  private interface TransparentCryptoHelper {
+
+    Object getFileEncryptionInfo(HdfsFileStatus stat);
+
+    CryptoCodec createCryptoCodec(Configuration conf, Object feInfo, DFSClient 
client)
+        throws IOException;
+  }
+
+  private static final TransparentCryptoHelper TRANSPARENT_CRYPTO_HELPER;
+
+  static final class CryptoCodec {
 
     private static final Method CREATE_CODEC;
 
@@ -215,23 +227,34 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper 
{
     private final Object decryptor;
 
     public CryptoCodec(Configuration conf, Object cipherOption) {
-      Object codec;
       try {
-        codec = CREATE_CODEC.invoke(null, conf, 
CIPHER_HELPER.getCipherSuite(cipherOption));
+        Object codec = CREATE_CODEC.invoke(null, conf,
+          CIPHER_OPTION_HELPER.getCipherSuite(cipherOption));
         encryptor = CREATE_ENCRYPTOR.invoke(codec);
-        byte[] encKey = CIPHER_HELPER.getInKey(cipherOption);
-        byte[] encIv = CIPHER_HELPER.getInIv(cipherOption);
+        byte[] encKey = CIPHER_OPTION_HELPER.getInKey(cipherOption);
+        byte[] encIv = CIPHER_OPTION_HELPER.getInIv(cipherOption);
         INIT_ENCRYPTOR.invoke(encryptor, encKey, Arrays.copyOf(encIv, 
encIv.length));
 
         decryptor = CREATE_DECRYPTOR.invoke(codec);
-        byte[] decKey = CIPHER_HELPER.getOutKey(cipherOption);
-        byte[] decIv = CIPHER_HELPER.getOutIv(cipherOption);
+        byte[] decKey = CIPHER_OPTION_HELPER.getOutKey(cipherOption);
+        byte[] decIv = CIPHER_OPTION_HELPER.getOutIv(cipherOption);
         INIT_DECRYPTOR.invoke(decryptor, decKey, Arrays.copyOf(decIv, 
decIv.length));
       } catch (IllegalAccessException | InvocationTargetException e) {
         throw new RuntimeException(e);
       }
     }
 
+    public CryptoCodec(Configuration conf, Object cipherSuite, byte[] encKey, 
byte[] encIv) {
+      try {
+        Object codec = CREATE_CODEC.invoke(null, conf, cipherSuite);
+        encryptor = CREATE_ENCRYPTOR.invoke(codec);
+        INIT_ENCRYPTOR.invoke(encryptor, encKey, encIv);
+        decryptor = null;
+      } catch (IllegalAccessException | InvocationTargetException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
     public void encrypt(ByteBuffer inBuffer, ByteBuffer outBuffer) {
       try {
         ENCRYPT.invoke(encryptor, inBuffer, outBuffer);
@@ -251,17 +274,17 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper 
{
 
   private static SaslAdaptor createSaslAdaptor27(Class<?> 
saslDataTransferClientClass)
       throws NoSuchFieldException, NoSuchMethodException {
-    final Field saslPropsResolverField =
-        saslDataTransferClientClass.getDeclaredField("saslPropsResolver");
+    final Field saslPropsResolverField = saslDataTransferClientClass
+        .getDeclaredField("saslPropsResolver");
     saslPropsResolverField.setAccessible(true);
-    final Field trustedChannelResolverField =
-        saslDataTransferClientClass.getDeclaredField("trustedChannelResolver");
+    final Field trustedChannelResolverField = saslDataTransferClientClass
+        .getDeclaredField("trustedChannelResolver");
     trustedChannelResolverField.setAccessible(true);
-    final Field fallbackToSimpleAuthField =
-        saslDataTransferClientClass.getDeclaredField("fallbackToSimpleAuth");
+    final Field fallbackToSimpleAuthField = saslDataTransferClientClass
+        .getDeclaredField("fallbackToSimpleAuth");
     fallbackToSimpleAuthField.setAccessible(true);
-    final Method getSaslDataTransferClientMethod =
-        DFSClient.class.getMethod("getSaslDataTransferClient");
+    final Method getSaslDataTransferClientMethod = DFSClient.class
+        .getMethod("getSaslDataTransferClient");
     final Method newDataEncryptionKeyMethod = 
DFSClient.class.getMethod("newDataEncryptionKey");
     return new SaslAdaptor() {
 
@@ -288,8 +311,8 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
       @Override
       public AtomicBoolean getFallbackToSimpleAuth(DFSClient client) {
         try {
-          return (AtomicBoolean) 
fallbackToSimpleAuthField.get(getSaslDataTransferClientMethod
-              .invoke(client));
+          return (AtomicBoolean) fallbackToSimpleAuthField
+              .get(getSaslDataTransferClientMethod.invoke(client));
         } catch (IllegalAccessException | InvocationTargetException e) {
           throw new RuntimeException(e);
         }
@@ -308,8 +331,8 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
 
   private static SaslAdaptor createSaslAdaptor25() {
     try {
-      final Field trustedChannelResolverField =
-          DFSClient.class.getDeclaredField("trustedChannelResolver");
+      final Field trustedChannelResolverField = DFSClient.class
+          .getDeclaredField("trustedChannelResolver");
       trustedChannelResolverField.setAccessible(true);
       final Method getDataEncryptionKeyMethod = 
DFSClient.class.getMethod("getDataEncryptionKey");
       return new SaslAdaptor() {
@@ -351,8 +374,8 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
   private static SaslAdaptor createSaslAdaptor() {
     Class<?> saslDataTransferClientClass = null;
     try {
-      saslDataTransferClientClass =
-          
Class.forName("org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient");
+      saslDataTransferClientClass = Class
+          
.forName("org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient");
     } catch (ClassNotFoundException e) {
       LOG.warn("No SaslDataTransferClient class found, should be hadoop 2.5-");
     }
@@ -364,8 +387,8 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
     }
   }
 
-  private static CipherHelper createCipherHelper25() {
-    return new CipherHelper() {
+  private static CipherOptionHelper createCipherHelper25() {
+    return new CipherOptionHelper() {
 
       @Override
       public byte[] getOutKey(Object cipherOption) {
@@ -410,18 +433,17 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper 
{
     };
   }
 
-  private static CipherHelper createCipherHelper27(Class<?> cipherOptionClass)
+  private static CipherOptionHelper createCipherHelper27(Class<?> 
cipherOptionClass)
       throws ClassNotFoundException, NoSuchMethodException {
     @SuppressWarnings("rawtypes")
-    Class<? extends Enum> cipherSuiteClass =
-        
Class.forName("org.apache.hadoop.crypto.CipherSuite").asSubclass(Enum.class);
+    Class<? extends Enum> cipherSuiteClass = 
Class.forName("org.apache.hadoop.crypto.CipherSuite")
+        .asSubclass(Enum.class);
     @SuppressWarnings("unchecked")
     final Enum<?> aesCipherSuite = Enum.valueOf(cipherSuiteClass, 
"AES_CTR_NOPADDING");
-    final Constructor<?> cipherOptionConstructor =
-        cipherOptionClass.getConstructor(cipherSuiteClass);
-    final Constructor<?> cipherOptionWithKeyAndIvConstructor =
-        cipherOptionClass.getConstructor(cipherSuiteClass, byte[].class, 
byte[].class,
-          byte[].class, byte[].class);
+    final Constructor<?> cipherOptionConstructor = cipherOptionClass
+        .getConstructor(cipherSuiteClass);
+    final Constructor<?> cipherOptionWithKeyAndIvConstructor = 
cipherOptionClass
+        .getConstructor(cipherSuiteClass, byte[].class, byte[].class, 
byte[].class, byte[].class);
 
     final Method getCipherSuiteMethod = 
cipherOptionClass.getMethod("getCipherSuite");
     final Method getInKeyMethod = cipherOptionClass.getMethod("getInKey");
@@ -429,16 +451,15 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper 
{
     final Method getOutKeyMethod = cipherOptionClass.getMethod("getOutKey");
     final Method getOutIvMethod = cipherOptionClass.getMethod("getOutIv");
 
-    final Method convertCipherOptionsMethod =
-        PBHelper.class.getMethod("convertCipherOptions", List.class);
-    final Method convertCipherOptionProtosMethod =
-        PBHelper.class.getMethod("convertCipherOptionProtos", List.class);
-    final Method addAllCipherOptionMethod =
-        
DataTransferEncryptorMessageProto.Builder.class.getMethod("addAllCipherOption",
-          Iterable.class);
-    final Method getCipherOptionListMethod =
-        
DataTransferEncryptorMessageProto.class.getMethod("getCipherOptionList");
-    return new CipherHelper() {
+    final Method convertCipherOptionsMethod = 
PBHelper.class.getMethod("convertCipherOptions",
+      List.class);
+    final Method convertCipherOptionProtosMethod = PBHelper.class
+        .getMethod("convertCipherOptionProtos", List.class);
+    final Method addAllCipherOptionMethod = 
DataTransferEncryptorMessageProto.Builder.class
+        .getMethod("addAllCipherOption", Iterable.class);
+    final Method getCipherOptionListMethod = 
DataTransferEncryptorMessageProto.class
+        .getMethod("getCipherOptionList");
+    return new CipherOptionHelper() {
 
       @Override
       public byte[] getOutKey(Object cipherOption) {
@@ -532,9 +553,8 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
           boolean isNegotiatedQopPrivacy, SaslClient saslClient) throws 
IOException {
         List<Object> cipherOptions;
         try {
-          cipherOptions =
-              (List<Object>) convertCipherOptionProtosMethod.invoke(null,
-                getCipherOptionListMethod.invoke(proto));
+          cipherOptions = (List<Object>) 
convertCipherOptionProtosMethod.invoke(null,
+            getCipherOptionListMethod.invoke(proto));
         } catch (IllegalAccessException | InvocationTargetException e) {
           throw new RuntimeException(e);
         }
@@ -557,7 +577,7 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
     };
   }
 
-  private static CipherHelper createCipherHelper() {
+  private static CipherOptionHelper createCipherHelper() {
     Class<?> cipherOptionClass;
     try {
       cipherOptionClass = 
Class.forName("org.apache.hadoop.crypto.CipherOption");
@@ -572,9 +592,79 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
     }
   }
 
+  private static TransparentCryptoHelper createTransparentCryptoHelper25() {
+    return new TransparentCryptoHelper() {
+
+      @Override
+      public Object getFileEncryptionInfo(HdfsFileStatus stat) {
+        return null;
+      }
+
+      @Override
+      public CryptoCodec createCryptoCodec(Configuration conf, Object feInfo, 
DFSClient client) {
+        throw new UnsupportedOperationException();
+      }
+    };
+  }
+
+  private static TransparentCryptoHelper 
createTransparentCryptoHelper27(Class<?> feInfoClass)
+      throws NoSuchMethodException, ClassNotFoundException {
+    final Method getFileEncryptionInfoMethod = HdfsFileStatus.class
+        .getMethod("getFileEncryptionInfo");
+    final Method decryptEncryptedDataEncryptionKeyMethod = DFSClient.class
+        .getDeclaredMethod("decryptEncryptedDataEncryptionKey", feInfoClass);
+    decryptEncryptedDataEncryptionKeyMethod.setAccessible(true);
+    final Method getCipherSuiteMethod = 
feInfoClass.getMethod("getCipherSuite");
+    Class<?> keyVersionClass = 
Class.forName("org.apache.hadoop.crypto.key.KeyProvider$KeyVersion");
+    final Method getMaterialMethod = keyVersionClass.getMethod("getMaterial");
+    final Method getIVMethod = feInfoClass.getMethod("getIV");
+    return new TransparentCryptoHelper() {
+
+      @Override
+      public Object getFileEncryptionInfo(HdfsFileStatus stat) {
+        try {
+          return getFileEncryptionInfoMethod.invoke(stat);
+        } catch (IllegalAccessException | InvocationTargetException e) {
+          throw new RuntimeException(e);
+        }
+      }
+
+      @Override
+      public CryptoCodec createCryptoCodec(Configuration conf, Object feInfo, 
DFSClient client)
+          throws IOException {
+        try {
+          Object decrypted = 
decryptEncryptedDataEncryptionKeyMethod.invoke(client, feInfo);
+          return new CryptoCodec(conf, getCipherSuiteMethod.invoke(feInfo),
+              (byte[]) getMaterialMethod.invoke(decrypted), (byte[]) 
getIVMethod.invoke(feInfo));
+        } catch (InvocationTargetException e) {
+          Throwables.propagateIfPossible(e.getTargetException(), 
IOException.class);
+          throw new RuntimeException(e.getTargetException());
+        } catch (IllegalAccessException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    };
+  }
+
+  private static TransparentCryptoHelper createTransparentCryptoHelper() {
+    Class<?> feInfoClass;
+    try {
+      feInfoClass = Class.forName("org.apache.hadoop.fs.FileEncryptionInfo");
+    } catch (ClassNotFoundException e) {
+      LOG.warn("No FileEncryptionInfo class found, should be hadoop 2.5-");
+      return createTransparentCryptoHelper25();
+    }
+    try {
+      return createTransparentCryptoHelper27(feInfoClass);
+    } catch (NoSuchMethodException | ClassNotFoundException e) {
+      throw new Error(e);
+    }
+  }
+
   static {
     SASL_ADAPTOR = createSaslAdaptor();
-    CIPHER_HELPER = createCipherHelper();
+    CIPHER_OPTION_HELPER = createCipherHelper();
+    TRANSPARENT_CRYPTO_HELPER = createTransparentCryptoHelper();
   }
 
   /**
@@ -643,9 +733,8 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
         Map<String, String> saslProps, int timeoutMs, Promise<Void> promise) 
throws SaslException {
       this.conf = conf;
       this.saslProps = saslProps;
-      this.saslClient =
-          Sasl.createSaslClient(new String[] { MECHANISM }, username, 
PROTOCOL, SERVER_NAME,
-            saslProps, new SaslClientCallbackHandler(username, password));
+      this.saslClient = Sasl.createSaslClient(new String[] { MECHANISM }, 
username, PROTOCOL,
+        SERVER_NAME, saslProps, new SaslClientCallbackHandler(username, 
password));
       this.timeoutMs = timeoutMs;
       this.promise = promise;
     }
@@ -656,14 +745,14 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper 
{
 
     private void sendSaslMessage(ChannelHandlerContext ctx, byte[] payload, 
List<Object> options)
         throws IOException {
-      DataTransferEncryptorMessageProto.Builder builder =
-          DataTransferEncryptorMessageProto.newBuilder();
+      DataTransferEncryptorMessageProto.Builder builder = 
DataTransferEncryptorMessageProto
+          .newBuilder();
       builder.setStatus(DataTransferEncryptorStatus.SUCCESS);
       if (payload != null) {
         builder.setPayload(ByteString.copyFrom(payload));
       }
       if (options != null) {
-        CIPHER_HELPER.addCipherOptions(builder, options);
+        CIPHER_OPTION_HELPER.addCipherOptions(builder, options);
       }
       DataTransferEncryptorMessageProto proto = builder.build();
       int size = proto.getSerializedSize();
@@ -704,8 +793,8 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
     }
 
     private boolean requestedQopContainsPrivacy() {
-      Set<String> requestedQop =
-          
ImmutableSet.copyOf(Arrays.asList(saslProps.get(Sasl.QOP).split(",")));
+      Set<String> requestedQop = ImmutableSet
+          .copyOf(Arrays.asList(saslProps.get(Sasl.QOP).split(",")));
       return requestedQop.contains("auth-conf");
     }
 
@@ -713,15 +802,16 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper 
{
       if (!saslClient.isComplete()) {
         throw new IOException("Failed to complete SASL handshake");
       }
-      Set<String> requestedQop =
-          
ImmutableSet.copyOf(Arrays.asList(saslProps.get(Sasl.QOP).split(",")));
+      Set<String> requestedQop = ImmutableSet
+          .copyOf(Arrays.asList(saslProps.get(Sasl.QOP).split(",")));
       String negotiatedQop = getNegotiatedQop();
-      LOG.debug("Verifying QOP, requested QOP = " + requestedQop + ", 
negotiated QOP = "
-          + negotiatedQop);
+      LOG.debug(
+        "Verifying QOP, requested QOP = " + requestedQop + ", negotiated QOP = 
" + negotiatedQop);
       if (!requestedQop.contains(negotiatedQop)) {
         throw new IOException(String.format("SASL handshake completed, but "
             + "channel does not have acceptable quality of protection, "
-            + "requested = %s, negotiated = %s", requestedQop, negotiatedQop));
+            + "requested = %s, negotiated = %s",
+          requestedQop, negotiatedQop));
       }
     }
 
@@ -741,7 +831,7 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
           case 1: {
             List<Object> cipherOptions = null;
             if (requestedQopContainsPrivacy()) {
-              cipherOptions = CIPHER_HELPER.getCipherOptions(conf);
+              cipherOptions = CIPHER_OPTION_HELPER.getCipherOptions(conf);
             }
             sendSaslMessage(ctx, response, cipherOptions);
             ctx.flush();
@@ -752,7 +842,7 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
             assert response == null;
             checkSaslComplete();
             Object cipherOption =
-                CIPHER_HELPER.getCipherOption(proto, isNegotiatedQopPrivacy(), 
saslClient);
+                CIPHER_OPTION_HELPER.getCipherOption(proto, 
isNegotiatedQopPrivacy(), saslClient);
             ChannelPipeline p = ctx.pipeline();
             while (p.first() != null) {
               p.removeFirst();
@@ -762,8 +852,9 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
               p.addLast(new EncryptHandler(codec), new DecryptHandler(codec));
             } else {
               if (useWrap()) {
-                p.addLast(new SaslWrapHandler(saslClient), new 
LengthFieldBasedFrameDecoder(
-                    Integer.MAX_VALUE, 0, 4), new 
SaslUnwrapHandler(saslClient));
+                p.addLast(new SaslWrapHandler(saslClient),
+                  new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4),
+                  new SaslUnwrapHandler(saslClient));
               }
             }
             promise.trySuccess(null);
@@ -992,8 +1083,8 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
     }
     if (encryptionKey != null) {
       if (LOG.isDebugEnabled()) {
-        LOG.debug("SASL client doing encrypted handshake for addr = " + addr + 
", datanodeId = "
-            + dnInfo);
+        LOG.debug(
+          "SASL client doing encrypted handshake for addr = " + addr + ", 
datanodeId = " + dnInfo);
       }
       doSaslNegotiation(conf, channel, timeoutMs, 
getUserNameFromEncryptionKey(encryptionKey),
         encryptionKeyToPassword(encryptionKey.encryptionKey),
@@ -1018,8 +1109,8 @@ public final class FanOutOneBlockAsyncDFSOutputSaslHelper 
{
       saslPromise.trySuccess(null);
     } else if (saslPropsResolver != null) {
       if (LOG.isDebugEnabled()) {
-        LOG.debug("SASL client doing general handshake for addr = " + addr + 
", datanodeId = "
-            + dnInfo);
+        LOG.debug(
+          "SASL client doing general handshake for addr = " + addr + ", 
datanodeId = " + dnInfo);
       }
       doSaslNegotiation(conf, channel, timeoutMs, buildUsername(accessToken),
         buildClientPassword(accessToken), 
saslPropsResolver.getClientProperties(addr), saslPromise);
@@ -1035,4 +1126,12 @@ public final class 
FanOutOneBlockAsyncDFSOutputSaslHelper {
     }
   }
 
+  static CryptoCodec createCryptoCodec(Configuration conf, HdfsFileStatus 
stat, DFSClient client)
+      throws IOException {
+    Object feInfo = TRANSPARENT_CRYPTO_HELPER.getFileEncryptionInfo(stat);
+    if (feInfo == null) {
+      return null;
+    }
+    return TRANSPARENT_CRYPTO_HELPER.createCryptoCodec(conf, feInfo, client);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/0d252918/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java
index 545a39e..4637a01 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java
@@ -29,6 +29,8 @@ import io.netty.channel.nio.NioEventLoopGroup;
 
 import java.io.File;
 import java.io.IOException;
+import java.lang.reflect.Method;
+import java.net.URI;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -42,7 +44,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.http.ssl.KeyStoreTestUtil;
 import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
 import org.apache.hadoop.hbase.security.token.TestGenerateDelegationToken;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.MiscTests;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -63,7 +65,7 @@ import org.junit.runners.Parameterized.Parameter;
 import org.junit.runners.Parameterized.Parameters;
 
 @RunWith(Parameterized.class)
-@Category({ MiscTests.class, MediumTests.class })
+@Category({ MiscTests.class, LargeTests.class })
 public class TestSaslFanOutOneBlockAsyncDFSOutput {
 
   private static final HBaseTestingUtility TEST_UTIL = new 
HBaseTestingUtility();
@@ -74,8 +76,8 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput {
 
   private static int READ_TIMEOUT_MS = 200000;
 
-  private static final File KEYTAB_FILE = new File(
-      TEST_UTIL.getDataTestDir("keytab").toUri().getPath());
+  private static final File KEYTAB_FILE =
+      new File(TEST_UTIL.getDataTestDir("keytab").toUri().getPath());
 
   private static MiniKdc KDC;
 
@@ -86,6 +88,11 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput {
   private static String PRINCIPAL;
 
   private static String HTTP_PRINCIPAL;
+
+  private static String TEST_KEY_NAME = "test_key";
+
+  private static boolean TEST_TRANSPARENT_ENCRYPTION = true;
+
   @Rule
   public TestName name = new TestName();
 
@@ -98,13 +105,20 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput {
   @Parameter(2)
   public String cipherSuite;
 
-  @Parameters(name = "{index}: protection={0}, encryption={1}, cipherSuite={2}")
+  @Parameter(3)
+  public boolean useTransparentEncryption;
+
+  @Parameters(
+      name = "{index}: protection={0}, encryption={1}, cipherSuite={2}, transparent_enc={3}")
   public static Iterable<Object[]> data() {
     List<Object[]> params = new ArrayList<>();
    for (String protection : Arrays.asList("authentication", "integrity", "privacy")) {
       for (String encryptionAlgorithm : Arrays.asList("", "3des", "rc4")) {
         for (String cipherSuite : Arrays.asList("", AES_CTR_NOPADDING)) {
-          params.add(new Object[] { protection, encryptionAlgorithm, cipherSuite });
+          for (boolean useTransparentEncryption : Arrays.asList(false, true)) {
+            params.add(new Object[] { protection, encryptionAlgorithm, cipherSuite,
+                useTransparentEncryption });
+          }
         }
       }
     }
@@ -132,6 +146,35 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput {
     conf.setBoolean("ignore.secure.ports.for.testing", true);
   }
 
+  private static void setUpKeyProvider(Configuration conf) throws Exception {
+    Class<?> keyProviderFactoryClass;
+    try {
+      keyProviderFactoryClass = Class.forName("org.apache.hadoop.crypto.key.KeyProviderFactory");
+    } catch (ClassNotFoundException e) {
+      // should be hadoop 2.5-, give up
+      TEST_TRANSPARENT_ENCRYPTION = false;
+      return;
+    }
+
+    URI keyProviderUri =
+        new URI("jceks://file" + TEST_UTIL.getDataTestDir("test.jks").toUri().toString());
+    conf.set("dfs.encryption.key.provider.uri", keyProviderUri.toString());
+    Method getKeyProviderMethod =
+        keyProviderFactoryClass.getMethod("get", URI.class, Configuration.class);
+    Object keyProvider = getKeyProviderMethod.invoke(null, keyProviderUri, conf);
+    Class<?> keyProviderClass = Class.forName("org.apache.hadoop.crypto.key.KeyProvider");
+    Class<?> keyProviderOptionsClass =
+        Class.forName("org.apache.hadoop.crypto.key.KeyProvider$Options");
+    Method createKeyMethod =
+        keyProviderClass.getMethod("createKey", String.class, keyProviderOptionsClass);
+    Object options = keyProviderOptionsClass.getConstructor(Configuration.class).newInstance(conf);
+    createKeyMethod.invoke(keyProvider, TEST_KEY_NAME, options);
+    Method flushMethod = keyProviderClass.getMethod("flush");
+    flushMethod.invoke(keyProvider);
+    Method closeMethod = keyProviderClass.getMethod("close");
+    closeMethod.invoke(keyProvider);
+  }
+
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     EVENT_LOOP_GROUP = new NioEventLoopGroup();
@@ -144,6 +187,8 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput {
     PRINCIPAL = USERNAME + "/" + HOST;
     HTTP_PRINCIPAL = "HTTP/" + HOST;
     KDC.createPrincipal(KEYTAB_FILE, PRINCIPAL, HTTP_PRINCIPAL);
+
+    setUpKeyProvider(TEST_UTIL.getConfiguration());
     setHdfsSecuredConfiguration(TEST_UTIL.getConfiguration());
     HBaseKerberosUtils.setKeytabFileForTesting(KEYTAB_FILE.getAbsolutePath());
+    HBaseKerberosUtils.setPrincipalForTesting(PRINCIPAL + "@" + KDC.getRealm());
@@ -161,6 +206,17 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput {
     }
   }
 
+  private Path testDirOnTestFs;
+
+  private void createEncryptionZone() throws Exception {
+    if (!TEST_TRANSPARENT_ENCRYPTION) {
+      return;
+    }
+    Method method =
+        DistributedFileSystem.class.getMethod("createEncryptionZone", Path.class, String.class);
+    method.invoke(FS, testDirOnTestFs, TEST_KEY_NAME);
+  }
+
   @Before
   public void setUp() throws Exception {
    TEST_UTIL.getConfiguration().set("dfs.data.transfer.protection", protection);
@@ -182,6 +238,11 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput {
 
     TEST_UTIL.startMiniDFSCluster(3);
     FS = TEST_UTIL.getDFSCluster().getFileSystem();
+    testDirOnTestFs = new Path("/" + name.getMethodName().replaceAll("[^0-9a-zA-Z]", "_"));
+    FS.mkdirs(testDirOnTestFs);
+    if (useTransparentEncryption) {
+      createEncryptionZone();
+    }
   }
 
   @After
@@ -190,7 +251,7 @@ public class TestSaslFanOutOneBlockAsyncDFSOutput {
   }
 
   private Path getTestFile() {
-    return new Path("/" + name.getMethodName().replaceAll("[^0-9a-zA-Z]", "_"));
+    return new Path(testDirOnTestFs, "test");
   }
 
   @Test

Reply via email to