This is an automated email from the ASF dual-hosted git repository.

skadam pushed a commit to branch 4.x-HBase-1.5
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x-HBase-1.5 by this push: new d55d391 PHOENIX-5590 IndexTool always runs with direct d55d391 is described below commit d55d391926f0df667d3b5b484ca0c56543f993a7 Author: Gokcen Iskender <gisken...@salesforce.com> AuthorDate: Tue Nov 26 14:31:06 2019 -0800 PHOENIX-5590 IndexTool always runs with direct Signed-off-by: s.kadam <s.ka...@apache.org> --- .../end2end/IndexToolForPartialBuildIT.java | 1 + .../org/apache/phoenix/end2end/IndexToolIT.java | 23 ++--- .../coprocessor/tasks/IndexRebuildTask.java | 2 +- .../apache/phoenix/mapreduce/index/IndexTool.java | 97 +++++----------------- 4 files changed, 30 insertions(+), 93 deletions(-) diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildIT.java index 50515b1..3027554 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildIT.java @@ -237,6 +237,7 @@ public class IndexToolForPartialBuildIT extends BaseOwnClusterIT { // complete index rebuild args.add("-it"); args.add(indexName); + args.add("-runfg"); args.add("-op"); args.add("/tmp/output/partialTable_"); return args.toArray(new String[0]); diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java index 266ad57..ff1b354 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java @@ -38,11 +38,8 @@ import java.util.Map; import java.util.Properties; import java.util.UUID; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellUtil; import 
org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; @@ -53,12 +50,10 @@ import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.mapreduce.Job; -import org.apache.phoenix.end2end.index.GlobalIndexCheckerIT; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.mapreduce.index.IndexTool; import org.apache.phoenix.query.ConnectionQueryServices; import org.apache.phoenix.mapreduce.index.PhoenixIndexImportDirectMapper; -import org.apache.phoenix.mapreduce.index.PhoenixIndexImportMapper; import org.apache.phoenix.mapreduce.index.PhoenixServerBuildIndexMapper; import org.apache.phoenix.query.QueryServices; @@ -660,10 +655,9 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT { args.add(indxTable); if (directApi) { args.add("-direct"); - // Need to run this job in foreground for the test to be deterministic - args.add("-runfg"); } - + // Need to run this job in foreground for the test to be deterministic + args.add("-runfg"); if (useSnapshot) { args.add("-snap"); } @@ -704,15 +698,10 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT { boolean transactional = dataTable.isTransactional(); boolean localIndex = PTable.IndexType.LOCAL.equals(indexTable.getIndexType()); - if (directApi) { - if ((localIndex || !transactional) && !useSnapshot) { - assertEquals(job.getMapperClass(), PhoenixServerBuildIndexMapper.class); - } else { - assertEquals(job.getMapperClass(), PhoenixIndexImportDirectMapper.class); - } - } - else { - assertEquals(job.getMapperClass(), PhoenixIndexImportMapper.class); + if ((localIndex || !transactional) && !useSnapshot) { + assertEquals(job.getMapperClass(), PhoenixServerBuildIndexMapper.class); + } else { + assertEquals(job.getMapperClass(), PhoenixIndexImportDirectMapper.class); } } } diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/IndexRebuildTask.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/IndexRebuildTask.java index a943499..3bf9dfb 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/IndexRebuildTask.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/IndexRebuildTask.java @@ -95,7 +95,7 @@ public class IndexRebuildTask extends BaseTask { // Run index tool async. boolean runForeground = false; Map.Entry<Integer, Job> indexToolRes = IndexTool - .run(conf, taskRecord.getSchemaName(), taskRecord.getTableName(), indexName, true, + .run(conf, taskRecord.getSchemaName(), taskRecord.getTableName(), indexName, false, taskRecord.getTenantId(), shouldDisable, rebuildAll, runForeground); int status = indexToolRes.getKey(); if (status != 0) { diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java index 864036c..9a589b9 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java @@ -51,15 +51,12 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat; -import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles; import org.apache.hadoop.hbase.mapreduce.TableInputFormat; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import 
org.apache.hadoop.hbase.mapreduce.TableOutputFormat; @@ -124,7 +121,6 @@ public class IndexTool extends Configured implements Tool { private boolean isPartialBuild; private String qDataTable; private String qIndexTable; - private boolean useDirectApi; private boolean useSnapshot; private boolean isLocalIndexBuild; private boolean shouldDeleteBeforeRebuild; @@ -146,7 +142,7 @@ public class IndexTool extends Configured implements Tool { "To build indexes for a data table from least disabledTimeStamp"); private static final Option DIRECT_API_OPTION = new Option("direct", "direct", false, - "If specified, we avoid the bulk load (optional)"); + "This parameter is deprecated. Direct mode will be used whether it is set or not. Keeping it for backwards compatibility."); private static final double DEFAULT_SPLIT_SAMPLING_RATE = 10.0; @@ -232,27 +228,20 @@ public class IndexTool extends Configured implements Tool { throw new IllegalStateException(DATA_TABLE_OPTION.getLongOpt() + " is a mandatory " + "parameter"); } - - if (!(cmdLine.hasOption(PARTIAL_REBUILD_OPTION.getOpt()) || cmdLine.hasOption(DIRECT_API_OPTION.getOpt())) - && !cmdLine.hasOption(OUTPUT_PATH_OPTION.getOpt())) { - throw new IllegalStateException(OUTPUT_PATH_OPTION.getLongOpt() + " is a mandatory " + "parameter"); - } if (cmdLine.hasOption(PARTIAL_REBUILD_OPTION.getOpt()) && cmdLine.hasOption(INDEX_TABLE_OPTION.getOpt())) { throw new IllegalStateException("Index name should not be passed with " + PARTIAL_REBUILD_OPTION.getLongOpt()); } + if (!cmdLine.hasOption(PARTIAL_REBUILD_OPTION.getOpt()) && !cmdLine.hasOption(INDEX_TABLE_OPTION.getOpt())) { + throw new IllegalStateException("Index name should be passed unless it is a partial rebuild."); + } + if (cmdLine.hasOption(PARTIAL_REBUILD_OPTION.getOpt()) && cmdLine.hasOption(DELETE_ALL_AND_REBUILD_OPTION.getOpt())) { throw new IllegalStateException(DELETE_ALL_AND_REBUILD_OPTION.getLongOpt() + " is not compatible with " + PARTIAL_REBUILD_OPTION.getLongOpt()); } - 
- if (!(cmdLine.hasOption(DIRECT_API_OPTION.getOpt())) && cmdLine.hasOption(INDEX_TABLE_OPTION.getOpt()) - && cmdLine.hasOption(RUN_FOREGROUND_OPTION - .getOpt())) { - throw new IllegalStateException(RUN_FOREGROUND_OPTION.getLongOpt() - + " is applicable only for " + DIRECT_API_OPTION.getLongOpt()); - } + boolean splitIndex = cmdLine.hasOption(AUTO_SPLIT_INDEX_OPTION.getOpt()) || cmdLine.hasOption(SPLIT_INDEX_OPTION.getOpt()); if (splitIndex && !cmdLine.hasOption(INDEX_TABLE_OPTION.getOpt())) { throw new IllegalStateException("Must pass an index name for the split index option"); @@ -304,10 +293,10 @@ public class IndexTool extends Configured implements Tool { Long.toString(TransactionUtil.convertToNanoseconds(maxTimeRange))); configuration.set(PhoenixConfigurationUtil.TX_PROVIDER, pDataTable.getTransactionProvider().name()); } - if (useSnapshot || !useDirectApi || (!isLocalIndexBuild && pDataTable.isTransactional())) { + if (useSnapshot || (!isLocalIndexBuild && pDataTable.isTransactional())) { configuration.set(PhoenixConfigurationUtil.CURRENT_SCN_VALUE, Long.toString(maxTimeRange)); - return configureJobForAysncIndex(); + return configureJobForAsyncIndex(); } else { // Local and non-transactional global indexes to be built on the server side // It is safe not to set CURRENT_SCN_VALUE for server side rebuilds, in order to make sure that @@ -428,10 +417,7 @@ public class IndexTool extends Configured implements Tool { } - private Job configureJobForAysncIndex() - - throws Exception { - + private Job configureJobForAsyncIndex() throws Exception { String physicalIndexTable = pIndexTable.getPhysicalName().getString(); final PhoenixConnection pConnection = connection.unwrap(PhoenixConnection.class); final PostIndexDDLCompiler ddlCompiler = @@ -455,13 +441,17 @@ public class IndexTool extends Configured implements Tool { PhoenixRuntime.generateColumnInfo(connection, qIndexTable, indexColumns); ColumnInfoToStringEncoderDecoder.encode(configuration, columnMetadataList); - 
fs = outputPath.getFileSystem(configuration); - fs.delete(outputPath, true); + if (outputPath != null) { + fs = outputPath.getFileSystem(configuration); + fs.delete(outputPath, true); + } final String jobName = String.format(INDEX_JOB_NAME_TEMPLATE, schemaName, dataTable, indexTable); final Job job = Job.getInstance(configuration, jobName); job.setJarByClass(IndexTool.class); job.setMapOutputKeyClass(ImmutableBytesWritable.class); - FileOutputFormat.setOutputPath(job, outputPath); + if (outputPath != null) { + FileOutputFormat.setOutputPath(job, outputPath); + } if (!useSnapshot) { PhoenixMapReduceUtil.setInput(job, PhoenixIndexDBWritable.class, qDataTable, selectQuery); @@ -489,16 +479,11 @@ public class IndexTool extends Configured implements Tool { } TableMapReduceUtil.initCredentials(job); - if (useDirectApi) { - job.setMapperClass(PhoenixIndexImportDirectMapper.class); - return configureSubmittableJobUsingDirectApi(job); - } else { - return configureRunnableJobUsingBulkLoad(job, outputPath); - } + job.setMapperClass(PhoenixIndexImportDirectMapper.class); + return configureSubmittableJobUsingDirectApi(job); } - private Job configureJobForServerBuildIndex() - throws Exception { + private Job configureJobForServerBuildIndex() throws Exception { long indexRebuildQueryTimeoutMs = configuration.getLong(QueryServices.INDEX_REBUILD_QUERY_TIMEOUT_ATTRIB, QueryServicesOptions.DEFAULT_INDEX_REBUILD_QUERY_TIMEOUT); @@ -554,26 +539,6 @@ public class IndexTool extends Configured implements Tool { } /** - * Submits the job and waits for completion. 
- * @param job - * @param outputPath - * @return - * @throws Exception - */ - private Job configureRunnableJobUsingBulkLoad(Job job, Path outputPath) throws Exception { - job.setMapperClass(PhoenixIndexImportMapper.class); - job.setMapOutputKeyClass(ImmutableBytesWritable.class); - job.setMapOutputValueClass(KeyValue.class); - final Configuration configuration = job.getConfiguration(); - final String physicalIndexTable = - PhoenixConfigurationUtil.getPhysicalTableName(configuration); - try(final HTable htable = new HTable(configuration, physicalIndexTable)) { - HFileOutputFormat.configureIncrementalLoad(job, htable); - } - return job; - } - - /** * Uses the HBase Front Door Api to write to index table. Submits the job and either returns or * waits for the job completion based on runForeground parameter. * @@ -581,9 +546,7 @@ public class IndexTool extends Configured implements Tool { * @return * @throws Exception */ - private Job configureSubmittableJobUsingDirectApi(Job job) - throws Exception { - + private Job configureSubmittableJobUsingDirectApi(Job job) throws Exception { job.setReducerClass(PhoenixIndexImportDirectReducer.class); Configuration conf = job.getConfiguration(); HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf)); @@ -633,7 +596,6 @@ public class IndexTool extends Configured implements Tool { try(Connection tempConn = ConnectionUtil.getInputConnection(configuration)) { pDataTable = PhoenixRuntime.getTableNoCache(tempConn, qDataTable); } - useDirectApi = cmdLine.hasOption(DIRECT_API_OPTION.getOpt()); String basePath=cmdLine.getOptionValue(OUTPUT_PATH_OPTION.getOpt()); boolean isForeground = cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt()); useSnapshot = cmdLine.hasOption(SNAPSHOT_OPTION.getOpt()); @@ -697,7 +659,7 @@ public class IndexTool extends Configured implements Tool { jobFactory = new JobFactory(connection, configuration, outputPath); job = jobFactory.getJob(); - if (!isForeground && useDirectApi) { + if (!isForeground) { 
LOGGER.info("Running Index Build in Background - Submit async and exit"); job.submit(); return 0; @@ -707,18 +669,6 @@ public class IndexTool extends Configured implements Tool { boolean result = job.waitForCompletion(true); if (result) { - if (!useDirectApi && indexTable != null) { - if (isLocalIndexBuild) { - validateSplitForLocalIndex(splitKeysBeforeJob, htable); - } - LOGGER.info("Loading HFiles from {}", outputPath); - LoadIncrementalHFiles loader = new LoadIncrementalHFiles(configuration); - loader.doBulkLoad(outputPath, htable); - htable.close(); - // Without direct API, we need to update the index state to ACTIVE from client. - IndexToolUtil.updateIndexState(connection, qDataTable, indexTable, PIndexState.ACTIVE); - fs.delete(outputPath, true); - } return 0; } else { LOGGER.error("IndexTool job failed! Check logs for errors.."); @@ -919,7 +869,7 @@ public class IndexTool extends Configured implements Tool { } public static Map.Entry<Integer, Job> run(Configuration conf, String schemaName, String dataTable, String indexTable, - boolean directApi, boolean useSnapshot, String tenantId, boolean disableBefore, boolean shouldDeleteBeforeRebuild, boolean runForeground) throws Exception { + boolean useSnapshot, String tenantId, boolean disableBefore, boolean shouldDeleteBeforeRebuild, boolean runForeground) throws Exception { final List<String> args = Lists.newArrayList(); if (schemaName != null) { args.add("-s"); @@ -929,9 +879,6 @@ public class IndexTool extends Configured implements Tool { args.add(dataTable); args.add("-it"); args.add(indexTable); - if (directApi) { - args.add("-direct"); - } if (runForeground) { args.add("-runfg");