This is an automated email from the ASF dual-hosted git repository.

skadam pushed a commit to branch 4.x-HBase-1.5
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/4.x-HBase-1.5 by this push:
     new d55d391  PHOENIX-5590 IndexTool always runs with direct
d55d391 is described below

commit d55d391926f0df667d3b5b484ca0c56543f993a7
Author: Gokcen Iskender <gisken...@salesforce.com>
AuthorDate: Tue Nov 26 14:31:06 2019 -0800

    PHOENIX-5590 IndexTool always runs with direct
    
    Signed-off-by: s.kadam <s.ka...@apache.org>
---
 .../end2end/IndexToolForPartialBuildIT.java        |  1 +
 .../org/apache/phoenix/end2end/IndexToolIT.java    | 23 ++---
 .../coprocessor/tasks/IndexRebuildTask.java        |  2 +-
 .../apache/phoenix/mapreduce/index/IndexTool.java  | 97 +++++-----------------
 4 files changed, 30 insertions(+), 93 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildIT.java
index 50515b1..3027554 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolForPartialBuildIT.java
@@ -237,6 +237,7 @@ public class IndexToolForPartialBuildIT extends BaseOwnClusterIT {
         // complete index rebuild
         args.add("-it");
         args.add(indexName);
+        args.add("-runfg");
         args.add("-op");
         args.add("/tmp/output/partialTable_");
         return args.toArray(new String[0]);
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
index 266ad57..ff1b354 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
@@ -38,11 +38,8 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.UUID;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
@@ -53,12 +50,10 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.phoenix.end2end.index.GlobalIndexCheckerIT;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.mapreduce.index.IndexTool;
 import org.apache.phoenix.query.ConnectionQueryServices;
 import org.apache.phoenix.mapreduce.index.PhoenixIndexImportDirectMapper;
-import org.apache.phoenix.mapreduce.index.PhoenixIndexImportMapper;
 import org.apache.phoenix.mapreduce.index.PhoenixServerBuildIndexMapper;
 
 import org.apache.phoenix.query.QueryServices;
@@ -660,10 +655,9 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
         args.add(indxTable);
         if (directApi) {
             args.add("-direct");
-            // Need to run this job in foreground for the test to be 
deterministic
-            args.add("-runfg");
         }
-
+        // Need to run this job in foreground for the test to be deterministic
+        args.add("-runfg");
         if (useSnapshot) {
             args.add("-snap");
         }
@@ -704,15 +698,10 @@ public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
             boolean transactional = dataTable.isTransactional();
             boolean localIndex = 
PTable.IndexType.LOCAL.equals(indexTable.getIndexType());
 
-            if (directApi) {
-                if ((localIndex || !transactional) && !useSnapshot) {
-                    assertEquals(job.getMapperClass(), 
PhoenixServerBuildIndexMapper.class);
-                } else {
-                    assertEquals(job.getMapperClass(), 
PhoenixIndexImportDirectMapper.class);
-                }
-            }
-            else {
-                assertEquals(job.getMapperClass(), 
PhoenixIndexImportMapper.class);
+            if ((localIndex || !transactional) && !useSnapshot) {
+                assertEquals(job.getMapperClass(), 
PhoenixServerBuildIndexMapper.class);
+            } else {
+                assertEquals(job.getMapperClass(), 
PhoenixIndexImportDirectMapper.class);
             }
         }
     }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/IndexRebuildTask.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/IndexRebuildTask.java
index a943499..3bf9dfb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/IndexRebuildTask.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/tasks/IndexRebuildTask.java
@@ -95,7 +95,7 @@ public class IndexRebuildTask extends BaseTask  {
             // Run index tool async.
             boolean runForeground = false;
             Map.Entry<Integer, Job> indexToolRes = IndexTool
-                    .run(conf, taskRecord.getSchemaName(), 
taskRecord.getTableName(), indexName, true,
+                    .run(conf, taskRecord.getSchemaName(), 
taskRecord.getTableName(), indexName,
                             false, taskRecord.getTenantId(), shouldDisable, 
rebuildAll, runForeground);
             int status = indexToolRes.getKey();
             if (status != 0) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
index 864036c..9a589b9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
@@ -51,15 +51,12 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
-import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
 import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
@@ -124,7 +121,6 @@ public class IndexTool extends Configured implements Tool {
     private boolean isPartialBuild;
     private String qDataTable;
     private String qIndexTable;
-    private boolean useDirectApi;
     private boolean useSnapshot;
     private boolean isLocalIndexBuild;
     private boolean shouldDeleteBeforeRebuild;
@@ -146,7 +142,7 @@ public class IndexTool extends Configured implements Tool {
             "To build indexes for a data table from least disabledTimeStamp");
     
     private static final Option DIRECT_API_OPTION = new Option("direct", 
"direct", false,
-            "If specified, we avoid the bulk load (optional)");
+            "This parameter is deprecated. Direct mode will be used whether it 
is set or not. Keeping it for backwards compatibility.");
 
     private static final double DEFAULT_SPLIT_SAMPLING_RATE = 10.0;
 
@@ -232,27 +228,20 @@ public class IndexTool extends Configured implements Tool {
             throw new IllegalStateException(DATA_TABLE_OPTION.getLongOpt() + " 
is a mandatory "
                     + "parameter");
         }
-
-               if (!(cmdLine.hasOption(PARTIAL_REBUILD_OPTION.getOpt()) || 
cmdLine.hasOption(DIRECT_API_OPTION.getOpt()))
-                               && 
!cmdLine.hasOption(OUTPUT_PATH_OPTION.getOpt())) {
-                       throw new 
IllegalStateException(OUTPUT_PATH_OPTION.getLongOpt() + " is a mandatory " + 
"parameter");
-               }
         
                if (cmdLine.hasOption(PARTIAL_REBUILD_OPTION.getOpt()) && 
cmdLine.hasOption(INDEX_TABLE_OPTION.getOpt())) {
                        throw new IllegalStateException("Index name should not 
be passed with " + PARTIAL_REBUILD_OPTION.getLongOpt());
                }
 
+        if (!cmdLine.hasOption(PARTIAL_REBUILD_OPTION.getOpt()) && 
!cmdLine.hasOption(INDEX_TABLE_OPTION.getOpt())) {
+            throw new IllegalStateException("Index name should be passed 
unless it is a partial rebuild.");
+        }
+
                if (cmdLine.hasOption(PARTIAL_REBUILD_OPTION.getOpt()) && 
cmdLine.hasOption(DELETE_ALL_AND_REBUILD_OPTION.getOpt())) {
             throw new 
IllegalStateException(DELETE_ALL_AND_REBUILD_OPTION.getLongOpt() + " is not 
compatible with "
                     + PARTIAL_REBUILD_OPTION.getLongOpt());
         }
-                       
-        if (!(cmdLine.hasOption(DIRECT_API_OPTION.getOpt())) && 
cmdLine.hasOption(INDEX_TABLE_OPTION.getOpt())
-                && cmdLine.hasOption(RUN_FOREGROUND_OPTION
-                        .getOpt())) {
-            throw new IllegalStateException(RUN_FOREGROUND_OPTION.getLongOpt()
-                    + " is applicable only for " + 
DIRECT_API_OPTION.getLongOpt());
-        }
+
         boolean splitIndex = 
cmdLine.hasOption(AUTO_SPLIT_INDEX_OPTION.getOpt()) || 
cmdLine.hasOption(SPLIT_INDEX_OPTION.getOpt());
         if (splitIndex && !cmdLine.hasOption(INDEX_TABLE_OPTION.getOpt())) {
             throw new IllegalStateException("Must pass an index name for the 
split index option");
@@ -304,10 +293,10 @@ public class IndexTool extends Configured implements Tool {
                             
Long.toString(TransactionUtil.convertToNanoseconds(maxTimeRange)));
                     configuration.set(PhoenixConfigurationUtil.TX_PROVIDER, 
pDataTable.getTransactionProvider().name());
                 }
-                if (useSnapshot || !useDirectApi || (!isLocalIndexBuild && 
pDataTable.isTransactional())) {
+                if (useSnapshot || (!isLocalIndexBuild && 
pDataTable.isTransactional())) {
                     
configuration.set(PhoenixConfigurationUtil.CURRENT_SCN_VALUE,
                             Long.toString(maxTimeRange));
-                    return configureJobForAysncIndex();
+                    return configureJobForAsyncIndex();
                 } else {
                     // Local and non-transactional global indexes to be built 
on the server side
                     // It is safe not to set CURRENT_SCN_VALUE for server side 
rebuilds, in order to make sure that
@@ -428,10 +417,7 @@ public class IndexTool extends Configured implements Tool {
             
         }
 
-        private Job configureJobForAysncIndex()
-
-                throws Exception {
-
+        private Job configureJobForAsyncIndex() throws Exception {
             String physicalIndexTable = 
pIndexTable.getPhysicalName().getString();
             final PhoenixConnection pConnection = 
connection.unwrap(PhoenixConnection.class);
             final PostIndexDDLCompiler ddlCompiler =
@@ -455,13 +441,17 @@ public class IndexTool extends Configured implements Tool {
                     PhoenixRuntime.generateColumnInfo(connection, qIndexTable, 
indexColumns);
             ColumnInfoToStringEncoderDecoder.encode(configuration, 
columnMetadataList);
 
-            fs = outputPath.getFileSystem(configuration);
-            fs.delete(outputPath, true);
+            if (outputPath != null) {
+                fs = outputPath.getFileSystem(configuration);
+                fs.delete(outputPath, true);
+            }
             final String jobName = String.format(INDEX_JOB_NAME_TEMPLATE, 
schemaName, dataTable, indexTable);
             final Job job = Job.getInstance(configuration, jobName);
             job.setJarByClass(IndexTool.class);
             job.setMapOutputKeyClass(ImmutableBytesWritable.class);
-            FileOutputFormat.setOutputPath(job, outputPath);
+            if (outputPath != null) {
+                FileOutputFormat.setOutputPath(job, outputPath);
+            }
 
             if (!useSnapshot) {
                 PhoenixMapReduceUtil.setInput(job, 
PhoenixIndexDBWritable.class, qDataTable, selectQuery);
@@ -489,16 +479,11 @@ public class IndexTool extends Configured implements Tool {
             }
             TableMapReduceUtil.initCredentials(job);
             
-            if (useDirectApi) {
-                job.setMapperClass(PhoenixIndexImportDirectMapper.class);
-                return configureSubmittableJobUsingDirectApi(job);
-            } else {
-                return configureRunnableJobUsingBulkLoad(job, outputPath);
-            }
+            job.setMapperClass(PhoenixIndexImportDirectMapper.class);
+            return configureSubmittableJobUsingDirectApi(job);
         }
 
-        private Job configureJobForServerBuildIndex()
-                throws Exception {
+        private Job configureJobForServerBuildIndex() throws Exception {
             long indexRebuildQueryTimeoutMs =
                     
configuration.getLong(QueryServices.INDEX_REBUILD_QUERY_TIMEOUT_ATTRIB,
                             
QueryServicesOptions.DEFAULT_INDEX_REBUILD_QUERY_TIMEOUT);
@@ -554,26 +539,6 @@ public class IndexTool extends Configured implements Tool {
         }
 
         /**
-         * Submits the job and waits for completion.
-         * @param job
-         * @param outputPath
-         * @return
-         * @throws Exception
-         */
-        private Job configureRunnableJobUsingBulkLoad(Job job, Path 
outputPath) throws Exception {
-            job.setMapperClass(PhoenixIndexImportMapper.class);
-            job.setMapOutputKeyClass(ImmutableBytesWritable.class);
-            job.setMapOutputValueClass(KeyValue.class);
-            final Configuration configuration = job.getConfiguration();
-            final String physicalIndexTable =
-                    
PhoenixConfigurationUtil.getPhysicalTableName(configuration);
-            try(final HTable htable = new HTable(configuration, 
physicalIndexTable)) {
-                HFileOutputFormat.configureIncrementalLoad(job, htable);
-            }
-            return job;
-        }
-        
-        /**
          * Uses the HBase Front Door Api to write to index table. Submits the 
job and either returns or
          * waits for the job completion based on runForeground parameter.
          * 
@@ -581,9 +546,7 @@ public class IndexTool extends Configured implements Tool {
          * @return
          * @throws Exception
          */
-        private Job configureSubmittableJobUsingDirectApi(Job job)
-                throws Exception {
-
+        private Job configureSubmittableJobUsingDirectApi(Job job) throws 
Exception {
             job.setReducerClass(PhoenixIndexImportDirectReducer.class);
             Configuration conf = job.getConfiguration();
             HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
@@ -633,7 +596,6 @@ public class IndexTool extends Configured implements Tool {
             try(Connection tempConn = 
ConnectionUtil.getInputConnection(configuration)) {
                 pDataTable = PhoenixRuntime.getTableNoCache(tempConn, 
qDataTable);
             }
-            useDirectApi = cmdLine.hasOption(DIRECT_API_OPTION.getOpt());
             String 
basePath=cmdLine.getOptionValue(OUTPUT_PATH_OPTION.getOpt());
             boolean isForeground = 
cmdLine.hasOption(RUN_FOREGROUND_OPTION.getOpt());
             useSnapshot = cmdLine.hasOption(SNAPSHOT_OPTION.getOpt());
@@ -697,7 +659,7 @@ public class IndexTool extends Configured implements Tool {
             jobFactory = new JobFactory(connection, configuration, outputPath);
             job = jobFactory.getJob();
 
-            if (!isForeground && useDirectApi) {
+            if (!isForeground) {
                 LOGGER.info("Running Index Build in Background - Submit async 
and exit");
                 job.submit();
                 return 0;
@@ -707,18 +669,6 @@ public class IndexTool extends Configured implements Tool {
             boolean result = job.waitForCompletion(true);
             
             if (result) {
-                if (!useDirectApi && indexTable != null) {
-                    if (isLocalIndexBuild) {
-                        validateSplitForLocalIndex(splitKeysBeforeJob, htable);
-                    }
-                    LOGGER.info("Loading HFiles from {}", outputPath);
-                    LoadIncrementalHFiles loader = new 
LoadIncrementalHFiles(configuration);
-                    loader.doBulkLoad(outputPath, htable);
-                    htable.close();
-                    // Without direct API, we need to update the index state 
to ACTIVE from client.
-                    IndexToolUtil.updateIndexState(connection, qDataTable, 
indexTable, PIndexState.ACTIVE);
-                    fs.delete(outputPath, true);
-                }
                 return 0;
             } else {
                 LOGGER.error("IndexTool job failed! Check logs for errors..");
@@ -919,7 +869,7 @@ public class IndexTool extends Configured implements Tool {
     }
 
     public static Map.Entry<Integer, Job> run(Configuration conf, String 
schemaName, String dataTable, String indexTable,
-            boolean directApi, boolean useSnapshot, String tenantId, boolean 
disableBefore, boolean shouldDeleteBeforeRebuild, boolean runForeground) throws 
Exception {
+             boolean useSnapshot, String tenantId, boolean disableBefore, 
boolean shouldDeleteBeforeRebuild, boolean runForeground) throws Exception {
         final List<String> args = Lists.newArrayList();
         if (schemaName != null) {
             args.add("-s");
@@ -929,9 +879,6 @@ public class IndexTool extends Configured implements Tool {
         args.add(dataTable);
         args.add("-it");
         args.add(indexTable);
-        if (directApi) {
-            args.add("-direct");
-        }
 
         if (runForeground) {
             args.add("-runfg");

Reply via email to