Repository: hbase
Updated Branches:
  refs/heads/master b4ed13008 -> c4ced0b3d


HBASE-18975 Fix backup / restore hadoop3 incompatibility (Vladimir Rodionov)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/c4ced0b3
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/c4ced0b3
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/c4ced0b3

Branch: refs/heads/master
Commit: c4ced0b3d50002b73c7dc3121b08d97afbe8e97b
Parents: b4ed130
Author: tedyu <yuzhih...@gmail.com>
Authored: Wed Oct 11 12:26:34 2017 -0700
Committer: tedyu <yuzhih...@gmail.com>
Committed: Wed Oct 11 12:26:34 2017 -0700

----------------------------------------------------------------------
 .../mapreduce/MapReduceBackupCopyJob.java       | 85 +++++++++++---------
 1 file changed, 49 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
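For context: Hadoop 2.x's DistCp stores its options in a private field named
"inputOptions" (a DistCpOptions), while Hadoop 3.x moved them into a private
"context" field (a DistCpContext). That rename is what broke the
reflection-based access this class relies on. Below is a minimal standalone
sketch of the probing technique the patch uses; the probed names
"inputOptions", "context", "getSourcePaths" and the class
org.apache.hadoop.tools.DistCpContext come from the patch itself, while the
wrapper class here is purely illustrative:

    import java.io.IOException;
    import java.lang.reflect.Field;
    import java.lang.reflect.Method;
    import java.util.List;

    // Illustrative wrapper; only the probed names below come from the patch.
    final class DistCpCompatSketch {

      // Hadoop 2.x: private DistCpOptions inputOptions;
      // Hadoop 3.x: private DistCpContext context;
      static Field locateOptionsField(Class<?> distCpClass) throws IOException {
        try {
          return distCpClass.getDeclaredField("inputOptions"); // Hadoop 2.x
        } catch (NoSuchFieldException e) {
          try {
            return distCpClass.getDeclaredField("context");    // Hadoop 3.x
          } catch (NoSuchFieldException | SecurityException e1) {
            throw new IOException(e1);
          }
        }
      }

      // Both DistCpOptions and DistCpContext expose getSourcePaths(), so once
      // the field is found the paths can be read reflectively either way.
      static List<?> sourcePaths(Object distCp, Field optionsField) throws IOException {
        try {
          optionsField.setAccessible(true);
          Object options = optionsField.get(distCp);
          Method getSourcePaths = options.getClass().getMethod("getSourcePaths");
          getSourcePaths.setAccessible(true);
          return (List<?>) getSourcePaths.invoke(options);
        } catch (ReflectiveOperationException | SecurityException e) {
          throw new IOException(e);
        }
      }
    }

The hunks below apply the same two-step probe inside BackupDistCp.execute().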


http://git-wip-us.apache.org/repos/asf/hbase/blob/c4ced0b3/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyJob.java
----------------------------------------------------------------------
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyJob.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyJob.java
index 29e71e7..07e9fcc 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyJob.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceBackupCopyJob.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.backup.mapreduce;
 
 import java.io.IOException;
 import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.math.BigDecimal;
 import java.util.Arrays;
@@ -35,7 +36,6 @@ import org.apache.hadoop.hbase.backup.BackupInfo;
 import org.apache.hadoop.hbase.backup.BackupType;
 import org.apache.hadoop.hbase.backup.impl.BackupManager;
 import org.apache.hadoop.hbase.backup.util.BackupUtils;
-import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.snapshot.ExportSnapshot;
 import org.apache.hadoop.mapreduce.Cluster;
 import org.apache.hadoop.mapreduce.Counters;
@@ -44,6 +44,7 @@ import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.tools.DistCp;
 import org.apache.hadoop.tools.DistCpConstants;
 import org.apache.hadoop.tools.DistCpOptions;
+import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.zookeeper.KeeperException.NoNodeException;
 
 /**
@@ -154,30 +155,19 @@ public class MapReduceBackupCopyJob implements BackupCopyJob {
       this.backupManager = backupManager;
     }
 
+
     @Override
     public Job execute() throws Exception {
 
       // reflection preparation for private methods and fields
       Class<?> classDistCp = org.apache.hadoop.tools.DistCp.class;
-      Method methodCreateMetaFolderPath = classDistCp.getDeclaredMethod("createMetaFolderPath");
-      Method methodCreateJob = classDistCp.getDeclaredMethod("createJob");
-      Method methodCreateInputFileListing =
-          classDistCp.getDeclaredMethod("createInputFileListing", Job.class);
       Method methodCleanup = classDistCp.getDeclaredMethod("cleanup");
 
-      Field fieldInputOptions = classDistCp.getDeclaredField("inputOptions");
-      Field fieldMetaFolder = classDistCp.getDeclaredField("metaFolder");
-      Field fieldJobFS = classDistCp.getDeclaredField("jobFS");
+      Field fieldInputOptions = getInputOptionsField(classDistCp);
       Field fieldSubmitted = classDistCp.getDeclaredField("submitted");
 
-      methodCreateMetaFolderPath.setAccessible(true);
-      methodCreateJob.setAccessible(true);
-      methodCreateInputFileListing.setAccessible(true);
       methodCleanup.setAccessible(true);
-
       fieldInputOptions.setAccessible(true);
-      fieldMetaFolder.setAccessible(true);
-      fieldJobFS.setAccessible(true);
       fieldSubmitted.setAccessible(true);
 
       // execute() logic starts here
@@ -185,16 +175,8 @@ public class MapReduceBackupCopyJob implements BackupCopyJob {
 
       Job job = null;
       try {
-        synchronized (this) {
-          // Don't cleanup while we are setting up.
-          fieldMetaFolder.set(this, methodCreateMetaFolderPath.invoke(this));
-          fieldJobFS.set(this, ((Path) fieldMetaFolder.get(this)).getFileSystem(super.getConf()));
-          job = (Job) methodCreateJob.invoke(this);
-        }
-        methodCreateInputFileListing.invoke(this, job);
 
-        // Get the total length of the source files
-        List<Path> srcs = ((DistCpOptions) fieldInputOptions.get(this)).getSourcePaths();
+        List<Path> srcs = getSourcePaths(fieldInputOptions);
 
         long totalSrcLgth = 0;
         for (Path aSrc : srcs) {
@@ -202,14 +184,9 @@ public class MapReduceBackupCopyJob implements BackupCopyJob {
              BackupUtils.getFilesLength(aSrc.getFileSystem(super.getConf()), aSrc);
         }
 
-        // submit the copy job
-        job.submit();
-        fieldSubmitted.set(this, true);
-
-        // after submit the MR job, set its handler in backup handler for cancel process
-        // this.backupHandler.copyJob = job;
-
-        // Update the copy progress to ZK every 0.5s if progress value changed
+        // Async call
+        job = super.execute();
+        // Update the copy progress to system table every 0.5s if progress value changed
        int progressReportFreq =
            MapReduceBackupCopyJob.this.getConf().getInt("hbase.backup.progressreport.frequency",
              500);
@@ -251,10 +228,6 @@ public class MapReduceBackupCopyJob implements BackupCopyJob {
       } catch (Throwable t) {
         LOG.error("distcp " + job == null ? "" : job.getJobID() + " 
encountered error", t);
         throw t;
-      } finally {
-        if (!fieldSubmitted.getBoolean(this)) {
-          methodCleanup.invoke(this);
-        }
       }
 
       String jobID = job.getJobID().toString();
@@ -271,6 +244,43 @@ public class MapReduceBackupCopyJob implements BackupCopyJob {
       return job;
     }
 
+    private Field getInputOptionsField(Class<?> classDistCp) throws IOException {
+      Field f = null;
+      try {
+        f = classDistCp.getDeclaredField("inputOptions");
+      } catch (Exception e) {
+        // Hadoop 3
+        try {
+          f = classDistCp.getDeclaredField("context");
+        } catch (NoSuchFieldException | SecurityException e1) {
+          throw new IOException(e1);
+        }
+      }
+      return f;
+    }
+
+    @SuppressWarnings("unchecked")
+    private List<Path> getSourcePaths(Field fieldInputOptions) throws IOException {
+      Object options;
+      try {
+        options = fieldInputOptions.get(this);
+        if (options instanceof DistCpOptions) {
+          return ((DistCpOptions) options).getSourcePaths();
+        } else {
+          // Hadoop 3
+          Class<?> classContext = Class.forName("org.apache.hadoop.tools.DistCpContext");
+          Method methodGetSourcePaths = classContext.getDeclaredMethod("getSourcePaths");
+          methodGetSourcePaths.setAccessible(true);
+
+          return (List<Path>) methodGetSourcePaths.invoke(options);
+        }
+      } catch (IllegalArgumentException | IllegalAccessException |
+                ClassNotFoundException | NoSuchMethodException |
+                SecurityException | InvocationTargetException e) {
+        throw new IOException(e);
+      }
+
+    }
   }
 
   /**
@@ -306,11 +316,14 @@ public class MapReduceBackupCopyJob implements BackupCopyJob {
         // We need to create the target dir before run distcp.
         LOG.debug("DistCp options: " + Arrays.toString(options));
         Path dest = new Path(options[options.length - 1]);
+        String[] newOptions = new String[options.length + 1];
+        System.arraycopy(options, 0, newOptions, 1, options.length);
+        newOptions[0] = "-async"; // run DistCp in async mode
         FileSystem destfs = dest.getFileSystem(conf);
         if (!destfs.exists(dest)) {
           destfs.mkdirs(dest);
         }
-        res = distcp.run(options);
+        res = distcp.run(newOptions);
       }
       return res;
 
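A note on the final hunk: prepending DistCp's "-async" option makes
distcp.run() submit the MapReduce job and return immediately instead of
blocking, which is what allows the patched execute() above to obtain the Job
via super.execute() and track progress itself. A sketch of the array shift,
with placeholder paths (the real options come from the backup request):

    // Shift the original arguments right by one slot and put "-async" first.
    String[] options = { "hdfs://src/data", "hdfs://dst/backup" };  // placeholders
    String[] newOptions = new String[options.length + 1];
    System.arraycopy(options, 0, newOptions, 1, options.length);
    newOptions[0] = "-async";
    // newOptions is now { "-async", "hdfs://src/data", "hdfs://dst/backup" }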

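For reference, the progress reporting that execute() performs after the async
submit (every 500 ms by default, per "hbase.backup.progressreport.frequency")
amounts to a loop like the following hedged sketch; the real code converts
progress to a percentage and writes it to the backup system table, both
elided here:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    final class ProgressPollSketch {
      // Poll the submitted job at the configured interval and act only when
      // the reported progress value actually changes.
      static void poll(Job job, Configuration conf)
          throws IOException, InterruptedException {
        int freq = conf.getInt("hbase.backup.progressreport.frequency", 500);
        float last = -1f;
        while (!job.isComplete()) {
          Thread.sleep(freq);
          float p = job.mapProgress(); // Job also exposes reduceProgress()
          if (p != last) {
            last = p;
            // the patch updates the backup system table here
          }
        }
      }
    }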