[ 
https://issues.apache.org/jira/browse/PHOENIX-6698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17536602#comment-17536602
 ] 

ASF GitHub Bot commented on PHOENIX-6698:
-----------------------------------------

chrajeshbabu commented on code in PR #79:
URL: https://github.com/apache/phoenix-connectors/pull/79#discussion_r872323611


##########
phoenix-hive-base/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java:
##########
@@ -121,73 +126,192 @@ public InputSplit[] getSplits(JobConf jobConf, int 
numSplits) throws IOException
     private List<InputSplit> generateSplits(final JobConf jobConf, final 
QueryPlan qplan,
                                             final List<KeyRange> splits, 
String query) throws
             IOException {
-        if (qplan == null){
+        if (qplan == null) {
             throw new NullPointerException();
-        }if (splits == null){
+        }
+        if (splits == null) {
             throw new NullPointerException();
         }
         final List<InputSplit> psplits = new ArrayList<>(splits.size());
 
-        Path[] tablePaths = 
FileInputFormat.getInputPaths(ShimLoader.getHadoopShims()
-                .newJobContext(new Job(jobConf)));
-        boolean splitByStats = 
jobConf.getBoolean(PhoenixStorageHandlerConstants.SPLIT_BY_STATS,
+        final Path[] tablePaths = FileInputFormat.getInputPaths(
+                ShimLoader.getHadoopShims().newJobContext(new Job(jobConf)));
+        final boolean splitByStats = jobConf.getBoolean(
+                PhoenixStorageHandlerConstants.SPLIT_BY_STATS,
                 false);
-
+        final int parallelThreshould = jobConf.getInt(
+                "hive.phoenix.split.parallel.threshold",
+                32);
         setScanCacheSize(jobConf);
+        if (
+                (parallelThreshould <= 0)

Review Comment:
   Remove unnecessary parentheses. 



##########
phoenix-hive-base/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java:
##########
@@ -121,73 +126,192 @@ public InputSplit[] getSplits(JobConf jobConf, int 
numSplits) throws IOException
     private List<InputSplit> generateSplits(final JobConf jobConf, final 
QueryPlan qplan,
                                             final List<KeyRange> splits, 
String query) throws
             IOException {
-        if (qplan == null){
+        if (qplan == null) {
             throw new NullPointerException();
-        }if (splits == null){
+        }
+        if (splits == null) {
             throw new NullPointerException();
         }
         final List<InputSplit> psplits = new ArrayList<>(splits.size());
 
-        Path[] tablePaths = 
FileInputFormat.getInputPaths(ShimLoader.getHadoopShims()
-                .newJobContext(new Job(jobConf)));
-        boolean splitByStats = 
jobConf.getBoolean(PhoenixStorageHandlerConstants.SPLIT_BY_STATS,
+        final Path[] tablePaths = FileInputFormat.getInputPaths(
+                ShimLoader.getHadoopShims().newJobContext(new Job(jobConf)));
+        final boolean splitByStats = jobConf.getBoolean(
+                PhoenixStorageHandlerConstants.SPLIT_BY_STATS,
                 false);
-
+        final int parallelThreshould = jobConf.getInt(
+                "hive.phoenix.split.parallel.threshold",
+                32);
         setScanCacheSize(jobConf);
+        if (
+                (parallelThreshould <= 0)
+                ||
+                (qplan.getScans().size() < parallelThreshould)
+        ) {
+            LOG.info("generate splits in serial");

Review Comment:
   Make it clear that Generate Input Splits in serial.



##########
phoenix-hive-base/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java:
##########
@@ -121,73 +126,192 @@ public InputSplit[] getSplits(JobConf jobConf, int 
numSplits) throws IOException
     private List<InputSplit> generateSplits(final JobConf jobConf, final 
QueryPlan qplan,
                                             final List<KeyRange> splits, 
String query) throws
             IOException {
-        if (qplan == null){
+        if (qplan == null) {
             throw new NullPointerException();
-        }if (splits == null){
+        }
+        if (splits == null) {
             throw new NullPointerException();
         }
         final List<InputSplit> psplits = new ArrayList<>(splits.size());
 
-        Path[] tablePaths = 
FileInputFormat.getInputPaths(ShimLoader.getHadoopShims()
-                .newJobContext(new Job(jobConf)));
-        boolean splitByStats = 
jobConf.getBoolean(PhoenixStorageHandlerConstants.SPLIT_BY_STATS,
+        final Path[] tablePaths = FileInputFormat.getInputPaths(
+                ShimLoader.getHadoopShims().newJobContext(new Job(jobConf)));
+        final boolean splitByStats = jobConf.getBoolean(
+                PhoenixStorageHandlerConstants.SPLIT_BY_STATS,
                 false);
-
+        final int parallelThreshould = jobConf.getInt(
+                "hive.phoenix.split.parallel.threshold",
+                32);
         setScanCacheSize(jobConf);
+        if (
+                (parallelThreshould <= 0)
+                ||
+                (qplan.getScans().size() < parallelThreshould)
+        ) {
+            LOG.info("generate splits in serial");
+            for (final List<Scan> scans : qplan.getScans()) {
+                psplits.addAll(
+                        generateSplitsInternal(
+                                jobConf,
+                                qplan,
+                                splits,
+                                query,
+                                scans,
+                                splitByStats,
+                                tablePaths)
+                );
+            }
+        } else {
+            final int parallism = jobConf.getInt(
+                    "hive.phoenix.split.parallel.level",
+                    Runtime.getRuntime().availableProcessors() * 2);
+            ExecutorService executorService = Executors.newFixedThreadPool(
+                    parallism);
+            LOG.info("generate splits in parallel with {} threads", parallism);
 
-        // Adding Localization
-        try (org.apache.hadoop.hbase.client.Connection connection = 
ConnectionFactory.createConnection(PhoenixConnectionUtil.getConfiguration(jobConf)))
 {
-        RegionLocator regionLocator = 
connection.getRegionLocator(TableName.valueOf(qplan
-                .getTableRef().getTable().getPhysicalName().toString()));
+            List<Future<List<InputSplit>>> tasks = new ArrayList<>();
 
-        for (List<Scan> scans : qplan.getScans()) {
+            try {
+                for (final List<Scan> scans : qplan.getScans()) {
+                    Future<List<InputSplit>> task = executorService.submit(
+                            new Callable<List<InputSplit>>() {
+                        @Override
+                        public List<InputSplit> call() throws Exception {
+                            return generateSplitsInternal(jobConf,
+                                    qplan,
+                                    splits,
+                                    query,
+                                    scans,
+                                    splitByStats,
+                                    tablePaths);
+                        }
+                    });
+                    tasks.add(task);
+                }
+                for (Future<List<InputSplit>> task : tasks) {
+                    psplits.addAll(task.get());
+                }
+            } catch (ExecutionException | InterruptedException exception) {
+                throw new IOException("failed to get splits,reason:",
+                        exception);
+            } finally {
+                executorService.shutdown();
+            }
+        }
+        return psplits;
+    }
+    /**
+     * This method is used to generate splits for each scan list.
+     * @param jobConf MapReduce Job Configuration
+     * @param qplan phoenix query plan
+     * @param splits phoenix table splits
+     * @param query phoenix query statement
+     * @param scans scan list slice of query plan
+     * @param splitByStats split by stat enabled
+     * @param tablePaths table paths
+     * @return List of Input Splits
+     * @throws IOException if function fails
+     */
+    private List<InputSplit> generateSplitsInternal(final JobConf jobConf,
+            final QueryPlan qplan,
+            final List<KeyRange> splits,
+            final String query,
+            final List<Scan> scans,
+            final boolean splitByStats,
+            final Path[] tablePaths) throws IOException {
+
+        final List<InputSplit> psplits = new ArrayList<>(scans.size());
+        try (org.apache.hadoop.hbase.client.Connection connection =
+                ConnectionFactory.createConnection(
+                PhoenixConnectionUtil.getConfiguration(jobConf))) {
+            RegionLocator regionLocator =
+                    connection.getRegionLocator(TableName.valueOf(
+                            qplan.getTableRef().getTable()
+                                    .getPhysicalName().toString()));
             PhoenixInputSplit inputSplit;
 
-            HRegionLocation location = 
regionLocator.getRegionLocation(scans.get(0).getStartRow()
-                    , false);
-            long regionSize = CompatUtil.getSize(regionLocator, 
connection.getAdmin(), location);
-            String regionLocation = 
PhoenixStorageHandlerUtil.getRegionLocation(location, LOG);
+            HRegionLocation location = regionLocator.getRegionLocation(
+                    scans.get(0).getStartRow(),
+                    false);
+            long regionSize = CompatUtil.getSize(regionLocator,
+                    connection.getAdmin(),
+                    location);
+            String regionLocation =
+                    PhoenixStorageHandlerUtil.getRegionLocation(location,
+                    LOG);
 
             if (splitByStats) {
                 for (Scan aScan : scans) {
                     if (LOG.isDebugEnabled()) {
-                        LOG.debug("Split for  scan : " + aScan + "with 
scanAttribute : " + aScan
-                                .getAttributesMap() + " [scanCache, 
cacheBlock, scanBatch] : [" +
-                                aScan.getCaching() + ", " + 
aScan.getCacheBlocks() + ", " + aScan
-                                .getBatch() + "] and  regionLocation : " + 
regionLocation);
+                        LOG.debug("Split for  scan : "

Review Comment:
   Format the code properly.



##########
phoenix-hive-base/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java:
##########
@@ -121,73 +126,192 @@ public InputSplit[] getSplits(JobConf jobConf, int 
numSplits) throws IOException
     private List<InputSplit> generateSplits(final JobConf jobConf, final 
QueryPlan qplan,
                                             final List<KeyRange> splits, 
String query) throws
             IOException {
-        if (qplan == null){
+        if (qplan == null) {
             throw new NullPointerException();
-        }if (splits == null){
+        }
+        if (splits == null) {
             throw new NullPointerException();
         }
         final List<InputSplit> psplits = new ArrayList<>(splits.size());
 
-        Path[] tablePaths = 
FileInputFormat.getInputPaths(ShimLoader.getHadoopShims()
-                .newJobContext(new Job(jobConf)));
-        boolean splitByStats = 
jobConf.getBoolean(PhoenixStorageHandlerConstants.SPLIT_BY_STATS,
+        final Path[] tablePaths = FileInputFormat.getInputPaths(
+                ShimLoader.getHadoopShims().newJobContext(new Job(jobConf)));
+        final boolean splitByStats = jobConf.getBoolean(
+                PhoenixStorageHandlerConstants.SPLIT_BY_STATS,
                 false);
-
+        final int parallelThreshould = jobConf.getInt(
+                "hive.phoenix.split.parallel.threshold",
+                32);
         setScanCacheSize(jobConf);
+        if (
+                (parallelThreshould <= 0)
+                ||
+                (qplan.getScans().size() < parallelThreshould)
+        ) {
+            LOG.info("generate splits in serial");
+            for (final List<Scan> scans : qplan.getScans()) {
+                psplits.addAll(
+                        generateSplitsInternal(
+                                jobConf,
+                                qplan,
+                                splits,
+                                query,
+                                scans,
+                                splitByStats,
+                                tablePaths)
+                );
+            }
+        } else {
+            final int parallism = jobConf.getInt(
+                    "hive.phoenix.split.parallel.level",
+                    Runtime.getRuntime().availableProcessors() * 2);
+            ExecutorService executorService = Executors.newFixedThreadPool(
+                    parallism);
+            LOG.info("generate splits in parallel with {} threads", parallism);
 
-        // Adding Localization
-        try (org.apache.hadoop.hbase.client.Connection connection = 
ConnectionFactory.createConnection(PhoenixConnectionUtil.getConfiguration(jobConf)))
 {
-        RegionLocator regionLocator = 
connection.getRegionLocator(TableName.valueOf(qplan
-                .getTableRef().getTable().getPhysicalName().toString()));
+            List<Future<List<InputSplit>>> tasks = new ArrayList<>();
 
-        for (List<Scan> scans : qplan.getScans()) {
+            try {
+                for (final List<Scan> scans : qplan.getScans()) {
+                    Future<List<InputSplit>> task = executorService.submit(
+                            new Callable<List<InputSplit>>() {
+                        @Override
+                        public List<InputSplit> call() throws Exception {
+                            return generateSplitsInternal(jobConf,
+                                    qplan,
+                                    splits,
+                                    query,
+                                    scans,
+                                    splitByStats,
+                                    tablePaths);
+                        }
+                    });
+                    tasks.add(task);
+                }
+                for (Future<List<InputSplit>> task : tasks) {
+                    psplits.addAll(task.get());
+                }
+            } catch (ExecutionException | InterruptedException exception) {
+                throw new IOException("failed to get splits,reason:",

Review Comment:
   Log message can be improved.



##########
phoenix-hive-base/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java:
##########
@@ -121,73 +126,192 @@ public InputSplit[] getSplits(JobConf jobConf, int 
numSplits) throws IOException
     private List<InputSplit> generateSplits(final JobConf jobConf, final 
QueryPlan qplan,
                                             final List<KeyRange> splits, 
String query) throws
             IOException {
-        if (qplan == null){
+        if (qplan == null) {
             throw new NullPointerException();
-        }if (splits == null){
+        }
+        if (splits == null) {
             throw new NullPointerException();
         }
         final List<InputSplit> psplits = new ArrayList<>(splits.size());
 
-        Path[] tablePaths = 
FileInputFormat.getInputPaths(ShimLoader.getHadoopShims()
-                .newJobContext(new Job(jobConf)));
-        boolean splitByStats = 
jobConf.getBoolean(PhoenixStorageHandlerConstants.SPLIT_BY_STATS,
+        final Path[] tablePaths = FileInputFormat.getInputPaths(
+                ShimLoader.getHadoopShims().newJobContext(new Job(jobConf)));
+        final boolean splitByStats = jobConf.getBoolean(
+                PhoenixStorageHandlerConstants.SPLIT_BY_STATS,
                 false);
-
+        final int parallelThreshould = jobConf.getInt(
+                "hive.phoenix.split.parallel.threshold",
+                32);
         setScanCacheSize(jobConf);
+        if (
+                (parallelThreshould <= 0)
+                ||
+                (qplan.getScans().size() < parallelThreshould)
+        ) {
+            LOG.info("generate splits in serial");
+            for (final List<Scan> scans : qplan.getScans()) {
+                psplits.addAll(
+                        generateSplitsInternal(

Review Comment:
   Reduce the number of lines used to call the method.



##########
phoenix-hive-base/src/test/java/org/apache/phoenix/hive/HivePhoenixInputFormatTest.java:
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hive;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
+import org.apache.phoenix.end2end.ParallelStatsDisabledTest;
+import org.apache.phoenix.hive.constants.PhoenixStorageHandlerConstants;
+import org.apache.phoenix.hive.mapreduce.PhoenixInputFormat;
+import org.apache.phoenix.mapreduce.PhoenixRecordWritable;
+import org.apache.phoenix.schema.TableAlreadyExistsException;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.NotThreadSafe;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Locale;
+import java.util.Properties;
+
+/**
+ * Test class for Hive PhoenixInputFormat
+ */
+@NotThreadSafe
+@Category(ParallelStatsDisabledTest.class)
+public class HivePhoenixInputFormatTest extends ParallelStatsDisabledIT {
+    private static final Logger LOG = LoggerFactory.getLogger(
+            HivePhoenixInputFormatTest.class);
+    private static final String TABLE_NAME = "HivePhoenixInputFormatTest"
+            .toUpperCase(Locale.ROOT);
+    private static final String DDL = "CREATE TABLE "
+            + TABLE_NAME
+            + " (V1 varchar NOT NULL PRIMARY KEY, V2 integer)";
+    private static final int SPLITS = 128;
+
+    /*
+    *
+    *  This test will create phoenix table with 128 splits and compare
+    *  performance of split generation in serial/parallel
+    *
+    * */
+    @Test
+    public void testGetSplitsSerialOrParallel() throws 
IOException,SQLException {
+        PhoenixInputFormat<PhoenixRecordWritable> inputFormat =
+                new PhoenixInputFormat<PhoenixRecordWritable>();
+        long start,end;
+
+        // create table with N splits
+        System.out.println(
+                String.format("generate testing table with %s splits",
+                        String.valueOf(SPLITS)));
+        setupTestTable();
+        // setup configuration required for PhoenixInputFormat
+        Configuration conf = getUtility().getConfiguration();
+        JobConf jobConf = new JobConf(conf);
+        configureTestInput(jobConf);
+
+
+        // test get splits in serial
+        start = System.currentTimeMillis();
+        jobConf.set("hive.phoenix.split.parallel.threshold","0");
+        InputSplit[] inputSplitsSerial = inputFormat.getSplits(jobConf,SPLITS);
+        end = System.currentTimeMillis();
+        long durationInSerial=end - start;
+        System.out.println(String.format(

Review Comment:
   We need to use assertions and no use in printing those in the logs.



##########
phoenix-hive-base/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java:
##########
@@ -121,73 +126,192 @@ public InputSplit[] getSplits(JobConf jobConf, int 
numSplits) throws IOException
     private List<InputSplit> generateSplits(final JobConf jobConf, final 
QueryPlan qplan,
                                             final List<KeyRange> splits, 
String query) throws
             IOException {
-        if (qplan == null){
+        if (qplan == null) {
             throw new NullPointerException();
-        }if (splits == null){
+        }
+        if (splits == null) {
             throw new NullPointerException();
         }
         final List<InputSplit> psplits = new ArrayList<>(splits.size());
 
-        Path[] tablePaths = 
FileInputFormat.getInputPaths(ShimLoader.getHadoopShims()
-                .newJobContext(new Job(jobConf)));
-        boolean splitByStats = 
jobConf.getBoolean(PhoenixStorageHandlerConstants.SPLIT_BY_STATS,
+        final Path[] tablePaths = FileInputFormat.getInputPaths(
+                ShimLoader.getHadoopShims().newJobContext(new Job(jobConf)));
+        final boolean splitByStats = jobConf.getBoolean(
+                PhoenixStorageHandlerConstants.SPLIT_BY_STATS,
                 false);
-
+        final int parallelThreshould = jobConf.getInt(
+                "hive.phoenix.split.parallel.threshold",
+                32);
         setScanCacheSize(jobConf);
+        if (
+                (parallelThreshould <= 0)
+                ||
+                (qplan.getScans().size() < parallelThreshould)
+        ) {
+            LOG.info("generate splits in serial");
+            for (final List<Scan> scans : qplan.getScans()) {
+                psplits.addAll(
+                        generateSplitsInternal(
+                                jobConf,
+                                qplan,
+                                splits,
+                                query,
+                                scans,
+                                splitByStats,
+                                tablePaths)
+                );
+            }
+        } else {
+            final int parallism = jobConf.getInt(
+                    "hive.phoenix.split.parallel.level",
+                    Runtime.getRuntime().availableProcessors() * 2);
+            ExecutorService executorService = Executors.newFixedThreadPool(
+                    parallism);
+            LOG.info("generate splits in parallel with {} threads", parallism);
 
-        // Adding Localization
-        try (org.apache.hadoop.hbase.client.Connection connection = 
ConnectionFactory.createConnection(PhoenixConnectionUtil.getConfiguration(jobConf)))
 {
-        RegionLocator regionLocator = 
connection.getRegionLocator(TableName.valueOf(qplan
-                .getTableRef().getTable().getPhysicalName().toString()));
+            List<Future<List<InputSplit>>> tasks = new ArrayList<>();
 
-        for (List<Scan> scans : qplan.getScans()) {
+            try {
+                for (final List<Scan> scans : qplan.getScans()) {
+                    Future<List<InputSplit>> task = executorService.submit(
+                            new Callable<List<InputSplit>>() {
+                        @Override
+                        public List<InputSplit> call() throws Exception {
+                            return generateSplitsInternal(jobConf,
+                                    qplan,
+                                    splits,
+                                    query,
+                                    scans,
+                                    splitByStats,
+                                    tablePaths);
+                        }
+                    });
+                    tasks.add(task);
+                }
+                for (Future<List<InputSplit>> task : tasks) {
+                    psplits.addAll(task.get());
+                }
+            } catch (ExecutionException | InterruptedException exception) {
+                throw new IOException("failed to get splits,reason:",
+                        exception);
+            } finally {
+                executorService.shutdown();
+            }
+        }
+        return psplits;
+    }
+    /**
+     * This method is used to generate splits for each scan list.
+     * @param jobConf MapReduce Job Configuration
+     * @param qplan phoenix query plan
+     * @param splits phoenix table splits
+     * @param query phoenix query statement
+     * @param scans scan list slice of query plan
+     * @param splitByStats split by stat enabled
+     * @param tablePaths table paths
+     * @return List of Input Splits
+     * @throws IOException if function fails
+     */
+    private List<InputSplit> generateSplitsInternal(final JobConf jobConf,
+            final QueryPlan qplan,
+            final List<KeyRange> splits,
+            final String query,
+            final List<Scan> scans,
+            final boolean splitByStats,
+            final Path[] tablePaths) throws IOException {
+
+        final List<InputSplit> psplits = new ArrayList<>(scans.size());
+        try (org.apache.hadoop.hbase.client.Connection connection =

Review Comment:
   The connection creation can be shared and reuse when generating the 
inputsplit.



##########
phoenix-hive-base/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java:
##########
@@ -121,73 +126,192 @@ public InputSplit[] getSplits(JobConf jobConf, int 
numSplits) throws IOException
     private List<InputSplit> generateSplits(final JobConf jobConf, final 
QueryPlan qplan,
                                             final List<KeyRange> splits, 
String query) throws
             IOException {
-        if (qplan == null){
+        if (qplan == null) {
             throw new NullPointerException();
-        }if (splits == null){
+        }
+        if (splits == null) {
             throw new NullPointerException();
         }
         final List<InputSplit> psplits = new ArrayList<>(splits.size());
 
-        Path[] tablePaths = 
FileInputFormat.getInputPaths(ShimLoader.getHadoopShims()
-                .newJobContext(new Job(jobConf)));
-        boolean splitByStats = 
jobConf.getBoolean(PhoenixStorageHandlerConstants.SPLIT_BY_STATS,
+        final Path[] tablePaths = FileInputFormat.getInputPaths(
+                ShimLoader.getHadoopShims().newJobContext(new Job(jobConf)));
+        final boolean splitByStats = jobConf.getBoolean(
+                PhoenixStorageHandlerConstants.SPLIT_BY_STATS,
                 false);
-
+        final int parallelThreshould = jobConf.getInt(
+                "hive.phoenix.split.parallel.threshold",
+                32);
         setScanCacheSize(jobConf);
+        if (
+                (parallelThreshould <= 0)
+                ||
+                (qplan.getScans().size() < parallelThreshould)
+        ) {
+            LOG.info("generate splits in serial");
+            for (final List<Scan> scans : qplan.getScans()) {
+                psplits.addAll(
+                        generateSplitsInternal(
+                                jobConf,
+                                qplan,
+                                splits,
+                                query,
+                                scans,
+                                splitByStats,
+                                tablePaths)
+                );
+            }
+        } else {
+            final int parallism = jobConf.getInt(
+                    "hive.phoenix.split.parallel.level",
+                    Runtime.getRuntime().availableProcessors() * 2);
+            ExecutorService executorService = Executors.newFixedThreadPool(
+                    parallism);
+            LOG.info("generate splits in parallel with {} threads", parallism);
 
-        // Adding Localization
-        try (org.apache.hadoop.hbase.client.Connection connection = 
ConnectionFactory.createConnection(PhoenixConnectionUtil.getConfiguration(jobConf)))
 {
-        RegionLocator regionLocator = 
connection.getRegionLocator(TableName.valueOf(qplan
-                .getTableRef().getTable().getPhysicalName().toString()));
+            List<Future<List<InputSplit>>> tasks = new ArrayList<>();
 
-        for (List<Scan> scans : qplan.getScans()) {
+            try {
+                for (final List<Scan> scans : qplan.getScans()) {
+                    Future<List<InputSplit>> task = executorService.submit(
+                            new Callable<List<InputSplit>>() {
+                        @Override
+                        public List<InputSplit> call() throws Exception {
+                            return generateSplitsInternal(jobConf,
+                                    qplan,
+                                    splits,
+                                    query,
+                                    scans,
+                                    splitByStats,
+                                    tablePaths);
+                        }
+                    });
+                    tasks.add(task);
+                }
+                for (Future<List<InputSplit>> task : tasks) {
+                    psplits.addAll(task.get());
+                }
+            } catch (ExecutionException | InterruptedException exception) {
+                throw new IOException("failed to get splits,reason:",
+                        exception);
+            } finally {
+                executorService.shutdown();
+            }
+        }
+        return psplits;
+    }
+    /**
+     * This method is used to generate splits for each scan list.
+     * @param jobConf MapReduce Job Configuration
+     * @param qplan phoenix query plan
+     * @param splits phoenix table splits
+     * @param query phoenix query statement
+     * @param scans scan list slice of query plan
+     * @param splitByStats split by stat enabled
+     * @param tablePaths table paths
+     * @return List of Input Splits
+     * @throws IOException if function fails
+     */
+    private List<InputSplit> generateSplitsInternal(final JobConf jobConf,
+            final QueryPlan qplan,
+            final List<KeyRange> splits,
+            final String query,
+            final List<Scan> scans,
+            final boolean splitByStats,
+            final Path[] tablePaths) throws IOException {
+
+        final List<InputSplit> psplits = new ArrayList<>(scans.size());
+        try (org.apache.hadoop.hbase.client.Connection connection =
+                ConnectionFactory.createConnection(
+                PhoenixConnectionUtil.getConfiguration(jobConf))) {
+            RegionLocator regionLocator =
+                    connection.getRegionLocator(TableName.valueOf(

Review Comment:
   Region locator also can be shared for each call.



##########
phoenix-hive-base/src/test/java/org/apache/phoenix/hive/HivePhoenixInputFormatTest.java:
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hive;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
+import org.apache.phoenix.end2end.ParallelStatsDisabledTest;
+import org.apache.phoenix.hive.constants.PhoenixStorageHandlerConstants;
+import org.apache.phoenix.hive.mapreduce.PhoenixInputFormat;
+import org.apache.phoenix.mapreduce.PhoenixRecordWritable;
+import org.apache.phoenix.schema.TableAlreadyExistsException;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.NotThreadSafe;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Locale;
+import java.util.Properties;
+
+/**
+ * Test class for Hive PhoenixInputFormat
+ */
+@NotThreadSafe
+@Category(ParallelStatsDisabledTest.class)
+public class HivePhoenixInputFormatTest extends ParallelStatsDisabledIT {
+    private static final Logger LOG = LoggerFactory.getLogger(
+            HivePhoenixInputFormatTest.class);
+    private static final String TABLE_NAME = "HivePhoenixInputFormatTest"
+            .toUpperCase(Locale.ROOT);
+    private static final String DDL = "CREATE TABLE "
+            + TABLE_NAME
+            + " (V1 varchar NOT NULL PRIMARY KEY, V2 integer)";
+    private static final int SPLITS = 128;
+
+    /*
+    *
+    *  This test will create phoenix table with 128 splits and compare
+    *  performance of split generation in serial/parallel
+    *
+    * */
+    @Test
+    public void testGetSplitsSerialOrParallel() throws 
IOException,SQLException {
+        PhoenixInputFormat<PhoenixRecordWritable> inputFormat =
+                new PhoenixInputFormat<PhoenixRecordWritable>();
+        long start,end;
+
+        // create table with N splits
+        System.out.println(
+                String.format("generate testing table with %s splits",
+                        String.valueOf(SPLITS)));
+        setupTestTable();
+        // setup configuration required for PhoenixInputFormat
+        Configuration conf = getUtility().getConfiguration();
+        JobConf jobConf = new JobConf(conf);
+        configureTestInput(jobConf);
+
+
+        // test get splits in serial
+        start = System.currentTimeMillis();
+        jobConf.set("hive.phoenix.split.parallel.threshold","0");
+        InputSplit[] inputSplitsSerial = inputFormat.getSplits(jobConf,SPLITS);
+        end = System.currentTimeMillis();
+        long durationInSerial=end - start;
+        System.out.println(String.format(
+                "get split in serial requires:%s ms",
+                String.valueOf(durationInSerial)));
+
+        // test get splits in parallel
+        start = System.currentTimeMillis();
+        jobConf.set("hive.phoenix.split.parallel.threshold", "1");
+        InputSplit[] inputSplitsParallel = inputFormat.getSplits(
+                jobConf,
+                SPLITS);
+        end = System.currentTimeMillis();
+        long durationInParallel=end - start;
+
+        System.out.println(String.format(
+                "get split in parallel requires:%s ms",
+                String.valueOf(durationInParallel)));
+
+        // Test if performance of parallel method is better than serial method
+        Assert.assertTrue(durationInParallel < durationInSerial);
+        // Test if the input split returned by serial method and parallel 
method are the same
+        Assert.assertTrue(inputSplitsParallel.length==SPLITS);
+        Assert.assertTrue(
+                inputSplitsParallel.length == inputSplitsSerial.length
+        );
+        for (final InputSplit inputSplitParallel:inputSplitsParallel){
+            boolean match=false;
+            for (final InputSplit inputSplitSerial:inputSplitsSerial){
+                if (inputSplitParallel.equals(inputSplitSerial)){
+                    match=true;
+                    break;
+                }
+            }
+            Assert.assertTrue(match);
+        }
+    }
+
+    private static void setupTestTable() throws SQLException {
+        final byte [] start=new byte[0];
+        final byte [] end = Bytes.createMaxByteArray(4);
+        final byte[][] splits = Bytes.split(start, end, SPLITS-2);
+        createTestTableWithBinarySplit(getUrl(),DDL, splits ,null);
+    }
+
+    private static void buildPreparedSqlWithBinarySplits(
+            StringBuffer sb,
+            int splits)
+    {

Review Comment:
   code formatting required.



##########
phoenix-hive-base/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java:
##########
@@ -121,73 +126,192 @@ public InputSplit[] getSplits(JobConf jobConf, int 
numSplits) throws IOException
     private List<InputSplit> generateSplits(final JobConf jobConf, final 
QueryPlan qplan,
                                             final List<KeyRange> splits, 
String query) throws
             IOException {
-        if (qplan == null){
+        if (qplan == null) {
             throw new NullPointerException();
-        }if (splits == null){
+        }
+        if (splits == null) {
             throw new NullPointerException();
         }
         final List<InputSplit> psplits = new ArrayList<>(splits.size());
 
-        Path[] tablePaths = 
FileInputFormat.getInputPaths(ShimLoader.getHadoopShims()
-                .newJobContext(new Job(jobConf)));
-        boolean splitByStats = 
jobConf.getBoolean(PhoenixStorageHandlerConstants.SPLIT_BY_STATS,
+        final Path[] tablePaths = FileInputFormat.getInputPaths(
+                ShimLoader.getHadoopShims().newJobContext(new Job(jobConf)));
+        final boolean splitByStats = jobConf.getBoolean(
+                PhoenixStorageHandlerConstants.SPLIT_BY_STATS,
                 false);
-
+        final int parallelThreshould = jobConf.getInt(
+                "hive.phoenix.split.parallel.threshold",
+                32);
         setScanCacheSize(jobConf);
+        if (
+                (parallelThreshould <= 0)
+                ||
+                (qplan.getScans().size() < parallelThreshould)
+        ) {
+            LOG.info("generate splits in serial");
+            for (final List<Scan> scans : qplan.getScans()) {
+                psplits.addAll(
+                        generateSplitsInternal(
+                                jobConf,
+                                qplan,
+                                splits,
+                                query,
+                                scans,
+                                splitByStats,
+                                tablePaths)
+                );
+            }
+        } else {
+            final int parallism = jobConf.getInt(

Review Comment:
   Whats the difference between this parallelism level config and parallel 
threshold.



##########
phoenix-hive-base/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java:
##########
@@ -121,73 +126,192 @@ public InputSplit[] getSplits(JobConf jobConf, int 
numSplits) throws IOException
     private List<InputSplit> generateSplits(final JobConf jobConf, final 
QueryPlan qplan,
                                             final List<KeyRange> splits, 
String query) throws
             IOException {
-        if (qplan == null){
+        if (qplan == null) {
             throw new NullPointerException();
-        }if (splits == null){
+        }
+        if (splits == null) {
             throw new NullPointerException();
         }
         final List<InputSplit> psplits = new ArrayList<>(splits.size());
 
-        Path[] tablePaths = 
FileInputFormat.getInputPaths(ShimLoader.getHadoopShims()
-                .newJobContext(new Job(jobConf)));
-        boolean splitByStats = 
jobConf.getBoolean(PhoenixStorageHandlerConstants.SPLIT_BY_STATS,
+        final Path[] tablePaths = FileInputFormat.getInputPaths(
+                ShimLoader.getHadoopShims().newJobContext(new Job(jobConf)));
+        final boolean splitByStats = jobConf.getBoolean(
+                PhoenixStorageHandlerConstants.SPLIT_BY_STATS,
                 false);
-
+        final int parallelThreshould = jobConf.getInt(
+                "hive.phoenix.split.parallel.threshold",
+                32);
         setScanCacheSize(jobConf);
+        if (
+                (parallelThreshould <= 0)
+                ||
+                (qplan.getScans().size() < parallelThreshould)
+        ) {
+            LOG.info("generate splits in serial");
+            for (final List<Scan> scans : qplan.getScans()) {
+                psplits.addAll(
+                        generateSplitsInternal(
+                                jobConf,
+                                qplan,
+                                splits,
+                                query,
+                                scans,
+                                splitByStats,
+                                tablePaths)
+                );
+            }
+        } else {
+            final int parallism = jobConf.getInt(
+                    "hive.phoenix.split.parallel.level",
+                    Runtime.getRuntime().availableProcessors() * 2);
+            ExecutorService executorService = Executors.newFixedThreadPool(
+                    parallism);
+            LOG.info("generate splits in parallel with {} threads", parallism);
 
-        // Adding Localization
-        try (org.apache.hadoop.hbase.client.Connection connection = 
ConnectionFactory.createConnection(PhoenixConnectionUtil.getConfiguration(jobConf)))
 {
-        RegionLocator regionLocator = 
connection.getRegionLocator(TableName.valueOf(qplan
-                .getTableRef().getTable().getPhysicalName().toString()));
+            List<Future<List<InputSplit>>> tasks = new ArrayList<>();
 
-        for (List<Scan> scans : qplan.getScans()) {
+            try {
+                for (final List<Scan> scans : qplan.getScans()) {
+                    Future<List<InputSplit>> task = executorService.submit(
+                            new Callable<List<InputSplit>>() {
+                        @Override
+                        public List<InputSplit> call() throws Exception {
+                            return generateSplitsInternal(jobConf,
+                                    qplan,
+                                    splits,
+                                    query,
+                                    scans,
+                                    splitByStats,
+                                    tablePaths);
+                        }
+                    });
+                    tasks.add(task);
+                }
+                for (Future<List<InputSplit>> task : tasks) {
+                    psplits.addAll(task.get());
+                }
+            } catch (ExecutionException | InterruptedException exception) {
+                throw new IOException("failed to get splits,reason:",
+                        exception);
+            } finally {
+                executorService.shutdown();
+            }
+        }
+        return psplits;
+    }
+    /**
+     * This method is used to generate splits for each scan list.
+     * @param jobConf MapReduce Job Configuration
+     * @param qplan phoenix query plan
+     * @param splits phoenix table splits
+     * @param query phoenix query statement
+     * @param scans scan list slice of query plan
+     * @param splitByStats split by stat enabled
+     * @param tablePaths table paths
+     * @return List of Input Splits
+     * @throws IOException if function fails
+     */
+    private List<InputSplit> generateSplitsInternal(final JobConf jobConf,
+            final QueryPlan qplan,
+            final List<KeyRange> splits,
+            final String query,
+            final List<Scan> scans,
+            final boolean splitByStats,
+            final Path[] tablePaths) throws IOException {
+
+        final List<InputSplit> psplits = new ArrayList<>(scans.size());
+        try (org.apache.hadoop.hbase.client.Connection connection =
+                ConnectionFactory.createConnection(
+                PhoenixConnectionUtil.getConfiguration(jobConf))) {
+            RegionLocator regionLocator =
+                    connection.getRegionLocator(TableName.valueOf(
+                            qplan.getTableRef().getTable()
+                                    .getPhysicalName().toString()));
             PhoenixInputSplit inputSplit;
 
-            HRegionLocation location = 
regionLocator.getRegionLocation(scans.get(0).getStartRow()
-                    , false);
-            long regionSize = CompatUtil.getSize(regionLocator, 
connection.getAdmin(), location);
-            String regionLocation = 
PhoenixStorageHandlerUtil.getRegionLocation(location, LOG);
+            HRegionLocation location = regionLocator.getRegionLocation(
+                    scans.get(0).getStartRow(),
+                    false);
+            long regionSize = CompatUtil.getSize(regionLocator,
+                    connection.getAdmin(),
+                    location);
+            String regionLocation =
+                    PhoenixStorageHandlerUtil.getRegionLocation(location,
+                    LOG);
 
             if (splitByStats) {
                 for (Scan aScan : scans) {
                     if (LOG.isDebugEnabled()) {
-                        LOG.debug("Split for  scan : " + aScan + "with 
scanAttribute : " + aScan
-                                .getAttributesMap() + " [scanCache, 
cacheBlock, scanBatch] : [" +
-                                aScan.getCaching() + ", " + 
aScan.getCacheBlocks() + ", " + aScan
-                                .getBatch() + "] and  regionLocation : " + 
regionLocation);
+                        LOG.debug("Split for  scan : "
+                                + aScan
+                                + "with scanAttribute : "
+                                + aScan.getAttributesMap()
+                                + " [scanCache, cacheBlock, scanBatch] : ["
+                                + aScan.getCaching()
+                                + ", "
+                                + aScan.getCacheBlocks()
+                                + ", "
+                                + aScan.getBatch()
+                                + "] and  regionLocation : "
+                                + regionLocation);
                     }
 
-                    inputSplit = new PhoenixInputSplit(new 
ArrayList<>(Arrays.asList(aScan)), tablePaths[0],
-                            regionLocation, regionSize);
+                    inputSplit =
+                            new PhoenixInputSplit(
+                                    new ArrayList<>(Arrays.asList(aScan)),
+                                    tablePaths[0],
+                                    regionLocation,
+                                    regionSize);
                     inputSplit.setQuery(query);
                     psplits.add(inputSplit);
                 }
             } else {
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug("Scan count[" + scans.size() + "] : " + 
Bytes.toStringBinary(scans
-                            .get(0).getStartRow()) + " ~ " + 
Bytes.toStringBinary(scans.get(scans
-                            .size() - 1).getStopRow()));
-                    LOG.debug("First scan : " + scans.get(0) + "with 
scanAttribute : " + scans
-                            .get(0).getAttributesMap() + " [scanCache, 
cacheBlock, scanBatch] : " +
-                            "[" + scans.get(0).getCaching() + ", " + 
scans.get(0).getCacheBlocks()
-                            + ", " + scans.get(0).getBatch() + "] and  
regionLocation : " +
-                            regionLocation);
+                    LOG.debug("Scan count["

Review Comment:
   format properly.





> hive-connector will take long time to generate splits for large phoenix 
> tables.
> -------------------------------------------------------------------------------
>
>                 Key: PHOENIX-6698
>                 URL: https://issues.apache.org/jira/browse/PHOENIX-6698
>             Project: Phoenix
>          Issue Type: Improvement
>          Components: hive-connector
>    Affects Versions: 5.1.0
>            Reporter: jichen
>            Assignee: jichen
>            Priority: Minor
>             Fix For: connectors-6.0.0
>
>         Attachments: PHOENIX-6698.master.v1.patch
>
>
> {{{color:#1d1c1d}In our production environment, hive-phoenix connector  will 
> take nearly 30-40 minutes to generate splits for large phoenix table, which 
> has more than 2048 regions.it is because in class PhoenixInputFormat, 
> function  'generateSplits' only uses one thread to generate splits for each 
> scan. My proposal is to use multi-thread to generate splits in parallel. the 
> proposal has been validated in our production environment.by  changing code 
> {color}}}{color:#1d1c1d}to generate splits  in parallel with 24 threads, the 
> time cost is reduced to 2 minutes.  {color}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

Reply via email to