This is an automated email from the ASF dual-hosted git repository.

chinmayskulkarni pushed a commit to branch 4.14-HBase-1.4
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.14-HBase-1.4 by this push: new 704f938 PHOENIX-5313: All mappers grab all RegionLocations from .META 704f938 is described below commit 704f9382036b6bfd48a555cfefa479f4b4f7f6cc Author: Chinmay Kulkarni <chinmayskulka...@gmail.com> AuthorDate: Wed Jun 19 13:40:27 2019 -0700 PHOENIX-5313: All mappers grab all RegionLocations from .META --- .../org/apache/phoenix/end2end/MapReduceIT.java | 50 ++++++++++++++------ .../iterate/MapReduceParallelScanGrouper.java | 4 +- .../phoenix/mapreduce/PhoenixInputFormat.java | 41 ++++++++++++---- .../phoenix/mapreduce/PhoenixRecordReader.java | 12 +++-- .../mapreduce/util/PhoenixMapReduceUtil.java | 20 ++++++++ .../TestingMapReduceParallelScanGrouper.java | 54 ++++++++++++++++++++++ .../mapreduce/PhoenixTestingInputFormat.java | 46 ++++++++++++++++++ 7 files changed, 201 insertions(+), 26 deletions(-) diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MapReduceIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MapReduceIT.java index fb24bb2..2460cd2 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MapReduceIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MapReduceIT.java @@ -25,20 +25,30 @@ import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.db.DBWritable; +import org.apache.phoenix.iterate.TestingMapReduceParallelScanGrouper; import org.apache.phoenix.mapreduce.PhoenixOutputFormat; +import org.apache.phoenix.mapreduce.PhoenixTestingInputFormat; import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil; import org.apache.phoenix.schema.types.PDouble; import org.apache.phoenix.schema.types.PhoenixArray; import org.apache.phoenix.util.PhoenixRuntime; +import org.junit.After; import org.junit.Before; import org.junit.Test; import 
java.io.IOException; -import java.sql.*; +import java.sql.Array; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; import java.util.Properties; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; /** * Test that our MapReduce basic tools work as expected @@ -48,28 +58,37 @@ public class MapReduceIT extends ParallelStatsDisabledIT { private static final String STOCK_NAME = "STOCK_NAME"; private static final String RECORDING_YEAR = "RECORDING_YEAR"; private static final String RECORDINGS_QUARTER = "RECORDINGS_QUARTER"; - private String CREATE_STOCK_TABLE = "CREATE TABLE IF NOT EXISTS %s ( " + + + // We pre-split the table to ensure that we have multiple mappers. + // This is used to test scenarios with more than 1 mapper + private static final String CREATE_STOCK_TABLE = "CREATE TABLE IF NOT EXISTS %s ( " + " STOCK_NAME VARCHAR NOT NULL , RECORDING_YEAR INTEGER NOT NULL, RECORDINGS_QUARTER " + - " DOUBLE array[] CONSTRAINT pk PRIMARY KEY ( STOCK_NAME, RECORDING_YEAR ))"; + " DOUBLE array[] CONSTRAINT pk PRIMARY KEY ( STOCK_NAME, RECORDING_YEAR )) " + + "SPLIT ON ('AA')"; private static final String CREATE_STOCK_VIEW = "CREATE VIEW IF NOT EXISTS %s (v1 VARCHAR) AS " + " SELECT * FROM %s WHERE RECORDING_YEAR = 2008"; private static final String MAX_RECORDING = "MAX_RECORDING"; - private String CREATE_STOCK_STATS_TABLE = + private static final String CREATE_STOCK_STATS_TABLE = "CREATE TABLE IF NOT EXISTS %s(STOCK_NAME VARCHAR NOT NULL , " + " MAX_RECORDING DOUBLE CONSTRAINT pk PRIMARY KEY (STOCK_NAME ))"; - private String UPSERT = "UPSERT into %s values (?, ?, ?)"; + private static final String UPSERT = "UPSERT into %s values (?, ?, ?)"; - private String TENANT_ID = "1234567890"; + private static final String TENANT_ID = "1234567890"; @Before public 
void setupTables() throws Exception { } + @After + public void clearCountersForScanGrouper() { + TestingMapReduceParallelScanGrouper.clearNumCallsToGetRegionBoundaries(); + } + @Test public void testNoConditionsOnSelect() throws Exception { try (Connection conn = DriverManager.getConnection(getUrl())) { @@ -93,7 +112,8 @@ public class MapReduceIT extends ParallelStatsDisabledIT { } - private void createAndTestJob(Connection conn, String s, double v, String tenantId) throws SQLException, IOException, InterruptedException, ClassNotFoundException { + private void createAndTestJob(Connection conn, String s, double v, String tenantId) throws + SQLException, IOException, InterruptedException, ClassNotFoundException { String stockTableName = generateUniqueName(); String stockStatsTableName = generateUniqueName(); conn.createStatement().execute(String.format(CREATE_STOCK_TABLE, stockTableName)); @@ -103,10 +123,9 @@ public class MapReduceIT extends ParallelStatsDisabledIT { Job job = Job.getInstance(conf); if (tenantId != null) { setInputForTenant(job, tenantId, stockTableName, s); - } else { - PhoenixMapReduceUtil.setInput(job, StockWritable.class, stockTableName, s, - STOCK_NAME, RECORDING_YEAR, "0." + RECORDINGS_QUARTER); + PhoenixMapReduceUtil.setInput(job, StockWritable.class, PhoenixTestingInputFormat.class, + stockTableName, s, STOCK_NAME, RECORDING_YEAR, "0." + RECORDINGS_QUARTER); } testJob(conn, job, stockTableName, stockStatsTableName, v); @@ -120,13 +139,15 @@ public class MapReduceIT extends ParallelStatsDisabledIT { String stockViewName = generateUniqueName(); tenantConn.createStatement().execute(String.format(CREATE_STOCK_VIEW, stockViewName, stockTableName)); tenantConn.commit(); - PhoenixMapReduceUtil.setInput(job, StockWritable.class, stockViewName, s, - STOCK_NAME, RECORDING_YEAR, "0." + RECORDINGS_QUARTER); + PhoenixMapReduceUtil.setInput(job, StockWritable.class, PhoenixTestingInputFormat.class, + stockViewName, s, STOCK_NAME, RECORDING_YEAR, "0." 
+ RECORDINGS_QUARTER); } } private void testJob(Connection conn, Job job, String stockTableName, String stockStatsTableName, double expectedMax) throws SQLException, InterruptedException, IOException, ClassNotFoundException { + assertEquals("Failed to reset getRegionBoundaries counter for scanGrouper", 0, + TestingMapReduceParallelScanGrouper.getNumCallsToGetRegionBoundaries()); upsertData(conn, stockTableName); // only run locally, rather than having to spin up a MiniMapReduce cluster and lets us use breakpoints @@ -154,6 +175,9 @@ public class MapReduceIT extends ParallelStatsDisabledIT { assertEquals("Got the wrong stock name!", "AAPL", name); assertEquals("Max value didn't match the expected!", expectedMax, max, 0); assertFalse("Should only have stored one row in stats table!", stats.next()); + assertEquals("There should have been only be 1 call to getRegionBoundaries " + + "(corresponding to the driver code)", 1, + TestingMapReduceParallelScanGrouper.getNumCallsToGetRegionBoundaries()); } /** diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MapReduceParallelScanGrouper.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MapReduceParallelScanGrouper.java index 593608f..38e9428 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MapReduceParallelScanGrouper.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MapReduceParallelScanGrouper.java @@ -20,6 +20,7 @@ package org.apache.phoenix.iterate; import java.sql.SQLException; import java.util.List; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import org.apache.hadoop.conf.Configuration; @@ -49,7 +50,8 @@ public class MapReduceParallelScanGrouper implements ParallelScanGrouper { return INSTANCE; } - private MapReduceParallelScanGrouper() {} + @VisibleForTesting + MapReduceParallelScanGrouper() {} @Override public boolean shouldStartNewScan(QueryPlan plan, List<Scan> 
scans, diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java index 11c412c..aa94684 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java @@ -44,6 +44,7 @@ import org.apache.hadoop.mapreduce.lib.db.DBWritable; import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; import org.apache.phoenix.iterate.MapReduceParallelScanGrouper; +import org.apache.phoenix.iterate.ParallelScanGrouper; import org.apache.phoenix.jdbc.PhoenixStatement; import org.apache.phoenix.mapreduce.util.ConnectionUtil; import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; @@ -75,7 +76,7 @@ public class PhoenixInputFormat<T extends DBWritable> extends InputFormat<NullWr final QueryPlan queryPlan = getQueryPlan(context,configuration); @SuppressWarnings("unchecked") final Class<T> inputClass = (Class<T>) PhoenixConfigurationUtil.getInputClass(configuration); - return new PhoenixRecordReader<T>(inputClass , configuration, queryPlan); + return getPhoenixRecordReader(inputClass, configuration, queryPlan); } @@ -84,13 +85,13 @@ public class PhoenixInputFormat<T extends DBWritable> extends InputFormat<NullWr public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException { final Configuration configuration = context.getConfiguration(); final QueryPlan queryPlan = getQueryPlan(context,configuration); - final List<KeyRange> allSplits = queryPlan.getSplits(); - final List<InputSplit> splits = generateSplits(queryPlan, allSplits, configuration); - return splits; + return generateSplits(queryPlan, configuration); } - private List<InputSplit> generateSplits(final QueryPlan qplan, final List<KeyRange> splits, Configuration config) throws IOException { - 
Preconditions.checkNotNull(qplan); + private List<InputSplit> generateSplits(final QueryPlan qplan, Configuration config) throws IOException { + // We must call this in order to initialize the scans and splits from the query plan + setupParallelScansFromQueryPlan(qplan); + final List<KeyRange> splits = qplan.getSplits(); Preconditions.checkNotNull(splits); // Get the RegionSizeCalculator @@ -197,8 +198,6 @@ public class PhoenixInputFormat<T extends DBWritable> extends InputFormat<NullWr PhoenixConfigurationUtil.setSnapshotNameKey(queryPlan.getContext().getConnection(). getQueryServices().getConfiguration(), snapshotName); - // Initialize the query plan so it sets up the parallel scans - queryPlan.iterator(MapReduceParallelScanGrouper.getInstance()); return queryPlan; } } catch (Exception exception) { @@ -208,4 +207,30 @@ public class PhoenixInputFormat<T extends DBWritable> extends InputFormat<NullWr } } + void setupParallelScansFromQueryPlan(QueryPlan queryPlan) { + setupParallelScansWithScanGrouper(queryPlan, MapReduceParallelScanGrouper.getInstance()); + } + + RecordReader<NullWritable,T> getPhoenixRecordReader(Class<T> inputClass, + Configuration configuration, QueryPlan queryPlan) { + return new PhoenixRecordReader<>(inputClass , configuration, queryPlan, + MapReduceParallelScanGrouper.getInstance()); + } + + /** + * Initialize the query plan so it sets up the parallel scans + * @param queryPlan Query plan corresponding to the select query + * @param scanGrouper Parallel scan grouper + */ + void setupParallelScansWithScanGrouper(QueryPlan queryPlan, ParallelScanGrouper scanGrouper) { + Preconditions.checkNotNull(queryPlan); + try { + queryPlan.iterator(scanGrouper); + } catch (SQLException e) { + LOGGER.error(String.format("Setting up parallel scans for the query plan failed " + + "with error [%s]", e.getMessage())); + throw new RuntimeException(e); + } + } + } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java index b7e1373..5a4bdb4 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java @@ -37,7 +37,7 @@ import org.apache.phoenix.compile.StatementContext; import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; import org.apache.phoenix.iterate.ConcatResultIterator; import org.apache.phoenix.iterate.LookAheadResultIterator; -import org.apache.phoenix.iterate.MapReduceParallelScanGrouper; +import org.apache.phoenix.iterate.ParallelScanGrouper; import org.apache.phoenix.iterate.PeekingResultIterator; import org.apache.phoenix.iterate.ResultIterator; import org.apache.phoenix.iterate.RoundRobinResultIterator; @@ -62,18 +62,22 @@ public class PhoenixRecordReader<T extends DBWritable> extends RecordReader<Null private static final Log LOG = LogFactory.getLog(PhoenixRecordReader.class); protected final Configuration configuration; protected final QueryPlan queryPlan; + private final ParallelScanGrouper scanGrouper; private NullWritable key = NullWritable.get(); private T value = null; private Class<T> inputClass; private ResultIterator resultIterator = null; private PhoenixResultSet resultSet; - - public PhoenixRecordReader(Class<T> inputClass,final Configuration configuration,final QueryPlan queryPlan) { + + PhoenixRecordReader(Class<T> inputClass, final Configuration configuration, + final QueryPlan queryPlan, final ParallelScanGrouper scanGrouper) { Preconditions.checkNotNull(configuration); Preconditions.checkNotNull(queryPlan); + Preconditions.checkNotNull(scanGrouper); this.inputClass = inputClass; this.configuration = configuration; this.queryPlan = queryPlan; + this.scanGrouper = scanGrouper; } @Override @@ -138,7 +142,7 @@ public class PhoenixRecordReader<T extends DBWritable> extends RecordReader<Null new TableResultIterator( 
queryPlan.getContext().getConnection().getMutationState(), scan, scanMetricsHolder, renewScannerLeaseThreshold, queryPlan, - MapReduceParallelScanGrouper.getInstance()); + this.scanGrouper); peekingResultIterator = LookAheadResultIterator.wrap(tableResultIterator); } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java index bab6cee..e82b061 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java @@ -57,6 +57,26 @@ public final class PhoenixMapReduceUtil { /** * + * @param job MR job instance + * @param inputClass DBWritable class + * @param inputFormatClass InputFormat class + * @param tableName Input table name + * @param conditions Condition clause to be added to the WHERE clause. + * Can be <tt>null</tt> if there are no conditions. + * @param fieldNames fields being projected for the SELECT query. + */ + public static void setInput(final Job job, final Class<? extends DBWritable> inputClass, + final Class<? extends InputFormat> inputFormatClass, final String tableName, + final String conditions, final String... 
fieldNames) { + final Configuration configuration = setInput(job, inputClass, inputFormatClass, tableName); + if(conditions != null) { + PhoenixConfigurationUtil.setInputTableConditions(configuration, conditions); + } + PhoenixConfigurationUtil.setSelectColumnNames(configuration, fieldNames); + } + + /** + * * @param job * @param inputClass DBWritable class * @param tableName Input table name diff --git a/phoenix-core/src/test/java/org/apache/phoenix/iterate/TestingMapReduceParallelScanGrouper.java b/phoenix-core/src/test/java/org/apache/phoenix/iterate/TestingMapReduceParallelScanGrouper.java new file mode 100644 index 0000000..77b3a7e --- /dev/null +++ b/phoenix-core/src/test/java/org/apache/phoenix/iterate/TestingMapReduceParallelScanGrouper.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.iterate; + +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.phoenix.compile.StatementContext; +import java.sql.SQLException; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * ParallelScanGrouper implementation used for testing Phoenix-MapReduce Integration + */ +public class TestingMapReduceParallelScanGrouper extends MapReduceParallelScanGrouper { + + private static final AtomicInteger numCallsToGetRegionBoundaries = new AtomicInteger(0); + private static final TestingMapReduceParallelScanGrouper INSTANCE = + new TestingMapReduceParallelScanGrouper(); + + public static TestingMapReduceParallelScanGrouper getInstance() { + return INSTANCE; + } + + @Override + public List<HRegionLocation> getRegionBoundaries(StatementContext context, + byte[] tableName) throws SQLException { + List<HRegionLocation> regionLocations = super.getRegionBoundaries(context, tableName); + numCallsToGetRegionBoundaries.incrementAndGet(); + return regionLocations; + } + + public static int getNumCallsToGetRegionBoundaries() { + return numCallsToGetRegionBoundaries.get(); + } + + public static void clearNumCallsToGetRegionBoundaries() { + numCallsToGetRegionBoundaries.set(0); + } +} diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/PhoenixTestingInputFormat.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/PhoenixTestingInputFormat.java new file mode 100644 index 0000000..c0b4bea --- /dev/null +++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/PhoenixTestingInputFormat.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.phoenix.mapreduce; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.lib.db.DBWritable; +import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.iterate.TestingMapReduceParallelScanGrouper; + +/** + * InputFormat implementation used for testing Phoenix-MapReduce Integration + */ +public class PhoenixTestingInputFormat<T extends DBWritable> extends PhoenixInputFormat<T> { + + @Override + void setupParallelScansFromQueryPlan(QueryPlan queryPlan) { + setupParallelScansWithScanGrouper(queryPlan, + TestingMapReduceParallelScanGrouper.getInstance()); + } + + @Override + RecordReader<NullWritable,T> getPhoenixRecordReader(Class<T> inputClass, + Configuration configuration, QueryPlan queryPlan) { + return new PhoenixRecordReader<>(inputClass , configuration, queryPlan, + TestingMapReduceParallelScanGrouper.getInstance()); + } + +}