Repository: phoenix Updated Branches: refs/heads/4.x-HBase-1.2 87982125f -> c4945e40a (forced update)
PHOENIX-3812 Use HBase snapshots in async index building M/R job (Akshita Malhotra) Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/c4945e40 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/c4945e40 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/c4945e40 Branch: refs/heads/4.x-HBase-1.2 Commit: c4945e40a6c8ee02925b0e5f3d00cebae4cbd45d Parents: 8b504a7 Author: James Taylor <jamestay...@apache.org> Authored: Fri Jun 9 16:53:57 2017 -0700 Committer: James Taylor <jamestay...@apache.org> Committed: Fri Jun 9 17:01:02 2017 -0700 ---------------------------------------------------------------------- .../apache/phoenix/end2end/IndexExtendedIT.java | 23 +++++++--- .../phoenix/mapreduce/PhoenixInputFormat.java | 7 +++ .../phoenix/mapreduce/index/IndexTool.java | 45 ++++++++++++++++---- 3 files changed, 60 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/c4945e40/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexExtendedIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexExtendedIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexExtendedIT.java index b79e557..53bf625 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexExtendedIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexExtendedIT.java @@ -74,17 +74,19 @@ public class IndexExtendedIT extends BaseTest { private final boolean directApi; private final String tableDDLOptions; private final boolean mutable; + private final boolean useSnapshot; @AfterClass public static void doTeardown() throws Exception { tearDownMiniCluster(); } - public IndexExtendedIT(boolean transactional, boolean mutable, boolean localIndex, boolean directApi) { + public IndexExtendedIT(boolean transactional, boolean mutable, boolean localIndex, boolean directApi, boolean useSnapshot) { this.localIndex = localIndex; this.transactional = transactional; this.directApi = directApi; this.mutable = mutable; + this.useSnapshot = useSnapshot; StringBuilder optionBuilder = new StringBuilder(); if (!mutable) { optionBuilder.append(" IMMUTABLE_ROWS=true "); @@ -110,13 +112,16 @@ public class IndexExtendedIT extends BaseTest { .iterator())); } - @Parameters(name="transactional = {0} , mutable = {1} , localIndex = {2}, directApi = {3}") + @Parameters(name="transactional = {0} , mutable = {1} , localIndex = {2}, directApi = {3}, useSnapshot = {4}") public static Collection<Boolean[]> data() { - return Arrays.asList(new Boolean[][] { - { false, false, false, false }, { false, false, false, true }, { false, false, true, false }, { false, false, true, true }, - { false, true, false, false }, { false, true, false, true }, { false, true, true, false }, { false, true, true, true }, - { true, false, false, false }, { true, false, false, true }, { true, false, true, false }, { true, false, true, true }, - { true, true, false, false }, { true, true, false, true }, { true, true, true, false }, { true, true, true, true } + return Arrays.asList(new Boolean[][] { + { false, false, false, false, false }, { false, false, false, true, false }, { false, false, true, false, false }, { false, false, true, true, false }, + { false, true, false, false, false }, { false, true, false, true, false }, { false, true, true, false, false }, { false, true, true, true, false }, + { true, false, false, false, false }, { true, false, false, true, false }, { true, false, true, false, false }, { true, false, true, true, false }, + { true, true, false, false, false }, { true, true, false, true, false }, { true, true, true, false, false }, { true, true, true, true, false }, + { false, true, false, false, true }, { false, true, false, true, true }, { false, true, true, false, true }, { false, true, true, true, true }, + { true, false, false, false, true }, { true, false, false, true, true }, { true, false, true, false, true }, { true, false, true, true, true }, + { true, true, false, false, true }, { true, true, false, true, true }, { true, true, true, false, true }, { true, true, true, true, true } }); } @@ -307,6 +312,10 @@ public class IndexExtendedIT extends BaseTest { args.add("-runfg"); } + if(useSnapshot) { + args.add("-snap"); + } + args.add("-op"); args.add("/tmp/"+UUID.randomUUID().toString()); return args.toArray(new String[0]); http://git-wip-us.apache.org/repos/asf/phoenix/blob/c4945e40/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java index 14f7b94..25729d6 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java @@ -184,6 +184,13 @@ public class PhoenixInputFormat<T extends DBWritable> extends InputFormat<NullWr if (txnScnValue!=null) { scan.setAttribute(BaseScannerRegionObserver.TX_SCN, Bytes.toBytes(Long.valueOf(txnScnValue))); } + + // setting the snapshot configuration + String snapshotName = configuration.get(PhoenixConfigurationUtil.SNAPSHOT_NAME_KEY); + if (snapshotName != null) + PhoenixConfigurationUtil.setSnapshotNameKey(queryPlan.getContext().getConnection(). + getQueryServices().getConfiguration(), snapshotName); + // Initialize the query plan so it sets up the parallel scans queryPlan.iterator(MapReduceParallelScanGrouper.getInstance()); return queryPlan; http://git-wip-us.apache.org/repos/asf/phoenix/blob/c4945e40/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java index da216ed..671e4cf 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java @@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; @@ -55,6 +56,7 @@ import org.apache.hadoop.hbase.mapreduce.TableInputFormat; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.mapreduce.TableOutputFormat; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Job; @@ -118,6 +120,8 @@ public class IndexTool extends Configured implements Tool { + "If specified, runs index build in Foreground. Default - Runs the build in background."); private static final Option OUTPUT_PATH_OPTION = new Option("op", "output-path", true, "Output path where the files are written"); + private static final Option SNAPSHOT_OPTION = new Option("snap", "snapshot", false, + "If specified, uses Snapshots for async index building (optional)"); private static final Option HELP_OPTION = new Option("h", "help", false, "Help"); public static final String INDEX_JOB_NAME_TEMPLATE = "PHOENIX_%s_INDX_%s"; @@ -130,6 +134,7 @@ public class IndexTool extends Configured implements Tool { options.addOption(DIRECT_API_OPTION); options.addOption(RUN_FOREGROUND_OPTION); options.addOption(OUTPUT_PATH_OPTION); + options.addOption(SNAPSHOT_OPTION); options.addOption(HELP_OPTION); return options; } @@ -203,11 +208,12 @@ public class IndexTool extends Configured implements Tool { } - public Job getJob(String schemaName, String indexTable, String dataTable, boolean useDirectApi, boolean isPartialBuild) throws Exception { + public Job getJob(String schemaName, String indexTable, String dataTable, boolean useDirectApi, boolean isPartialBuild, + boolean useSnapshot) throws Exception { if (isPartialBuild) { return configureJobForPartialBuild(schemaName, dataTable); } else { - return configureJobForAysncIndex(schemaName, indexTable, dataTable, useDirectApi); + return configureJobForAysncIndex(schemaName, indexTable, dataTable, useDirectApi, useSnapshot); } } @@ -320,7 +326,7 @@ public class IndexTool extends Configured implements Tool { } - private Job configureJobForAysncIndex(String schemaName, String indexTable, String dataTable, boolean useDirectApi) + private Job configureJobForAysncIndex(String schemaName, String indexTable, String dataTable, boolean useDirectApi, boolean useSnapshot) throws Exception { final String qDataTable = SchemaUtil.getQualifiedTableName(schemaName, dataTable); final String qIndexTable; @@ -374,12 +380,34 @@ public class IndexTool extends Configured implements Tool { job.setJarByClass(IndexTool.class); job.setMapOutputKeyClass(ImmutableBytesWritable.class); FileOutputFormat.setOutputPath(job, outputPath); - - PhoenixMapReduceUtil.setInput(job, PhoenixIndexDBWritable.class, qDataTable, - selectQuery); + + if (!useSnapshot) { + PhoenixMapReduceUtil.setInput(job, PhoenixIndexDBWritable.class, qDataTable, + selectQuery); + } else { + HBaseAdmin admin = null; + String snapshotName; + try { + admin = pConnection.getQueryServices().getAdmin(); + String pdataTableName = pdataTable.getName().getString(); + snapshotName = new StringBuilder(pdataTableName).append("-Snapshot").toString(); + admin.snapshot(snapshotName, TableName.valueOf(pdataTableName)); + } finally { + if (admin != null) { + admin.close(); + } + } + // root dir not a subdirectory of hbase dir + Path rootDir = new Path("hdfs:///index-snapshot-dir"); + FSUtils.setRootDir(configuration, rootDir); + Path restoreDir = new Path(FSUtils.getRootDir(configuration), "restore-dir"); + + // set input for map reduce job using hbase snapshots + PhoenixMapReduceUtil + .setInput(job, PhoenixIndexDBWritable.class, snapshotName, qDataTable, restoreDir, selectQuery); + } TableMapReduceUtil.initCredentials(job); - if (useDirectApi) { return configureSubmittableJobUsingDirectApi(job, false); } else { @@ -464,6 +492,7 @@ public class IndexTool extends Configured implements Tool { boolean useDirectApi = cmdLine.hasOption(DIRECT_API_OPTION.getOpt()); String basePath=cmdLine.getOptionValue(OUTPUT_PATH_OPTION.getOpt()); boolean isForeground = cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt()); + boolean useSnapshot = cmdLine.hasOption(SNAPSHOT_OPTION.getOpt()); connection = ConnectionUtil.getInputConnection(configuration); byte[][] splitKeysBeforeJob = null; boolean isLocalIndexBuild = false; @@ -494,7 +523,7 @@ public class IndexTool extends Configured implements Tool { } Job job = new JobFactory(connection, configuration, outputPath).getJob(schemaName, indexTable, dataTable, - useDirectApi, isPartialBuild); + useDirectApi, isPartialBuild, useSnapshot); if (!isForeground && useDirectApi) { LOG.info("Running Index Build in Background - Submit async and exit"); job.submit();