Repository: hbase
Updated Branches:
  refs/heads/master eb67ee0d0 -> 856ee283f


HBASE-19371 Running WALPerformanceEvaluation against asyncfswal throws 
exceptions

Signed-off-by: Michael Stack <st...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/856ee283
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/856ee283
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/856ee283

Branch: refs/heads/master
Commit: 856ee283faf003404e8925006ce0e591c4eba600
Parents: eb67ee0
Author: zhangduo <zhang...@apache.org>
Authored: Sat Dec 9 14:21:52 2017 +0800
Committer: Michael Stack <st...@apache.org>
Committed: Mon Dec 11 13:38:02 2017 -0800

----------------------------------------------------------------------
 .../FanOutOneBlockAsyncDFSOutputHelper.java     | 90 ++++++++++----------
 1 file changed, 47 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/856ee283/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
index 08e1aae..a7c26e0 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
@@ -192,7 +192,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
 
   // helper class for creating data checksum.
   private interface ChecksumCreater {
-    DataChecksum createChecksum(Object conf);
+    DataChecksum createChecksum(DFSClient client);
   }
 
   private static final ChecksumCreater CHECKSUM_CREATER;
@@ -200,12 +200,12 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
   // helper class for creating files.
   private interface FileCreator {
     default HdfsFileStatus create(ClientProtocol instance, String src, 
FsPermission masked,
-        String clientName, EnumSetWritable<CreateFlag> flag,
-        boolean createParent, short replication, long blockSize,
-        CryptoProtocolVersion[] supportedVersions) throws Exception {
+        String clientName, EnumSetWritable<CreateFlag> flag, boolean 
createParent,
+        short replication, long blockSize, CryptoProtocolVersion[] 
supportedVersions)
+        throws Exception {
       try {
         return (HdfsFileStatus) createObject(instance, src, masked, 
clientName, flag, createParent,
-            replication, blockSize, supportedVersions);
+          replication, blockSize, supportedVersions);
       } catch (InvocationTargetException e) {
         if (e.getCause() instanceof Exception) {
           throw (Exception) e.getCause();
@@ -215,9 +215,8 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
       }
     };
 
-    Object createObject(ClientProtocol instance, String src, FsPermission 
masked,
-        String clientName, EnumSetWritable<CreateFlag> flag,
-        boolean createParent, short replication, long blockSize,
+    Object createObject(ClientProtocol instance, String src, FsPermission 
masked, String clientName,
+        EnumSetWritable<CreateFlag> flag, boolean createParent, short 
replication, long blockSize,
         CryptoProtocolVersion[] supportedVersions) throws Exception;
   }
 
@@ -276,9 +275,9 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
       ecnClass = 
Class.forName("org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck$ECN")
           .asSubclass(Enum.class);
     } catch (ClassNotFoundException e) {
-      String msg = "Couldn't properly initialize the PipelineAck.ECN class. 
Please "
-          + "update your WAL Provider to not make use of the 'asyncfs' 
provider. See "
-          + "HBASE-16110 for more information.";
+      String msg = "Couldn't properly initialize the PipelineAck.ECN class. 
Please " +
+          "update your WAL Provider to not make use of the 'asyncfs' provider. 
See " +
+          "HBASE-16110 for more information.";
       LOG.error(msg, e);
       throw new Error(msg, e);
     }
@@ -332,7 +331,9 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
     try {
       return createPipelineAckStatusGetter27();
     } catch (NoSuchMethodException e) {
-      LOG.debug("Can not get expected methods, should be hadoop 2.6-", e);
+      LOG.debug("Can not get expected method " + e.getMessage() +
+          ", this usually because your Hadoop is pre 2.7.0, " +
+          "try the methods in Hadoop 2.6.x instead.");
     }
     return createPipelineAckStatusGetter26();
   }
@@ -414,7 +415,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
       helperClass = Class.forName(clazzName);
     } catch (ClassNotFoundException e) {
       helperClass = org.apache.hadoop.hdfs.protocolPB.PBHelper.class;
-      LOG.debug(""  + clazzName + " not found (Hadoop is pre-2.8.0?); using " +
+      LOG.debug("" + clazzName + " not found (Hadoop is pre-2.8.0?); using " +
           helperClass.toString() + " instead.");
     }
     Method convertEBMethod = helperClass.getMethod("convert", 
ExtendedBlock.class);
@@ -441,7 +442,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
     };
   }
 
-  private static ChecksumCreater createChecksumCreater28(Class<?> confClass)
+  private static ChecksumCreater createChecksumCreater28(Method getConfMethod, 
Class<?> confClass)
       throws NoSuchMethodException {
     for (Method method : confClass.getMethods()) {
       if (method.getName().equals("createChecksum")) {
@@ -449,9 +450,10 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
         return new ChecksumCreater() {
 
           @Override
-          public DataChecksum createChecksum(Object conf) {
+          public DataChecksum createChecksum(DFSClient client) {
             try {
-              return (DataChecksum) createChecksumMethod.invoke(conf, (Object) 
null);
+              return (DataChecksum) 
createChecksumMethod.invoke(getConfMethod.invoke(client),
+                (Object) null);
             } catch (IllegalAccessException | InvocationTargetException e) {
               throw new RuntimeException(e);
             }
@@ -462,16 +464,16 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
     throw new NoSuchMethodException("Can not find createChecksum method in 
DfsClientConf");
   }
 
-  private static ChecksumCreater createChecksumCreater27(Class<?> confClass)
+  private static ChecksumCreater createChecksumCreater27(Method getConfMethod, 
Class<?> confClass)
       throws NoSuchMethodException {
     Method createChecksumMethod = 
confClass.getDeclaredMethod("createChecksum");
     createChecksumMethod.setAccessible(true);
     return new ChecksumCreater() {
 
       @Override
-      public DataChecksum createChecksum(Object conf) {
+      public DataChecksum createChecksum(DFSClient client) {
         try {
-          return (DataChecksum) createChecksumMethod.invoke(conf);
+          return (DataChecksum) 
createChecksumMethod.invoke(getConfMethod.invoke(client));
         } catch (IllegalAccessException | InvocationTargetException e) {
           throw new RuntimeException(e);
         }
@@ -481,36 +483,38 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
 
   private static ChecksumCreater createChecksumCreater()
       throws NoSuchMethodException, ClassNotFoundException {
+    Method getConfMethod = DFSClient.class.getMethod("getConf");
     try {
-      return createChecksumCreater28(
+      return createChecksumCreater28(getConfMethod,
         Class.forName("org.apache.hadoop.hdfs.client.impl.DfsClientConf"));
     } catch (ClassNotFoundException e) {
       LOG.debug("No DfsClientConf class found, should be hadoop 2.7-", e);
     }
-    return 
createChecksumCreater27(Class.forName("org.apache.hadoop.hdfs.DFSClient$Conf"));
+    return createChecksumCreater27(getConfMethod,
+      Class.forName("org.apache.hadoop.hdfs.DFSClient$Conf"));
   }
 
   private static FileCreator createFileCreator3() throws NoSuchMethodException 
{
     Method createMethod = ClientProtocol.class.getMethod("create", 
String.class, FsPermission.class,
-      String.class, EnumSetWritable.class, boolean.class, short.class, 
long.class, CryptoProtocolVersion[].class,
-      String.class);
+      String.class, EnumSetWritable.class, boolean.class, short.class, 
long.class,
+      CryptoProtocolVersion[].class, String.class);
 
     return (instance, src, masked, clientName, flag, createParent, 
replication, blockSize,
-            supportedVersions) -> {
-      return (HdfsFileStatus) createMethod.invoke(instance,
-          src, masked, clientName, flag, createParent, replication, blockSize, 
supportedVersions,
-          null);
+        supportedVersions) -> {
+      return (HdfsFileStatus) createMethod.invoke(instance, src, masked, 
clientName, flag,
+        createParent, replication, blockSize, supportedVersions, null);
     };
   }
 
   private static FileCreator createFileCreator2() throws NoSuchMethodException 
{
     Method createMethod = ClientProtocol.class.getMethod("create", 
String.class, FsPermission.class,
-      String.class, EnumSetWritable.class, boolean.class, short.class, 
long.class, CryptoProtocolVersion[].class);
+      String.class, EnumSetWritable.class, boolean.class, short.class, 
long.class,
+      CryptoProtocolVersion[].class);
 
     return (instance, src, masked, clientName, flag, createParent, 
replication, blockSize,
-            supportedVersions) -> {
-      return (HdfsFileStatus) createMethod.invoke(instance,
-          src, masked, clientName, flag, createParent, replication, blockSize, 
supportedVersions);
+        supportedVersions) -> {
+      return (HdfsFileStatus) createMethod.invoke(instance, src, masked, 
clientName, flag,
+        createParent, replication, blockSize, supportedVersions);
     };
   }
 
@@ -549,9 +553,9 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
       CHECKSUM_CREATER = createChecksumCreater();
       FILE_CREATOR = createFileCreator();
     } catch (Exception e) {
-      String msg = "Couldn't properly initialize access to HDFS internals. 
Please "
-          + "update your WAL Provider to not make use of the 'asyncfs' 
provider. See "
-          + "HBASE-16110 for more information.";
+      String msg = "Couldn't properly initialize access to HDFS internals. 
Please " +
+          "update your WAL Provider to not make use of the 'asyncfs' provider. 
See " +
+          "HBASE-16110 for more information.";
       LOG.error(msg, e);
       throw new Error(msg, e);
     }
@@ -566,7 +570,7 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
   }
 
   static DataChecksum createChecksum(DFSClient client) {
-    return CHECKSUM_CREATER.createChecksum(client.getConf());
+    return CHECKSUM_CREATER.createChecksum(client);
   }
 
   static Status getStatus(PipelineAckProto ack) {
@@ -590,11 +594,11 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
           String logInfo = "ack with firstBadLink as " + 
resp.getFirstBadLink();
           if (resp.getStatus() != Status.SUCCESS) {
             if (resp.getStatus() == Status.ERROR_ACCESS_TOKEN) {
-              throw new InvalidBlockTokenException("Got access token error" + 
", status message "
-                  + resp.getMessage() + ", " + logInfo);
+              throw new InvalidBlockTokenException("Got access token error" + 
", status message " +
+                  resp.getMessage() + ", " + logInfo);
             } else {
-              throw new IOException("Got error" + ", status=" + 
resp.getStatus().name()
-                  + ", status message " + resp.getMessage() + ", " + logInfo);
+              throw new IOException("Got error" + ", status=" + 
resp.getStatus().name() +
+                  ", status message " + resp.getMessage() + ", " + logInfo);
             }
           }
           // success
@@ -667,10 +671,10 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
     });
   }
 
-  private static List<Future<Channel>> connectToDataNodes(Configuration conf,
-      DFSClient client, String clientName, LocatedBlock locatedBlock, long 
maxBytesRcvd,
-      long latestGS, BlockConstructionStage stage, DataChecksum summer,
-      EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass) {
+  private static List<Future<Channel>> connectToDataNodes(Configuration conf, 
DFSClient client,
+      String clientName, LocatedBlock locatedBlock, long maxBytesRcvd, long 
latestGS,
+      BlockConstructionStage stage, DataChecksum summer, EventLoopGroup 
eventLoopGroup,
+      Class<? extends Channel> channelClass) {
     Enum<?>[] storageTypes = locatedBlock.getStorageTypes();
     DatanodeInfo[] datanodeInfos = locatedBlock.getLocations();
     boolean connectToDnViaHostname =

Reply via email to