Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-1.1 7a512f8fb -> 6970b8714


PHOENIX-3812 Use HBase snapshots in async index building M/R job (Akshita Malhotra)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/6970b871
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/6970b871
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/6970b871

Branch: refs/heads/4.x-HBase-1.1
Commit: 6970b87145a06065f464fe0dca0733132387635f
Parents: 7a512f8
Author: James Taylor <jamestay...@apache.org>
Authored: Fri Jun 9 16:53:57 2017 -0700
Committer: James Taylor <jamestay...@apache.org>
Committed: Fri Jun 9 16:59:34 2017 -0700

----------------------------------------------------------------------
 .../apache/phoenix/end2end/IndexExtendedIT.java | 23 +++++++---
 .../phoenix/mapreduce/PhoenixInputFormat.java   |  7 +++
 .../phoenix/mapreduce/index/IndexTool.java      | 45 ++++++++++++++++----
 3 files changed, 60 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/6970b871/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexExtendedIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexExtendedIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexExtendedIT.java
index b79e557..53bf625 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexExtendedIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexExtendedIT.java
@@ -74,17 +74,19 @@ public class IndexExtendedIT extends BaseTest {
     private final boolean directApi;
     private final String tableDDLOptions;
     private final boolean mutable;
+    private final boolean useSnapshot;
     
     @AfterClass
     public static void doTeardown() throws Exception {
         tearDownMiniCluster();
     }
 
-    public IndexExtendedIT(boolean transactional, boolean mutable, boolean 
localIndex, boolean directApi) {
+    public IndexExtendedIT(boolean transactional, boolean mutable, boolean 
localIndex, boolean directApi, boolean useSnapshot) {
         this.localIndex = localIndex;
         this.transactional = transactional;
         this.directApi = directApi;
         this.mutable = mutable;
+        this.useSnapshot = useSnapshot;
         StringBuilder optionBuilder = new StringBuilder();
         if (!mutable) {
             optionBuilder.append(" IMMUTABLE_ROWS=true ");
@@ -110,13 +112,16 @@ public class IndexExtendedIT extends BaseTest {
                 .iterator()));
     }
     
-    @Parameters(name="transactional = {0} , mutable = {1} , localIndex = {2}, 
directApi = {3}")
+    @Parameters(name="transactional = {0} , mutable = {1} , localIndex = {2}, 
directApi = {3}, useSnapshot = {4}")
     public static Collection<Boolean[]> data() {
-        return Arrays.asList(new Boolean[][] {     
-                 { false, false, false, false }, { false, false, false, true 
}, { false, false, true, false }, { false, false, true, true }, 
-                 { false, true, false, false }, { false, true, false, true }, 
{ false, true, true, false }, { false, true, true, true }, 
-                 { true, false, false, false }, { true, false, false, true }, 
{ true, false, true, false }, { true, false, true, true }, 
-                 { true, true, false, false }, { true, true, false, true }, { 
true, true, true, false }, { true, true, true, true } 
+        return Arrays.asList(new Boolean[][] {
+                 { false, false, false, false, false }, { false, false, false, 
true, false }, { false, false, true, false, false }, { false, false, true, 
true, false },
+                 { false, true, false, false, false }, { false, true, false, 
true, false }, { false, true, true, false, false }, { false, true, true, true, 
false },
+                 { true, false, false, false, false }, { true, false, false, 
true, false }, { true, false, true, false, false }, { true, false, true, true, 
false },
+                 { true, true, false, false, false }, { true, true, false, 
true, false }, { true, true, true, false, false }, { true, true, true, true, 
false },
+                 { false, true, false, false, true }, { false, true, false, 
true, true }, { false, true, true, false, true }, { false, true, true, true, 
true },
+                 { true, false, false, false, true }, { true, false, false, 
true, true }, { true, false, true, false, true }, { true, false, true, true, 
true },
+                 { true, true, false, false, true }, { true, true, false, 
true, true }, { true, true, true, false, true }, { true, true, true, true, true 
}
            });
     }
     
@@ -307,6 +312,10 @@ public class IndexExtendedIT extends BaseTest {
             args.add("-runfg");
         }
 
+        if(useSnapshot) {
+            args.add("-snap");
+        }
+
         args.add("-op");
         args.add("/tmp/"+UUID.randomUUID().toString());
         return args.toArray(new String[0]);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6970b871/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
index 14f7b94..25729d6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
@@ -184,6 +184,13 @@ public class PhoenixInputFormat<T extends DBWritable> 
extends InputFormat<NullWr
             if (txnScnValue!=null) {
                 scan.setAttribute(BaseScannerRegionObserver.TX_SCN, 
Bytes.toBytes(Long.valueOf(txnScnValue)));
             }
+
+            // setting the snapshot configuration
+            String snapshotName = 
configuration.get(PhoenixConfigurationUtil.SNAPSHOT_NAME_KEY);
+            if (snapshotName != null)
+                
PhoenixConfigurationUtil.setSnapshotNameKey(queryPlan.getContext().getConnection().
+                    getQueryServices().getConfiguration(), snapshotName);
+
             // Initialize the query plan so it sets up the parallel scans
             queryPlan.iterator(MapReduceParallelScanGrouper.getInstance());
             return queryPlan;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6970b871/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
index da216ed..671e4cf 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -55,6 +56,7 @@ import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.Job;
@@ -118,6 +120,8 @@ public class IndexTool extends Configured implements Tool {
                             + "If specified, runs index build in Foreground. 
Default - Runs the build in background.");
     private static final Option OUTPUT_PATH_OPTION = new Option("op", 
"output-path", true,
             "Output path where the files are written");
+    private static final Option SNAPSHOT_OPTION = new Option("snap", 
"snapshot", false,
+        "If specified, uses Snapshots for async index building (optional)");
     private static final Option HELP_OPTION = new Option("h", "help", false, 
"Help");
     public static final String INDEX_JOB_NAME_TEMPLATE = "PHOENIX_%s_INDX_%s";
 
@@ -130,6 +134,7 @@ public class IndexTool extends Configured implements Tool {
         options.addOption(DIRECT_API_OPTION);
         options.addOption(RUN_FOREGROUND_OPTION);
         options.addOption(OUTPUT_PATH_OPTION);
+        options.addOption(SNAPSHOT_OPTION);
         options.addOption(HELP_OPTION);
         return options;
     }
@@ -203,11 +208,12 @@ public class IndexTool extends Configured implements Tool 
{
 
         }
 
-        public Job getJob(String schemaName, String indexTable, String 
dataTable, boolean useDirectApi, boolean isPartialBuild) throws Exception {
+        public Job getJob(String schemaName, String indexTable, String 
dataTable, boolean useDirectApi, boolean isPartialBuild,
+            boolean useSnapshot) throws Exception {
             if (isPartialBuild) {
                 return configureJobForPartialBuild(schemaName, dataTable);
             } else {
-                return configureJobForAysncIndex(schemaName, indexTable, 
dataTable, useDirectApi);
+                return configureJobForAysncIndex(schemaName, indexTable, 
dataTable, useDirectApi, useSnapshot);
             }
         }
         
@@ -320,7 +326,7 @@ public class IndexTool extends Configured implements Tool {
             
         }
 
-        private Job configureJobForAysncIndex(String schemaName, String 
indexTable, String dataTable, boolean useDirectApi)
+        private Job configureJobForAysncIndex(String schemaName, String 
indexTable, String dataTable, boolean useDirectApi, boolean useSnapshot)
                 throws Exception {
             final String qDataTable = 
SchemaUtil.getQualifiedTableName(schemaName, dataTable);
             final String qIndexTable;
@@ -374,12 +380,34 @@ public class IndexTool extends Configured implements Tool 
{
             job.setJarByClass(IndexTool.class);
             job.setMapOutputKeyClass(ImmutableBytesWritable.class);
             FileOutputFormat.setOutputPath(job, outputPath);
-            
-            PhoenixMapReduceUtil.setInput(job, PhoenixIndexDBWritable.class, 
qDataTable,
-                selectQuery);
+
+            if (!useSnapshot) {
+                PhoenixMapReduceUtil.setInput(job, 
PhoenixIndexDBWritable.class, qDataTable,
+                    selectQuery);
+            } else {
+                HBaseAdmin admin = null;
+                String snapshotName;
+                try {
+                    admin = pConnection.getQueryServices().getAdmin();
+                    String pdataTableName = pdataTable.getName().getString();
+                    snapshotName = new 
StringBuilder(pdataTableName).append("-Snapshot").toString();
+                    admin.snapshot(snapshotName, 
TableName.valueOf(pdataTableName));
+                } finally {
+                    if (admin != null) {
+                        admin.close();
+                    }
+                }
+                // root dir not a subdirectory of hbase dir
+                Path rootDir = new Path("hdfs:///index-snapshot-dir");
+                FSUtils.setRootDir(configuration, rootDir);
+                Path restoreDir = new Path(FSUtils.getRootDir(configuration), 
"restore-dir");
+
+                // set input for map reduce job using hbase snapshots
+                PhoenixMapReduceUtil
+                    .setInput(job, PhoenixIndexDBWritable.class, snapshotName, 
qDataTable, restoreDir, selectQuery);
+            }
             TableMapReduceUtil.initCredentials(job);
             
-            
             if (useDirectApi) {
                 return configureSubmittableJobUsingDirectApi(job, false);
             } else {
@@ -464,6 +492,7 @@ public class IndexTool extends Configured implements Tool {
             boolean useDirectApi = 
cmdLine.hasOption(DIRECT_API_OPTION.getOpt());
             String 
basePath=cmdLine.getOptionValue(OUTPUT_PATH_OPTION.getOpt());
             boolean isForeground = 
cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt());
+            boolean useSnapshot = cmdLine.hasOption(SNAPSHOT_OPTION.getOpt());
             connection = ConnectionUtil.getInputConnection(configuration);
             byte[][] splitKeysBeforeJob = null;
             boolean isLocalIndexBuild = false;
@@ -494,7 +523,7 @@ public class IndexTool extends Configured implements Tool {
                        }
             
             Job job = new JobFactory(connection, configuration, 
outputPath).getJob(schemaName, indexTable, dataTable,
-                    useDirectApi, isPartialBuild);
+                    useDirectApi, isPartialBuild, useSnapshot);
             if (!isForeground && useDirectApi) {
                 LOG.info("Running Index Build in Background - Submit async and 
exit");
                 job.submit();

Reply via email to