[ https://issues.apache.org/jira/browse/PHOENIX-6698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17542622#comment-17542622 ]

ASF GitHub Bot commented on PHOENIX-6698:
-----------------------------------------

joshelser commented on code in PR #79:
URL: https://github.com/apache/phoenix-connectors/pull/79#discussion_r882833463


##########
phoenix-hive-base/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java:
##########
@@ -119,74 +124,144 @@ public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException
     }
 
     private List<InputSplit> generateSplits(final JobConf jobConf, final QueryPlan qplan,
-                                            final List<KeyRange> splits, String query) throws
-            IOException {
-        if (qplan == null){
+                                            final List<KeyRange> splits, final String query)
+            throws IOException {
+
+        if (qplan == null) {
             throw new NullPointerException();
-        }if (splits == null){
+        }
+        if (splits == null) {
             throw new NullPointerException();
         }
         final List<InputSplit> psplits = new ArrayList<>(splits.size());
 
-        Path[] tablePaths = FileInputFormat.getInputPaths(ShimLoader.getHadoopShims()
-                .newJobContext(new Job(jobConf)));
-        boolean splitByStats = jobConf.getBoolean(PhoenixStorageHandlerConstants.SPLIT_BY_STATS,
+        final Path[] tablePaths = FileInputFormat.getInputPaths(
+                ShimLoader.getHadoopShims().newJobContext(new Job(jobConf)));
+        final boolean splitByStats = jobConf.getBoolean(
+                PhoenixStorageHandlerConstants.SPLIT_BY_STATS,
                 false);
-
+        final int parallelThreshold = jobConf.getInt(
+                PhoenixStorageHandlerConstants.PHOENIX_MINIMUM_PARALLEL_SCANS_THRESHOLD,
+                PhoenixStorageHandlerConstants.DEFAULT_PHOENIX_MINIMUM_PARALLEL_SCANS_THRESHOLD);
         setScanCacheSize(jobConf);
+        try (org.apache.hadoop.hbase.client.Connection connection = ConnectionFactory
+                .createConnection(PhoenixConnectionUtil.getConfiguration(jobConf))) {
+            final RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf(
+                            qplan.getTableRef().getTable().getPhysicalName().toString()));
+            final int scanSize = qplan.getScans().size();
+            if (useParallelInputGeneration(parallelThreshold, scanSize)) {
+                final int parallism = jobConf.getInt(

Review Comment:
   ```suggestion
                   final int parallelism = jobConf.getInt(
   ```



##########
phoenix-hive-base/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java:
##########
@@ -119,74 +124,144 @@ public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException
     }
 
     private List<InputSplit> generateSplits(final JobConf jobConf, final QueryPlan qplan,
-                                            final List<KeyRange> splits, String query) throws
-            IOException {
-        if (qplan == null){
+                                            final List<KeyRange> splits, final String query)
+            throws IOException {
+
+        if (qplan == null) {
             throw new NullPointerException();
-        }if (splits == null){
+        }
+        if (splits == null) {
             throw new NullPointerException();
         }
         final List<InputSplit> psplits = new ArrayList<>(splits.size());
 
-        Path[] tablePaths = FileInputFormat.getInputPaths(ShimLoader.getHadoopShims()
-                .newJobContext(new Job(jobConf)));
-        boolean splitByStats = jobConf.getBoolean(PhoenixStorageHandlerConstants.SPLIT_BY_STATS,
+        final Path[] tablePaths = FileInputFormat.getInputPaths(
+                ShimLoader.getHadoopShims().newJobContext(new Job(jobConf)));
+        final boolean splitByStats = jobConf.getBoolean(
+                PhoenixStorageHandlerConstants.SPLIT_BY_STATS,
                 false);
-
+        final int parallelThreshold = jobConf.getInt(
+                PhoenixStorageHandlerConstants.PHOENIX_MINIMUM_PARALLEL_SCANS_THRESHOLD,
+                PhoenixStorageHandlerConstants.DEFAULT_PHOENIX_MINIMUM_PARALLEL_SCANS_THRESHOLD);
         setScanCacheSize(jobConf);
+        try (org.apache.hadoop.hbase.client.Connection connection = ConnectionFactory
+                .createConnection(PhoenixConnectionUtil.getConfiguration(jobConf))) {
+            final RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf(
+                            qplan.getTableRef().getTable().getPhysicalName().toString()));
+            final int scanSize = qplan.getScans().size();
+            if (useParallelInputGeneration(parallelThreshold, scanSize)) {
+                final int parallism = jobConf.getInt(
+                        PhoenixStorageHandlerConstants.PHOENIX_INPUTSPLIT_GENERATION_THREAD_COUNT,
+                        PhoenixStorageHandlerConstants
+                                .DEFAULT_PHOENIX_INPUTSPLIT_GENERATION_THREAD_COUNT);
+                ExecutorService executorService = Executors.newFixedThreadPool(parallism);
+                LOG.info("Generate Input Splits in Parallel with {} threads", parallism);
 
-        // Adding Localization
-        try (org.apache.hadoop.hbase.client.Connection connection = ConnectionFactory.createConnection(PhoenixConnectionUtil.getConfiguration(jobConf))) {
-        RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf(qplan
-                .getTableRef().getTable().getPhysicalName().toString()));
-
-        for (List<Scan> scans : qplan.getScans()) {
-            PhoenixInputSplit inputSplit;
-
-            HRegionLocation location = regionLocator.getRegionLocation(scans.get(0).getStartRow()
-                    , false);
-            long regionSize = CompatUtil.getSize(regionLocator, connection.getAdmin(), location);
-            String regionLocation = PhoenixStorageHandlerUtil.getRegionLocation(location, LOG);
-
-            if (splitByStats) {
-                for (Scan aScan : scans) {
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("Split for  scan : " + aScan + "with scanAttribute : " + aScan
-                                .getAttributesMap() + " [scanCache, cacheBlock, scanBatch] : [" +
-                                aScan.getCaching() + ", " + aScan.getCacheBlocks() + ", " + aScan
-                                .getBatch() + "] and  regionLocation : " + regionLocation);
-                    }
+                List<Future<List<InputSplit>>> tasks = new ArrayList<>();
 
-                    inputSplit = new PhoenixInputSplit(new ArrayList<>(Arrays.asList(aScan)), tablePaths[0],
-                            regionLocation, regionSize);
-                    inputSplit.setQuery(query);
-                    psplits.add(inputSplit);
+                try {
+                    for (final List<Scan> scans : qplan.getScans()) {
+                        Future<List<InputSplit>> task = executorService.submit(
+                                new Callable<List<InputSplit>>() {
+                                    @Override public List<InputSplit> call() throws Exception {
+                                        return generateSplitsInternal(query, scans, splitByStats,
+                                                connection, regionLocator, tablePaths);
+                                    }
+                                });
+                        tasks.add(task);
+                    }
+                    for (Future<List<InputSplit>> task : tasks) {
+                        psplits.addAll(task.get());
+                    }
+                } catch (ExecutionException | InterruptedException exception) {
+                    throw new IOException("Failed to Generate Input Splits in Parallel, reason:",
+                            exception);

Review Comment:
   Good to unwrap the ExecutionException and throw back the real exception. It 
may already be an IOException which you can throw with a cast, rather than 
rewrapping in another IOException.
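   For illustration, the suggested unwrapping might look roughly like this (a minimal sketch against the catch site in this diff; the interrupt handling and the fallback rewrap are assumptions, not part of the review):
   
   ```java
   try {
       for (Future<List<InputSplit>> task : tasks) {
           psplits.addAll(task.get());
       }
   } catch (ExecutionException e) {
       // Unwrap the real failure; it may already be an IOException that can
       // simply be rethrown with a cast instead of being rewrapped.
       Throwable cause = e.getCause();
       if (cause instanceof IOException) {
           throw (IOException) cause;
       }
       throw new IOException("Failed to generate input splits in parallel", cause);
   } catch (InterruptedException e) {
       // Restore the interrupt flag before surfacing the failure.
       Thread.currentThread().interrupt();
       throw new IOException("Interrupted while generating input splits", e);
   }
   ```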



##########
phoenix-hive-base/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java:
##########
@@ -119,74 +124,144 @@ public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException
     }
 
     private List<InputSplit> generateSplits(final JobConf jobConf, final QueryPlan qplan,
-                                            final List<KeyRange> splits, String query) throws
-            IOException {
-        if (qplan == null){
+                                            final List<KeyRange> splits, final String query)
+            throws IOException {
+
+        if (qplan == null) {
             throw new NullPointerException();
-        }if (splits == null){
+        }
+        if (splits == null) {
             throw new NullPointerException();
         }
         final List<InputSplit> psplits = new ArrayList<>(splits.size());
 
-        Path[] tablePaths = FileInputFormat.getInputPaths(ShimLoader.getHadoopShims()
-                .newJobContext(new Job(jobConf)));
-        boolean splitByStats = jobConf.getBoolean(PhoenixStorageHandlerConstants.SPLIT_BY_STATS,
+        final Path[] tablePaths = FileInputFormat.getInputPaths(
+                ShimLoader.getHadoopShims().newJobContext(new Job(jobConf)));
+        final boolean splitByStats = jobConf.getBoolean(
+                PhoenixStorageHandlerConstants.SPLIT_BY_STATS,
                 false);
-
+        final int parallelThreshold = jobConf.getInt(
+                PhoenixStorageHandlerConstants.PHOENIX_MINIMUM_PARALLEL_SCANS_THRESHOLD,
+                PhoenixStorageHandlerConstants.DEFAULT_PHOENIX_MINIMUM_PARALLEL_SCANS_THRESHOLD);
         setScanCacheSize(jobConf);
+        try (org.apache.hadoop.hbase.client.Connection connection = ConnectionFactory
+                .createConnection(PhoenixConnectionUtil.getConfiguration(jobConf))) {
+            final RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf(
+                            qplan.getTableRef().getTable().getPhysicalName().toString()));
+            final int scanSize = qplan.getScans().size();
+            if (useParallelInputGeneration(parallelThreshold, scanSize)) {
+                final int parallism = jobConf.getInt(
+                        PhoenixStorageHandlerConstants.PHOENIX_INPUTSPLIT_GENERATION_THREAD_COUNT,
+                        PhoenixStorageHandlerConstants
+                                .DEFAULT_PHOENIX_INPUTSPLIT_GENERATION_THREAD_COUNT);
+                ExecutorService executorService = Executors.newFixedThreadPool(parallism);
+                LOG.info("Generate Input Splits in Parallel with {} threads", parallism);

Review Comment:
   ```suggestion
                   LOG.info("Generating Input Splits in Parallel with {} 
threads", parallism);
   ```



##########
phoenix-hive-base/src/test/java/org/apache/phoenix/hive/HivePhoenixInputFormatTest.java:
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hive;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Locale;
+import java.util.Properties;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
+import org.apache.phoenix.end2end.ParallelStatsDisabledTest;
+import org.apache.phoenix.hive.constants.PhoenixStorageHandlerConstants;
+import org.apache.phoenix.hive.mapreduce.PhoenixInputFormat;
+import org.apache.phoenix.mapreduce.PhoenixRecordWritable;
+import org.apache.phoenix.schema.TableAlreadyExistsException;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+
+/**
+ * Test class for Hive PhoenixInputFormat
+ */
+@NotThreadSafe
+@Category(ParallelStatsDisabledTest.class)
+public class HivePhoenixInputFormatTest extends ParallelStatsDisabledIT {
+    private static final Logger LOG = LoggerFactory.getLogger(HivePhoenixInputFormatTest.class);
+    private static final String TABLE_NAME = "HivePhoenixInputFormatTest".toUpperCase(Locale.ROOT);
+    private static final String DDL = "CREATE TABLE " + TABLE_NAME
+            + " (V1 varchar NOT NULL PRIMARY KEY, V2 integer)";
+    private static final int SPLITS = 256;
+
+    // This test will create phoenix table with 128 splits and compare performance of
+    // serial split-generation method and parallel split-generation method.
+    @Test
+    public void testGetSplitsSerialOrParallel() throws IOException, SQLException {
+        PhoenixInputFormat<PhoenixRecordWritable> inputFormat =
+                new PhoenixInputFormat<PhoenixRecordWritable>();
+        long start;
+        long end;
+        // create table with N splits
+        System.out.println(
+                String.format("generate testing table with %s splits", String.valueOf(SPLITS)));
+        setupTestTable();
+        // setup configuration required for PhoenixInputFormat
+        Configuration conf = getUtility().getConfiguration();
+        JobConf jobConf = new JobConf(conf);
+        configureTestInput(jobConf);
+        inputFormat.getSplits(jobConf, SPLITS);
+        InputSplit[] inputSplitsSerial;
+        // test get splits in serial
+        start = System.currentTimeMillis();
+        jobConf.set(PhoenixStorageHandlerConstants.PHOENIX_MINIMUM_PARALLEL_SCANS_THRESHOLD, "0");
+        inputSplitsSerial = inputFormat.getSplits(jobConf, SPLITS);
+        end = System.currentTimeMillis();
+        long durationInSerial = end - start;
+        System.out.println(String.format("get split in serial requires:%s ms",
+                String.valueOf(durationInSerial)));
+
+        // test get splits in parallel
+        start = System.currentTimeMillis();
+        jobConf.set(PhoenixStorageHandlerConstants.PHOENIX_MINIMUM_PARALLEL_SCANS_THRESHOLD, "1");
+        jobConf.set(PhoenixStorageHandlerConstants.PHOENIX_INPUTSPLIT_GENERATION_THREAD_COUNT,"24");
+        InputSplit[] inputSplitsParallel = inputFormat.getSplits(jobConf, SPLITS);
+        end = System.currentTimeMillis();
+        long durationInParallel = end - start;
+
+        System.out.println(String.format("get split in parallel requires:%s 
ms",
+                String.valueOf(durationInParallel)));
+
+        // Test if performance of parallel method is better than serial method
+        Assert.assertTrue(durationInParallel < durationInSerial);

Review Comment:
   This will result in flaky tests as the environments which will run this test 
are guaranteed to not be deterministic. Unit tests should be about functional 
correctness, not performance.
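   For instance, the test could assert functional correctness instead, e.g. that the serial and parallel paths agree (a sketch reusing the constants, inputFormat, and SPLITS from this patch; the threshold values mirror the test above and the equality check is illustrative):
   
   ```java
   // Serial path, as in the test above (threshold "0").
   jobConf.set(PhoenixStorageHandlerConstants.PHOENIX_MINIMUM_PARALLEL_SCANS_THRESHOLD, "0");
   InputSplit[] serialSplits = inputFormat.getSplits(jobConf, SPLITS);
   
   // Parallel path, as in the test above (threshold "1").
   jobConf.set(PhoenixStorageHandlerConstants.PHOENIX_MINIMUM_PARALLEL_SCANS_THRESHOLD, "1");
   InputSplit[] parallelSplits = inputFormat.getSplits(jobConf, SPLITS);
   
   // Both paths should produce the same splits regardless of timing.
   Assert.assertEquals(serialSplits.length, parallelSplits.length);
   ```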



##########
phoenix-hive-base/src/test/java/org/apache/phoenix/hive/HivePhoenixInputFormatTest.java:
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hive;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Locale;
+import java.util.Properties;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
+import org.apache.phoenix.end2end.ParallelStatsDisabledTest;
+import org.apache.phoenix.hive.constants.PhoenixStorageHandlerConstants;
+import org.apache.phoenix.hive.mapreduce.PhoenixInputFormat;
+import org.apache.phoenix.mapreduce.PhoenixRecordWritable;
+import org.apache.phoenix.schema.TableAlreadyExistsException;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+
+/**
+ * Test class for Hive PhoenixInputFormat
+ */
+@NotThreadSafe
+@Category(ParallelStatsDisabledTest.class)
+public class HivePhoenixInputFormatTest extends ParallelStatsDisabledIT {

Review Comment:
   Do the existing Phoenix-hive tests activate your new property and implicitly 
validate that it is functional? I think we have some test classes but do we 
create multi-region Phoenix tables in those tests (or those with enough data to 
have multiple guideposts)?
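   One way a test could guarantee a multi-region table would be Phoenix salting, which pre-splits the underlying HBase table at creation time (a hedged sketch; the table name is hypothetical and getUrl() is assumed from the ParallelStatsDisabledIT base class):
   
   ```java
   try (Connection conn = DriverManager.getConnection(getUrl())) {
       // SALT_BUCKETS = 16 pre-splits the table into 16 regions up front.
       conn.createStatement().execute(
               "CREATE TABLE MULTI_REGION_TEST"
               + " (V1 VARCHAR NOT NULL PRIMARY KEY, V2 INTEGER)"
               + " SALT_BUCKETS = 16");
   }
   ```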





> hive-connector will take a long time to generate splits for large Phoenix
> tables.
> -------------------------------------------------------------------------------
>
>                 Key: PHOENIX-6698
>                 URL: https://issues.apache.org/jira/browse/PHOENIX-6698
>             Project: Phoenix
>          Issue Type: Improvement
>          Components: hive-connector
>    Affects Versions: 5.1.0
>            Reporter: jichen
>            Assignee: jichen
>            Priority: Minor
>             Fix For: connectors-6.0.0
>
>         Attachments: PHOENIX-6698.master.v1.patch
>
>
> In our production environment, the hive-phoenix connector takes nearly 30-40
> minutes to generate splits for a large Phoenix table with more than 2048
> regions. This is because, in the class PhoenixInputFormat, the method
> 'generateSplits' uses only a single thread to generate the split for each
> scan. My proposal is to generate splits in parallel with multiple threads.
> This proposal has been validated in our production environment: after
> changing the code to generate splits in parallel with 24 threads, the time
> cost dropped to 2 minutes.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
