[
https://issues.apache.org/jira/browse/PHOENIX-6698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17536602#comment-17536602
]
ASF GitHub Bot commented on PHOENIX-6698:
-----------------------------------------
chrajeshbabu commented on code in PR #79:
URL: https://github.com/apache/phoenix-connectors/pull/79#discussion_r872323611
##########
phoenix-hive-base/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java:
##########
@@ -121,73 +126,192 @@ public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException
     private List<InputSplit> generateSplits(final JobConf jobConf, final QueryPlan qplan,
                                             final List<KeyRange> splits, String query) throws
             IOException {
- if (qplan == null){
+ if (qplan == null) {
throw new NullPointerException();
- }if (splits == null){
+ }
+ if (splits == null) {
throw new NullPointerException();
}
final List<InputSplit> psplits = new ArrayList<>(splits.size());
-        Path[] tablePaths = FileInputFormat.getInputPaths(ShimLoader.getHadoopShims()
-                .newJobContext(new Job(jobConf)));
-        boolean splitByStats = jobConf.getBoolean(PhoenixStorageHandlerConstants.SPLIT_BY_STATS,
+ final Path[] tablePaths = FileInputFormat.getInputPaths(
+ ShimLoader.getHadoopShims().newJobContext(new Job(jobConf)));
+ final boolean splitByStats = jobConf.getBoolean(
+ PhoenixStorageHandlerConstants.SPLIT_BY_STATS,
false);
-
+ final int parallelThreshould = jobConf.getInt(
+ "hive.phoenix.split.parallel.threshold",
+ 32);
setScanCacheSize(jobConf);
+ if (
+ (parallelThreshould <= 0)
Review Comment:
Remove unnecessary parentheses.
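A minimal sketch of the condition with the extra parentheses dropped (same logic as the patch, formatting only):

    if (parallelThreshould <= 0 || qplan.getScans().size() < parallelThreshould) {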
##########
phoenix-hive-base/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java:
##########
@@ -121,73 +126,192 @@ public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException
     private List<InputSplit> generateSplits(final JobConf jobConf, final QueryPlan qplan,
                                             final List<KeyRange> splits, String query) throws
             IOException {
- if (qplan == null){
+ if (qplan == null) {
throw new NullPointerException();
- }if (splits == null){
+ }
+ if (splits == null) {
throw new NullPointerException();
}
final List<InputSplit> psplits = new ArrayList<>(splits.size());
-        Path[] tablePaths = FileInputFormat.getInputPaths(ShimLoader.getHadoopShims()
-                .newJobContext(new Job(jobConf)));
-        boolean splitByStats = jobConf.getBoolean(PhoenixStorageHandlerConstants.SPLIT_BY_STATS,
+ final Path[] tablePaths = FileInputFormat.getInputPaths(
+ ShimLoader.getHadoopShims().newJobContext(new Job(jobConf)));
+ final boolean splitByStats = jobConf.getBoolean(
+ PhoenixStorageHandlerConstants.SPLIT_BY_STATS,
false);
-
+ final int parallelThreshould = jobConf.getInt(
+ "hive.phoenix.split.parallel.threshold",
+ 32);
setScanCacheSize(jobConf);
+ if (
+ (parallelThreshould <= 0)
+ ||
+ (qplan.getScans().size() < parallelThreshould)
+ ) {
+ LOG.info("generate splits in serial");
Review Comment:
Make it clear in the log message that input splits are being generated serially.
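One possible wording, reusing the SLF4J logger already used in the patch (this exact message is an illustration, not the committed code):

    LOG.info("Generating input splits serially: {} scan groups, parallel threshold {}",
            qplan.getScans().size(), parallelThreshould);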
##########
phoenix-hive-base/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java:
##########
@@ -121,73 +126,192 @@ public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException
     private List<InputSplit> generateSplits(final JobConf jobConf, final QueryPlan qplan,
                                             final List<KeyRange> splits, String query) throws
             IOException {
- if (qplan == null){
+ if (qplan == null) {
throw new NullPointerException();
- }if (splits == null){
+ }
+ if (splits == null) {
throw new NullPointerException();
}
final List<InputSplit> psplits = new ArrayList<>(splits.size());
-        Path[] tablePaths = FileInputFormat.getInputPaths(ShimLoader.getHadoopShims()
-                .newJobContext(new Job(jobConf)));
-        boolean splitByStats = jobConf.getBoolean(PhoenixStorageHandlerConstants.SPLIT_BY_STATS,
+ final Path[] tablePaths = FileInputFormat.getInputPaths(
+ ShimLoader.getHadoopShims().newJobContext(new Job(jobConf)));
+ final boolean splitByStats = jobConf.getBoolean(
+ PhoenixStorageHandlerConstants.SPLIT_BY_STATS,
false);
-
+ final int parallelThreshould = jobConf.getInt(
+ "hive.phoenix.split.parallel.threshold",
+ 32);
setScanCacheSize(jobConf);
+ if (
+ (parallelThreshould <= 0)
+ ||
+ (qplan.getScans().size() < parallelThreshould)
+ ) {
+ LOG.info("generate splits in serial");
+ for (final List<Scan> scans : qplan.getScans()) {
+ psplits.addAll(
+ generateSplitsInternal(
+ jobConf,
+ qplan,
+ splits,
+ query,
+ scans,
+ splitByStats,
+ tablePaths)
+ );
+ }
+ } else {
+ final int parallism = jobConf.getInt(
+ "hive.phoenix.split.parallel.level",
+ Runtime.getRuntime().availableProcessors() * 2);
+ ExecutorService executorService = Executors.newFixedThreadPool(
+ parallism);
+ LOG.info("generate splits in parallel with {} threads", parallism);
- // Adding Localization
-        try (org.apache.hadoop.hbase.client.Connection connection = ConnectionFactory.createConnection(PhoenixConnectionUtil.getConfiguration(jobConf))) {
-            RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf(qplan
-                    .getTableRef().getTable().getPhysicalName().toString()));
+ List<Future<List<InputSplit>>> tasks = new ArrayList<>();
- for (List<Scan> scans : qplan.getScans()) {
+ try {
+ for (final List<Scan> scans : qplan.getScans()) {
+ Future<List<InputSplit>> task = executorService.submit(
+ new Callable<List<InputSplit>>() {
+ @Override
+ public List<InputSplit> call() throws Exception {
+ return generateSplitsInternal(jobConf,
+ qplan,
+ splits,
+ query,
+ scans,
+ splitByStats,
+ tablePaths);
+ }
+ });
+ tasks.add(task);
+ }
+ for (Future<List<InputSplit>> task : tasks) {
+ psplits.addAll(task.get());
+ }
+ } catch (ExecutionException | InterruptedException exception) {
+ throw new IOException("failed to get splits,reason:",
+ exception);
+ } finally {
+ executorService.shutdown();
+ }
+ }
+ return psplits;
+ }
+ /**
+ * This method is used to generate splits for each scan list.
+ * @param jobConf MapReduce Job Configuration
+ * @param qplan phoenix query plan
+ * @param splits phoenix table splits
+ * @param query phoenix query statement
+ * @param scans scan list slice of query plan
+ * @param splitByStats split by stat enabled
+ * @param tablePaths table paths
+ * @return List of Input Splits
+ * @throws IOException if function fails
+ */
+ private List<InputSplit> generateSplitsInternal(final JobConf jobConf,
+ final QueryPlan qplan,
+ final List<KeyRange> splits,
+ final String query,
+ final List<Scan> scans,
+ final boolean splitByStats,
+ final Path[] tablePaths) throws IOException {
+
+ final List<InputSplit> psplits = new ArrayList<>(scans.size());
+ try (org.apache.hadoop.hbase.client.Connection connection =
+ ConnectionFactory.createConnection(
+ PhoenixConnectionUtil.getConfiguration(jobConf))) {
+ RegionLocator regionLocator =
+ connection.getRegionLocator(TableName.valueOf(
+ qplan.getTableRef().getTable()
+ .getPhysicalName().toString()));
PhoenixInputSplit inputSplit;
-            HRegionLocation location = regionLocator.getRegionLocation(scans.get(0).getStartRow()
-                    , false);
-            long regionSize = CompatUtil.getSize(regionLocator, connection.getAdmin(), location);
-            String regionLocation = PhoenixStorageHandlerUtil.getRegionLocation(location, LOG);
+ HRegionLocation location = regionLocator.getRegionLocation(
+ scans.get(0).getStartRow(),
+ false);
+ long regionSize = CompatUtil.getSize(regionLocator,
+ connection.getAdmin(),
+ location);
+ String regionLocation =
+ PhoenixStorageHandlerUtil.getRegionLocation(location,
+ LOG);
if (splitByStats) {
for (Scan aScan : scans) {
if (LOG.isDebugEnabled()) {
-                        LOG.debug("Split for scan : " + aScan + "with scanAttribute : " + aScan
-                                .getAttributesMap() + " [scanCache, cacheBlock, scanBatch] : [" +
-                                aScan.getCaching() + ", " + aScan.getCacheBlocks() + ", " + aScan
-                                .getBatch() + "] and regionLocation : " + regionLocation);
+ LOG.debug("Split for scan : "
Review Comment:
Format the code properly.
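One way to format it, switching to the SLF4J placeholders used elsewhere in the patch (a sketch only, not the committed code):

    LOG.debug("Split for scan : {} with scanAttribute : {} "
            + "[scanCache, cacheBlock, scanBatch] : [{}, {}, {}] and regionLocation : {}",
            aScan, aScan.getAttributesMap(), aScan.getCaching(),
            aScan.getCacheBlocks(), aScan.getBatch(), regionLocation);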
##########
phoenix-hive-base/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java:
##########
@@ -121,73 +126,192 @@ public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException
     private List<InputSplit> generateSplits(final JobConf jobConf, final QueryPlan qplan,
                                             final List<KeyRange> splits, String query) throws
             IOException {
- if (qplan == null){
+ if (qplan == null) {
throw new NullPointerException();
- }if (splits == null){
+ }
+ if (splits == null) {
throw new NullPointerException();
}
final List<InputSplit> psplits = new ArrayList<>(splits.size());
-        Path[] tablePaths = FileInputFormat.getInputPaths(ShimLoader.getHadoopShims()
-                .newJobContext(new Job(jobConf)));
-        boolean splitByStats = jobConf.getBoolean(PhoenixStorageHandlerConstants.SPLIT_BY_STATS,
+ final Path[] tablePaths = FileInputFormat.getInputPaths(
+ ShimLoader.getHadoopShims().newJobContext(new Job(jobConf)));
+ final boolean splitByStats = jobConf.getBoolean(
+ PhoenixStorageHandlerConstants.SPLIT_BY_STATS,
false);
-
+ final int parallelThreshould = jobConf.getInt(
+ "hive.phoenix.split.parallel.threshold",
+ 32);
setScanCacheSize(jobConf);
+ if (
+ (parallelThreshould <= 0)
+ ||
+ (qplan.getScans().size() < parallelThreshould)
+ ) {
+ LOG.info("generate splits in serial");
+ for (final List<Scan> scans : qplan.getScans()) {
+ psplits.addAll(
+ generateSplitsInternal(
+ jobConf,
+ qplan,
+ splits,
+ query,
+ scans,
+ splitByStats,
+ tablePaths)
+ );
+ }
+ } else {
+ final int parallism = jobConf.getInt(
+ "hive.phoenix.split.parallel.level",
+ Runtime.getRuntime().availableProcessors() * 2);
+ ExecutorService executorService = Executors.newFixedThreadPool(
+ parallism);
+ LOG.info("generate splits in parallel with {} threads", parallism);
- // Adding Localization
-        try (org.apache.hadoop.hbase.client.Connection connection = ConnectionFactory.createConnection(PhoenixConnectionUtil.getConfiguration(jobConf))) {
-            RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf(qplan
-                    .getTableRef().getTable().getPhysicalName().toString()));
+ List<Future<List<InputSplit>>> tasks = new ArrayList<>();
- for (List<Scan> scans : qplan.getScans()) {
+ try {
+ for (final List<Scan> scans : qplan.getScans()) {
+ Future<List<InputSplit>> task = executorService.submit(
+ new Callable<List<InputSplit>>() {
+ @Override
+ public List<InputSplit> call() throws Exception {
+ return generateSplitsInternal(jobConf,
+ qplan,
+ splits,
+ query,
+ scans,
+ splitByStats,
+ tablePaths);
+ }
+ });
+ tasks.add(task);
+ }
+ for (Future<List<InputSplit>> task : tasks) {
+ psplits.addAll(task.get());
+ }
+ } catch (ExecutionException | InterruptedException exception) {
+ throw new IOException("failed to get splits,reason:",
Review Comment:
Log message can be improved.
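A hedged example of a more descriptive exception message (the exact wording is an assumption, not part of the patch):

    throw new IOException("Failed to generate input splits in parallel for query: " + query,
            exception);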
##########
phoenix-hive-base/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java:
##########
@@ -121,73 +126,192 @@ public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException
     private List<InputSplit> generateSplits(final JobConf jobConf, final QueryPlan qplan,
                                             final List<KeyRange> splits, String query) throws
             IOException {
- if (qplan == null){
+ if (qplan == null) {
throw new NullPointerException();
- }if (splits == null){
+ }
+ if (splits == null) {
throw new NullPointerException();
}
final List<InputSplit> psplits = new ArrayList<>(splits.size());
-        Path[] tablePaths = FileInputFormat.getInputPaths(ShimLoader.getHadoopShims()
-                .newJobContext(new Job(jobConf)));
-        boolean splitByStats = jobConf.getBoolean(PhoenixStorageHandlerConstants.SPLIT_BY_STATS,
+ final Path[] tablePaths = FileInputFormat.getInputPaths(
+ ShimLoader.getHadoopShims().newJobContext(new Job(jobConf)));
+ final boolean splitByStats = jobConf.getBoolean(
+ PhoenixStorageHandlerConstants.SPLIT_BY_STATS,
false);
-
+ final int parallelThreshould = jobConf.getInt(
+ "hive.phoenix.split.parallel.threshold",
+ 32);
setScanCacheSize(jobConf);
+ if (
+ (parallelThreshould <= 0)
+ ||
+ (qplan.getScans().size() < parallelThreshould)
+ ) {
+ LOG.info("generate splits in serial");
+ for (final List<Scan> scans : qplan.getScans()) {
+ psplits.addAll(
+ generateSplitsInternal(
Review Comment:
Reduce the number of lines used to call the method.
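For example, the call could be collapsed to a couple of lines (a formatting sketch only):

    psplits.addAll(generateSplitsInternal(jobConf, qplan, splits, query,
            scans, splitByStats, tablePaths));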
##########
phoenix-hive-base/src/test/java/org/apache/phoenix/hive/HivePhoenixInputFormatTest.java:
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hive;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
+import org.apache.phoenix.end2end.ParallelStatsDisabledTest;
+import org.apache.phoenix.hive.constants.PhoenixStorageHandlerConstants;
+import org.apache.phoenix.hive.mapreduce.PhoenixInputFormat;
+import org.apache.phoenix.mapreduce.PhoenixRecordWritable;
+import org.apache.phoenix.schema.TableAlreadyExistsException;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.NotThreadSafe;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Locale;
+import java.util.Properties;
+
+/**
+ * Test class for Hive PhoenixInputFormat
+ */
+@NotThreadSafe
+@Category(ParallelStatsDisabledTest.class)
+public class HivePhoenixInputFormatTest extends ParallelStatsDisabledIT {
+ private static final Logger LOG = LoggerFactory.getLogger(
+ HivePhoenixInputFormatTest.class);
+ private static final String TABLE_NAME = "HivePhoenixInputFormatTest"
+ .toUpperCase(Locale.ROOT);
+ private static final String DDL = "CREATE TABLE "
+ + TABLE_NAME
+ + " (V1 varchar NOT NULL PRIMARY KEY, V2 integer)";
+ private static final int SPLITS = 128;
+
+ /*
+ *
+ * This test will create phoenix table with 128 splits and compare
+ * performance of split generation in serial/parallel
+ *
+ * */
+ @Test
+    public void testGetSplitsSerialOrParallel() throws IOException,SQLException {
+ PhoenixInputFormat<PhoenixRecordWritable> inputFormat =
+ new PhoenixInputFormat<PhoenixRecordWritable>();
+ long start,end;
+
+ // create table with N splits
+ System.out.println(
+ String.format("generate testing table with %s splits",
+ String.valueOf(SPLITS)));
+ setupTestTable();
+ // setup configuration required for PhoenixInputFormat
+ Configuration conf = getUtility().getConfiguration();
+ JobConf jobConf = new JobConf(conf);
+ configureTestInput(jobConf);
+
+
+ // test get splits in serial
+ start = System.currentTimeMillis();
+ jobConf.set("hive.phoenix.split.parallel.threshold","0");
+ InputSplit[] inputSplitsSerial = inputFormat.getSplits(jobConf,SPLITS);
+ end = System.currentTimeMillis();
+ long durationInSerial=end - start;
+ System.out.println(String.format(
Review Comment:
We need to use assertions here; there is no use in printing these timings to the logs.
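A sketch of the direction the reviewer suggests: report the timings through the test's LOG instead of System.out, and let assertions carry the verification (the message text and exact placement are illustrative):

    LOG.info("Split generation took {} ms serially and {} ms in parallel",
            durationInSerial, durationInParallel);
    Assert.assertEquals(SPLITS, inputSplitsSerial.length);
    Assert.assertEquals(inputSplitsSerial.length, inputSplitsParallel.length);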
##########
phoenix-hive-base/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java:
##########
@@ -121,73 +126,192 @@ public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException
     private List<InputSplit> generateSplits(final JobConf jobConf, final QueryPlan qplan,
                                             final List<KeyRange> splits, String query) throws
             IOException {
- if (qplan == null){
+ if (qplan == null) {
throw new NullPointerException();
- }if (splits == null){
+ }
+ if (splits == null) {
throw new NullPointerException();
}
final List<InputSplit> psplits = new ArrayList<>(splits.size());
-        Path[] tablePaths = FileInputFormat.getInputPaths(ShimLoader.getHadoopShims()
-                .newJobContext(new Job(jobConf)));
-        boolean splitByStats = jobConf.getBoolean(PhoenixStorageHandlerConstants.SPLIT_BY_STATS,
+ final Path[] tablePaths = FileInputFormat.getInputPaths(
+ ShimLoader.getHadoopShims().newJobContext(new Job(jobConf)));
+ final boolean splitByStats = jobConf.getBoolean(
+ PhoenixStorageHandlerConstants.SPLIT_BY_STATS,
false);
-
+ final int parallelThreshould = jobConf.getInt(
+ "hive.phoenix.split.parallel.threshold",
+ 32);
setScanCacheSize(jobConf);
+ if (
+ (parallelThreshould <= 0)
+ ||
+ (qplan.getScans().size() < parallelThreshould)
+ ) {
+ LOG.info("generate splits in serial");
+ for (final List<Scan> scans : qplan.getScans()) {
+ psplits.addAll(
+ generateSplitsInternal(
+ jobConf,
+ qplan,
+ splits,
+ query,
+ scans,
+ splitByStats,
+ tablePaths)
+ );
+ }
+ } else {
+ final int parallism = jobConf.getInt(
+ "hive.phoenix.split.parallel.level",
+ Runtime.getRuntime().availableProcessors() * 2);
+ ExecutorService executorService = Executors.newFixedThreadPool(
+ parallism);
+ LOG.info("generate splits in parallel with {} threads", parallism);
- // Adding Localization
-        try (org.apache.hadoop.hbase.client.Connection connection = ConnectionFactory.createConnection(PhoenixConnectionUtil.getConfiguration(jobConf))) {
-            RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf(qplan
-                    .getTableRef().getTable().getPhysicalName().toString()));
+ List<Future<List<InputSplit>>> tasks = new ArrayList<>();
- for (List<Scan> scans : qplan.getScans()) {
+ try {
+ for (final List<Scan> scans : qplan.getScans()) {
+ Future<List<InputSplit>> task = executorService.submit(
+ new Callable<List<InputSplit>>() {
+ @Override
+ public List<InputSplit> call() throws Exception {
+ return generateSplitsInternal(jobConf,
+ qplan,
+ splits,
+ query,
+ scans,
+ splitByStats,
+ tablePaths);
+ }
+ });
+ tasks.add(task);
+ }
+ for (Future<List<InputSplit>> task : tasks) {
+ psplits.addAll(task.get());
+ }
+ } catch (ExecutionException | InterruptedException exception) {
+ throw new IOException("failed to get splits,reason:",
+ exception);
+ } finally {
+ executorService.shutdown();
+ }
+ }
+ return psplits;
+ }
+ /**
+ * This method is used to generate splits for each scan list.
+ * @param jobConf MapReduce Job Configuration
+ * @param qplan phoenix query plan
+ * @param splits phoenix table splits
+ * @param query phoenix query statement
+ * @param scans scan list slice of query plan
+ * @param splitByStats split by stat enabled
+ * @param tablePaths table paths
+ * @return List of Input Splits
+ * @throws IOException if function fails
+ */
+ private List<InputSplit> generateSplitsInternal(final JobConf jobConf,
+ final QueryPlan qplan,
+ final List<KeyRange> splits,
+ final String query,
+ final List<Scan> scans,
+ final boolean splitByStats,
+ final Path[] tablePaths) throws IOException {
+
+ final List<InputSplit> psplits = new ArrayList<>(scans.size());
+ try (org.apache.hadoop.hbase.client.Connection connection =
Review Comment:
The connection creation can be shared and reused when generating the input splits.
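A rough sketch of that refactor, assuming generateSplitsInternal is changed to take the shared connection and region locator as extra parameters (a hypothetical signature, not the patch as written). HBase's Connection is documented as thread-safe; whether the RegionLocator can also be shared by the parallel tasks would need to be confirmed:

    try (org.apache.hadoop.hbase.client.Connection connection =
            ConnectionFactory.createConnection(
                    PhoenixConnectionUtil.getConfiguration(jobConf))) {
        final RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf(
                qplan.getTableRef().getTable().getPhysicalName().toString()));
        for (final List<Scan> scans : qplan.getScans()) {
            // hypothetical extra parameters: connection, regionLocator
            psplits.addAll(generateSplitsInternal(jobConf, qplan, splits, query, scans,
                    splitByStats, tablePaths, connection, regionLocator));
        }
    }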
##########
phoenix-hive-base/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java:
##########
@@ -121,73 +126,192 @@ public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException
     private List<InputSplit> generateSplits(final JobConf jobConf, final QueryPlan qplan,
                                             final List<KeyRange> splits, String query) throws
             IOException {
- if (qplan == null){
+ if (qplan == null) {
throw new NullPointerException();
- }if (splits == null){
+ }
+ if (splits == null) {
throw new NullPointerException();
}
final List<InputSplit> psplits = new ArrayList<>(splits.size());
-        Path[] tablePaths = FileInputFormat.getInputPaths(ShimLoader.getHadoopShims()
-                .newJobContext(new Job(jobConf)));
-        boolean splitByStats = jobConf.getBoolean(PhoenixStorageHandlerConstants.SPLIT_BY_STATS,
+ final Path[] tablePaths = FileInputFormat.getInputPaths(
+ ShimLoader.getHadoopShims().newJobContext(new Job(jobConf)));
+ final boolean splitByStats = jobConf.getBoolean(
+ PhoenixStorageHandlerConstants.SPLIT_BY_STATS,
false);
-
+ final int parallelThreshould = jobConf.getInt(
+ "hive.phoenix.split.parallel.threshold",
+ 32);
setScanCacheSize(jobConf);
+ if (
+ (parallelThreshould <= 0)
+ ||
+ (qplan.getScans().size() < parallelThreshould)
+ ) {
+ LOG.info("generate splits in serial");
+ for (final List<Scan> scans : qplan.getScans()) {
+ psplits.addAll(
+ generateSplitsInternal(
+ jobConf,
+ qplan,
+ splits,
+ query,
+ scans,
+ splitByStats,
+ tablePaths)
+ );
+ }
+ } else {
+ final int parallism = jobConf.getInt(
+ "hive.phoenix.split.parallel.level",
+ Runtime.getRuntime().availableProcessors() * 2);
+ ExecutorService executorService = Executors.newFixedThreadPool(
+ parallism);
+ LOG.info("generate splits in parallel with {} threads", parallism);
- // Adding Localization
-        try (org.apache.hadoop.hbase.client.Connection connection = ConnectionFactory.createConnection(PhoenixConnectionUtil.getConfiguration(jobConf))) {
-            RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf(qplan
-                    .getTableRef().getTable().getPhysicalName().toString()));
+ List<Future<List<InputSplit>>> tasks = new ArrayList<>();
- for (List<Scan> scans : qplan.getScans()) {
+ try {
+ for (final List<Scan> scans : qplan.getScans()) {
+ Future<List<InputSplit>> task = executorService.submit(
+ new Callable<List<InputSplit>>() {
+ @Override
+ public List<InputSplit> call() throws Exception {
+ return generateSplitsInternal(jobConf,
+ qplan,
+ splits,
+ query,
+ scans,
+ splitByStats,
+ tablePaths);
+ }
+ });
+ tasks.add(task);
+ }
+ for (Future<List<InputSplit>> task : tasks) {
+ psplits.addAll(task.get());
+ }
+ } catch (ExecutionException | InterruptedException exception) {
+ throw new IOException("failed to get splits,reason:",
+ exception);
+ } finally {
+ executorService.shutdown();
+ }
+ }
+ return psplits;
+ }
+ /**
+ * This method is used to generate splits for each scan list.
+ * @param jobConf MapReduce Job Configuration
+ * @param qplan phoenix query plan
+ * @param splits phoenix table splits
+ * @param query phoenix query statement
+ * @param scans scan list slice of query plan
+ * @param splitByStats split by stat enabled
+ * @param tablePaths table paths
+ * @return List of Input Splits
+ * @throws IOException if function fails
+ */
+ private List<InputSplit> generateSplitsInternal(final JobConf jobConf,
+ final QueryPlan qplan,
+ final List<KeyRange> splits,
+ final String query,
+ final List<Scan> scans,
+ final boolean splitByStats,
+ final Path[] tablePaths) throws IOException {
+
+ final List<InputSplit> psplits = new ArrayList<>(scans.size());
+ try (org.apache.hadoop.hbase.client.Connection connection =
+ ConnectionFactory.createConnection(
+ PhoenixConnectionUtil.getConfiguration(jobConf))) {
+ RegionLocator regionLocator =
+ connection.getRegionLocator(TableName.valueOf(
Review Comment:
The region locator can also be shared across these calls.
##########
phoenix-hive-base/src/test/java/org/apache/phoenix/hive/HivePhoenixInputFormatTest.java:
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hive;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
+import org.apache.phoenix.end2end.ParallelStatsDisabledTest;
+import org.apache.phoenix.hive.constants.PhoenixStorageHandlerConstants;
+import org.apache.phoenix.hive.mapreduce.PhoenixInputFormat;
+import org.apache.phoenix.mapreduce.PhoenixRecordWritable;
+import org.apache.phoenix.schema.TableAlreadyExistsException;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.NotThreadSafe;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Locale;
+import java.util.Properties;
+
+/**
+ * Test class for Hive PhoenixInputFormat
+ */
+@NotThreadSafe
+@Category(ParallelStatsDisabledTest.class)
+public class HivePhoenixInputFormatTest extends ParallelStatsDisabledIT {
+ private static final Logger LOG = LoggerFactory.getLogger(
+ HivePhoenixInputFormatTest.class);
+ private static final String TABLE_NAME = "HivePhoenixInputFormatTest"
+ .toUpperCase(Locale.ROOT);
+ private static final String DDL = "CREATE TABLE "
+ + TABLE_NAME
+ + " (V1 varchar NOT NULL PRIMARY KEY, V2 integer)";
+ private static final int SPLITS = 128;
+
+ /*
+ *
+ * This test will create phoenix table with 128 splits and compare
+ * performance of split generation in serial/parallel
+ *
+ * */
+ @Test
+    public void testGetSplitsSerialOrParallel() throws IOException,SQLException {
+ PhoenixInputFormat<PhoenixRecordWritable> inputFormat =
+ new PhoenixInputFormat<PhoenixRecordWritable>();
+ long start,end;
+
+ // create table with N splits
+ System.out.println(
+ String.format("generate testing table with %s splits",
+ String.valueOf(SPLITS)));
+ setupTestTable();
+ // setup configuration required for PhoenixInputFormat
+ Configuration conf = getUtility().getConfiguration();
+ JobConf jobConf = new JobConf(conf);
+ configureTestInput(jobConf);
+
+
+ // test get splits in serial
+ start = System.currentTimeMillis();
+ jobConf.set("hive.phoenix.split.parallel.threshold","0");
+ InputSplit[] inputSplitsSerial = inputFormat.getSplits(jobConf,SPLITS);
+ end = System.currentTimeMillis();
+ long durationInSerial=end - start;
+ System.out.println(String.format(
+ "get split in serial requires:%s ms",
+ String.valueOf(durationInSerial)));
+
+ // test get splits in parallel
+ start = System.currentTimeMillis();
+ jobConf.set("hive.phoenix.split.parallel.threshold", "1");
+ InputSplit[] inputSplitsParallel = inputFormat.getSplits(
+ jobConf,
+ SPLITS);
+ end = System.currentTimeMillis();
+ long durationInParallel=end - start;
+
+ System.out.println(String.format(
+ "get split in parallel requires:%s ms",
+ String.valueOf(durationInParallel)));
+
+ // Test if performance of parallel method is better than serial method
+ Assert.assertTrue(durationInParallel < durationInSerial);
+        // Test if the input split returned by serial method and parallel method are the same
+ Assert.assertTrue(inputSplitsParallel.length==SPLITS);
+ Assert.assertTrue(
+ inputSplitsParallel.length == inputSplitsSerial.length
+ );
+ for (final InputSplit inputSplitParallel:inputSplitsParallel){
+ boolean match=false;
+ for (final InputSplit inputSplitSerial:inputSplitsSerial){
+ if (inputSplitParallel.equals(inputSplitSerial)){
+ match=true;
+ break;
+ }
+ }
+ Assert.assertTrue(match);
+ }
+ }
+
+ private static void setupTestTable() throws SQLException {
+ final byte [] start=new byte[0];
+ final byte [] end = Bytes.createMaxByteArray(4);
+ final byte[][] splits = Bytes.split(start, end, SPLITS-2);
+ createTestTableWithBinarySplit(getUrl(),DDL, splits ,null);
+ }
+
+ private static void buildPreparedSqlWithBinarySplits(
+ StringBuffer sb,
+ int splits)
+ {
Review Comment:
Code formatting is required here.
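For instance (a formatting sketch only), the parameters and the opening brace could sit on one line:

    private static void buildPreparedSqlWithBinarySplits(StringBuffer sb, int splits) {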
##########
phoenix-hive-base/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java:
##########
@@ -121,73 +126,192 @@ public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException
     private List<InputSplit> generateSplits(final JobConf jobConf, final QueryPlan qplan,
                                             final List<KeyRange> splits, String query) throws
             IOException {
- if (qplan == null){
+ if (qplan == null) {
throw new NullPointerException();
- }if (splits == null){
+ }
+ if (splits == null) {
throw new NullPointerException();
}
final List<InputSplit> psplits = new ArrayList<>(splits.size());
-        Path[] tablePaths = FileInputFormat.getInputPaths(ShimLoader.getHadoopShims()
-                .newJobContext(new Job(jobConf)));
-        boolean splitByStats = jobConf.getBoolean(PhoenixStorageHandlerConstants.SPLIT_BY_STATS,
+ final Path[] tablePaths = FileInputFormat.getInputPaths(
+ ShimLoader.getHadoopShims().newJobContext(new Job(jobConf)));
+ final boolean splitByStats = jobConf.getBoolean(
+ PhoenixStorageHandlerConstants.SPLIT_BY_STATS,
false);
-
+ final int parallelThreshould = jobConf.getInt(
+ "hive.phoenix.split.parallel.threshold",
+ 32);
setScanCacheSize(jobConf);
+ if (
+ (parallelThreshould <= 0)
+ ||
+ (qplan.getScans().size() < parallelThreshould)
+ ) {
+ LOG.info("generate splits in serial");
+ for (final List<Scan> scans : qplan.getScans()) {
+ psplits.addAll(
+ generateSplitsInternal(
+ jobConf,
+ qplan,
+ splits,
+ query,
+ scans,
+ splitByStats,
+ tablePaths)
+ );
+ }
+ } else {
+ final int parallism = jobConf.getInt(
Review Comment:
What's the difference between this parallelism level config and the parallel threshold?
##########
phoenix-hive-base/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java:
##########
@@ -121,73 +126,192 @@ public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException
     private List<InputSplit> generateSplits(final JobConf jobConf, final QueryPlan qplan,
                                             final List<KeyRange> splits, String query) throws
             IOException {
- if (qplan == null){
+ if (qplan == null) {
throw new NullPointerException();
- }if (splits == null){
+ }
+ if (splits == null) {
throw new NullPointerException();
}
final List<InputSplit> psplits = new ArrayList<>(splits.size());
-        Path[] tablePaths = FileInputFormat.getInputPaths(ShimLoader.getHadoopShims()
-                .newJobContext(new Job(jobConf)));
-        boolean splitByStats = jobConf.getBoolean(PhoenixStorageHandlerConstants.SPLIT_BY_STATS,
+ final Path[] tablePaths = FileInputFormat.getInputPaths(
+ ShimLoader.getHadoopShims().newJobContext(new Job(jobConf)));
+ final boolean splitByStats = jobConf.getBoolean(
+ PhoenixStorageHandlerConstants.SPLIT_BY_STATS,
false);
-
+ final int parallelThreshould = jobConf.getInt(
+ "hive.phoenix.split.parallel.threshold",
+ 32);
setScanCacheSize(jobConf);
+ if (
+ (parallelThreshould <= 0)
+ ||
+ (qplan.getScans().size() < parallelThreshould)
+ ) {
+ LOG.info("generate splits in serial");
+ for (final List<Scan> scans : qplan.getScans()) {
+ psplits.addAll(
+ generateSplitsInternal(
+ jobConf,
+ qplan,
+ splits,
+ query,
+ scans,
+ splitByStats,
+ tablePaths)
+ );
+ }
+ } else {
+ final int parallism = jobConf.getInt(
+ "hive.phoenix.split.parallel.level",
+ Runtime.getRuntime().availableProcessors() * 2);
+ ExecutorService executorService = Executors.newFixedThreadPool(
+ parallism);
+ LOG.info("generate splits in parallel with {} threads", parallism);
- // Adding Localization
-        try (org.apache.hadoop.hbase.client.Connection connection = ConnectionFactory.createConnection(PhoenixConnectionUtil.getConfiguration(jobConf))) {
-            RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf(qplan
-                    .getTableRef().getTable().getPhysicalName().toString()));
+ List<Future<List<InputSplit>>> tasks = new ArrayList<>();
- for (List<Scan> scans : qplan.getScans()) {
+ try {
+ for (final List<Scan> scans : qplan.getScans()) {
+ Future<List<InputSplit>> task = executorService.submit(
+ new Callable<List<InputSplit>>() {
+ @Override
+ public List<InputSplit> call() throws Exception {
+ return generateSplitsInternal(jobConf,
+ qplan,
+ splits,
+ query,
+ scans,
+ splitByStats,
+ tablePaths);
+ }
+ });
+ tasks.add(task);
+ }
+ for (Future<List<InputSplit>> task : tasks) {
+ psplits.addAll(task.get());
+ }
+ } catch (ExecutionException | InterruptedException exception) {
+ throw new IOException("failed to get splits,reason:",
+ exception);
+ } finally {
+ executorService.shutdown();
+ }
+ }
+ return psplits;
+ }
+ /**
+ * This method is used to generate splits for each scan list.
+ * @param jobConf MapReduce Job Configuration
+ * @param qplan phoenix query plan
+ * @param splits phoenix table splits
+ * @param query phoenix query statement
+ * @param scans scan list slice of query plan
+ * @param splitByStats split by stat enabled
+ * @param tablePaths table paths
+ * @return List of Input Splits
+ * @throws IOException if function fails
+ */
+ private List<InputSplit> generateSplitsInternal(final JobConf jobConf,
+ final QueryPlan qplan,
+ final List<KeyRange> splits,
+ final String query,
+ final List<Scan> scans,
+ final boolean splitByStats,
+ final Path[] tablePaths) throws IOException {
+
+ final List<InputSplit> psplits = new ArrayList<>(scans.size());
+ try (org.apache.hadoop.hbase.client.Connection connection =
+ ConnectionFactory.createConnection(
+ PhoenixConnectionUtil.getConfiguration(jobConf))) {
+ RegionLocator regionLocator =
+ connection.getRegionLocator(TableName.valueOf(
+ qplan.getTableRef().getTable()
+ .getPhysicalName().toString()));
PhoenixInputSplit inputSplit;
-            HRegionLocation location = regionLocator.getRegionLocation(scans.get(0).getStartRow()
-                    , false);
-            long regionSize = CompatUtil.getSize(regionLocator, connection.getAdmin(), location);
-            String regionLocation = PhoenixStorageHandlerUtil.getRegionLocation(location, LOG);
+ HRegionLocation location = regionLocator.getRegionLocation(
+ scans.get(0).getStartRow(),
+ false);
+ long regionSize = CompatUtil.getSize(regionLocator,
+ connection.getAdmin(),
+ location);
+ String regionLocation =
+ PhoenixStorageHandlerUtil.getRegionLocation(location,
+ LOG);
if (splitByStats) {
for (Scan aScan : scans) {
if (LOG.isDebugEnabled()) {
-                        LOG.debug("Split for scan : " + aScan + "with scanAttribute : " + aScan
-                                .getAttributesMap() + " [scanCache, cacheBlock, scanBatch] : [" +
-                                aScan.getCaching() + ", " + aScan.getCacheBlocks() + ", " + aScan
-                                .getBatch() + "] and regionLocation : " + regionLocation);
+ LOG.debug("Split for scan : "
+ + aScan
+ + "with scanAttribute : "
+ + aScan.getAttributesMap()
+ + " [scanCache, cacheBlock, scanBatch] : ["
+ + aScan.getCaching()
+ + ", "
+ + aScan.getCacheBlocks()
+ + ", "
+ + aScan.getBatch()
+ + "] and regionLocation : "
+ + regionLocation);
}
-                    inputSplit = new PhoenixInputSplit(new ArrayList<>(Arrays.asList(aScan)), tablePaths[0],
-                            regionLocation, regionSize);
+ inputSplit =
+ new PhoenixInputSplit(
+ new ArrayList<>(Arrays.asList(aScan)),
+ tablePaths[0],
+ regionLocation,
+ regionSize);
inputSplit.setQuery(query);
psplits.add(inputSplit);
}
} else {
if (LOG.isDebugEnabled()) {
-                LOG.debug("Scan count[" + scans.size() + "] : " + Bytes.toStringBinary(scans
-                        .get(0).getStartRow()) + " ~ " + Bytes.toStringBinary(scans.get(scans
-                        .size() - 1).getStopRow()));
-                LOG.debug("First scan : " + scans.get(0) + "with scanAttribute : " + scans
-                        .get(0).getAttributesMap() + " [scanCache, cacheBlock, scanBatch] : " +
-                        "[" + scans.get(0).getCaching() + ", " + scans.get(0).getCacheBlocks()
-                        + ", " + scans.get(0).getBatch() + "] and regionLocation : " +
-                        regionLocation);
+ LOG.debug("Scan count["
Review Comment:
Format the code properly.
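One possible formatting, switching to the SLF4J placeholders used elsewhere in the patch (a sketch only, not the committed code):

    LOG.debug("Scan count[{}] : {} ~ {}", scans.size(),
            Bytes.toStringBinary(scans.get(0).getStartRow()),
            Bytes.toStringBinary(scans.get(scans.size() - 1).getStopRow()));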
> hive-connector will take long time to generate splits for large phoenix
> tables.
> -------------------------------------------------------------------------------
>
> Key: PHOENIX-6698
> URL: https://issues.apache.org/jira/browse/PHOENIX-6698
> Project: Phoenix
> Issue Type: Improvement
> Components: hive-connector
> Affects Versions: 5.1.0
> Reporter: jichen
> Assignee: jichen
> Priority: Minor
> Fix For: connectors-6.0.0
>
> Attachments: PHOENIX-6698.master.v1.patch
>
>
> In our production environment, the hive-phoenix connector takes nearly 30-40
> minutes to generate splits for a large Phoenix table with more than 2048
> regions. This is because in class PhoenixInputFormat the method
> 'generateSplits' uses only one thread to generate the splits for each scan.
> My proposal is to use multiple threads to generate splits in parallel. The
> proposal has been validated in our production environment: by changing the
> code to generate splits in parallel with 24 threads, the time cost is reduced
> to 2 minutes.
--
This message was sent by Atlassian Jira
(v8.20.7#820007)