This is an automated email from the ASF dual-hosted git repository.

chinmayskulkarni pushed a commit to branch 4.14-HBase-1.4
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/4.14-HBase-1.4 by this push:
     new 704f938  PHOENIX-5313: All mappers grab all RegionLocations from .META
704f938 is described below

commit 704f9382036b6bfd48a555cfefa479f4b4f7f6cc
Author: Chinmay Kulkarni <chinmayskulka...@gmail.com>
AuthorDate: Wed Jun 19 13:40:27 2019 -0700

    PHOENIX-5313: All mappers grab all RegionLocations from .META
---
 .../org/apache/phoenix/end2end/MapReduceIT.java    | 50 ++++++++++++++------
 .../iterate/MapReduceParallelScanGrouper.java      |  4 +-
 .../phoenix/mapreduce/PhoenixInputFormat.java      | 41 ++++++++++++----
 .../phoenix/mapreduce/PhoenixRecordReader.java     | 12 +++--
 .../mapreduce/util/PhoenixMapReduceUtil.java       | 20 ++++++++
 .../TestingMapReduceParallelScanGrouper.java       | 54 ++++++++++++++++++++++
 .../mapreduce/PhoenixTestingInputFormat.java       | 46 ++++++++++++++++++
 7 files changed, 201 insertions(+), 26 deletions(-)
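
For reference, a minimal driver sketch using the new setInput overload added below (the job setup, MY_TABLE, and MyWritable names are hypothetical, not part of this commit):

    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf);
    // The InputFormat class is now an explicit argument, so tests can swap in
    // PhoenixTestingInputFormat while production jobs keep PhoenixInputFormat.
    PhoenixMapReduceUtil.setInput(job, MyWritable.class, PhoenixInputFormat.class,
            "MY_TABLE", null /* no WHERE conditions */, "COL1", "COL2");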

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MapReduceIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MapReduceIT.java
index fb24bb2..2460cd2 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MapReduceIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MapReduceIT.java
@@ -25,20 +25,30 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.apache.phoenix.iterate.TestingMapReduceParallelScanGrouper;
 import org.apache.phoenix.mapreduce.PhoenixOutputFormat;
+import org.apache.phoenix.mapreduce.PhoenixTestingInputFormat;
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
 import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;
 import org.apache.phoenix.schema.types.PDouble;
 import org.apache.phoenix.schema.types.PhoenixArray;
 import org.apache.phoenix.util.PhoenixRuntime;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
-import java.sql.*;
+import java.sql.Array;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
 import java.util.Properties;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Test that our MapReduce basic tools work as expected
@@ -48,28 +58,37 @@ public class MapReduceIT extends ParallelStatsDisabledIT {
     private static final String STOCK_NAME = "STOCK_NAME";
     private static final String RECORDING_YEAR = "RECORDING_YEAR";
     private static final String RECORDINGS_QUARTER = "RECORDINGS_QUARTER";
-    private  String CREATE_STOCK_TABLE = "CREATE TABLE IF NOT EXISTS %s ( " +
+
+    // We pre-split the table to ensure that we have multiple mappers.
+    // This is used to test scenarios with more than 1 mapper
+    private static final String CREATE_STOCK_TABLE = "CREATE TABLE IF NOT EXISTS %s ( " +
             " STOCK_NAME VARCHAR NOT NULL , RECORDING_YEAR  INTEGER NOT  NULL, RECORDINGS_QUARTER " +
-            " DOUBLE array[] CONSTRAINT pk PRIMARY KEY ( STOCK_NAME, RECORDING_YEAR ))";
+            " DOUBLE array[] CONSTRAINT pk PRIMARY KEY ( STOCK_NAME, RECORDING_YEAR )) "
+            + "SPLIT ON ('AA')";
 
     private static final String CREATE_STOCK_VIEW = "CREATE VIEW IF NOT EXISTS %s (v1 VARCHAR) AS "
         + " SELECT * FROM %s WHERE RECORDING_YEAR = 2008";
 
     private static final String MAX_RECORDING = "MAX_RECORDING";
-    private  String CREATE_STOCK_STATS_TABLE =
+    private static final String CREATE_STOCK_STATS_TABLE =
             "CREATE TABLE IF NOT EXISTS %s(STOCK_NAME VARCHAR NOT NULL , "
                     + " MAX_RECORDING DOUBLE CONSTRAINT pk PRIMARY KEY 
(STOCK_NAME ))";
 
 
-    private String UPSERT = "UPSERT into %s values (?, ?, ?)";
+    private static final String UPSERT = "UPSERT into %s values (?, ?, ?)";
 
-    private String TENANT_ID = "1234567890";
+    private static final String TENANT_ID = "1234567890";
 
     @Before
     public void setupTables() throws Exception {
 
     }
 
+    @After
+    public void clearCountersForScanGrouper() {
+        TestingMapReduceParallelScanGrouper.clearNumCallsToGetRegionBoundaries();
+    }
+
     @Test
     public void testNoConditionsOnSelect() throws Exception {
         try (Connection conn = DriverManager.getConnection(getUrl())) {
@@ -93,7 +112,8 @@ public class MapReduceIT extends ParallelStatsDisabledIT {
 
     }
 
-    private void createAndTestJob(Connection conn, String s, double v, String tenantId) throws SQLException, IOException, InterruptedException, ClassNotFoundException {
+    private void createAndTestJob(Connection conn, String s, double v, String tenantId) throws
+            SQLException, IOException, InterruptedException, ClassNotFoundException {
         String stockTableName = generateUniqueName();
         String stockStatsTableName = generateUniqueName();
         conn.createStatement().execute(String.format(CREATE_STOCK_TABLE, stockTableName));
@@ -103,10 +123,9 @@ public class MapReduceIT extends ParallelStatsDisabledIT {
         Job job = Job.getInstance(conf);
         if (tenantId != null) {
             setInputForTenant(job, tenantId, stockTableName, s);
-
         } else {
-            PhoenixMapReduceUtil.setInput(job, StockWritable.class, stockTableName, s,
-                STOCK_NAME, RECORDING_YEAR, "0." + RECORDINGS_QUARTER);
+            PhoenixMapReduceUtil.setInput(job, StockWritable.class, PhoenixTestingInputFormat.class,
+                    stockTableName, s, STOCK_NAME, RECORDING_YEAR, "0." + RECORDINGS_QUARTER);
         }
         testJob(conn, job, stockTableName, stockStatsTableName, v);
 
@@ -120,13 +139,15 @@ public class MapReduceIT extends ParallelStatsDisabledIT {
             String stockViewName = generateUniqueName();
             tenantConn.createStatement().execute(String.format(CREATE_STOCK_VIEW, stockViewName, stockTableName));
             tenantConn.commit();
-            PhoenixMapReduceUtil.setInput(job, StockWritable.class, stockViewName, s,
-                STOCK_NAME, RECORDING_YEAR, "0." + RECORDINGS_QUARTER);
+            PhoenixMapReduceUtil.setInput(job, StockWritable.class, PhoenixTestingInputFormat.class,
+                    stockViewName, s, STOCK_NAME, RECORDING_YEAR, "0." + RECORDINGS_QUARTER);
         }
     }
 
     private void testJob(Connection conn, Job job, String stockTableName, String stockStatsTableName, double expectedMax)
             throws SQLException, InterruptedException, IOException, ClassNotFoundException {
+        assertEquals("Failed to reset getRegionBoundaries counter for scanGrouper", 0,
+                TestingMapReduceParallelScanGrouper.getNumCallsToGetRegionBoundaries());
         upsertData(conn, stockTableName);
 
         // only run locally, rather than having to spin up a MiniMapReduce cluster and lets us use breakpoints
@@ -154,6 +175,9 @@ public class MapReduceIT extends ParallelStatsDisabledIT {
         assertEquals("Got the wrong stock name!", "AAPL", name);
         assertEquals("Max value didn't match the expected!", expectedMax, max, 
0);
         assertFalse("Should only have stored one row in stats table!", 
stats.next());
+        assertEquals("There should have been only be 1 call to 
getRegionBoundaries "
+                        + "(corresponding to the driver code)", 1,
+                
TestingMapReduceParallelScanGrouper.getNumCallsToGetRegionBoundaries());
     }
 
     /**
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MapReduceParallelScanGrouper.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MapReduceParallelScanGrouper.java
index 593608f..38e9428 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MapReduceParallelScanGrouper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MapReduceParallelScanGrouper.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.iterate;
 import java.sql.SQLException;
 import java.util.List;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
@@ -49,7 +50,8 @@ public class MapReduceParallelScanGrouper implements ParallelScanGrouper {
                return INSTANCE;
        }
 
-   private MapReduceParallelScanGrouper() {}
+    @VisibleForTesting
+    MapReduceParallelScanGrouper() {}
 
        @Override
        public boolean shouldStartNewScan(QueryPlan plan, List<Scan> scans,
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
index 11c412c..aa94684 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.mapreduce.lib.db.DBWritable;
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.iterate.MapReduceParallelScanGrouper;
+import org.apache.phoenix.iterate.ParallelScanGrouper;
 import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.mapreduce.util.ConnectionUtil;
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
@@ -75,7 +76,7 @@ public class PhoenixInputFormat<T extends DBWritable> extends InputFormat<NullWr
         final QueryPlan queryPlan = getQueryPlan(context,configuration);
         @SuppressWarnings("unchecked")
         final Class<T> inputClass = (Class<T>) PhoenixConfigurationUtil.getInputClass(configuration);
-        return new PhoenixRecordReader<T>(inputClass , configuration, queryPlan);
+        return getPhoenixRecordReader(inputClass, configuration, queryPlan);
     }
     
    
@@ -84,13 +85,13 @@ public class PhoenixInputFormat<T extends DBWritable> extends InputFormat<NullWr
     public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
         final Configuration configuration = context.getConfiguration();
         final QueryPlan queryPlan = getQueryPlan(context,configuration);
-        final List<KeyRange> allSplits = queryPlan.getSplits();
-        final List<InputSplit> splits = generateSplits(queryPlan, allSplits, configuration);
-        return splits;
+        return generateSplits(queryPlan, configuration);
     }
 
-    private List<InputSplit> generateSplits(final QueryPlan qplan, final List<KeyRange> splits, Configuration config) throws IOException {
-        Preconditions.checkNotNull(qplan);
+    private List<InputSplit> generateSplits(final QueryPlan qplan, Configuration config) throws IOException {
+        // We must call this in order to initialize the scans and splits from the query plan
+        setupParallelScansFromQueryPlan(qplan);
+        final List<KeyRange> splits = qplan.getSplits();
         Preconditions.checkNotNull(splits);
 
         // Get the RegionSizeCalculator
@@ -197,8 +198,6 @@ public class PhoenixInputFormat<T extends DBWritable> extends InputFormat<NullWr
                 PhoenixConfigurationUtil.setSnapshotNameKey(queryPlan.getContext().getConnection().
                     getQueryServices().getConfiguration(), snapshotName);
 
-              // Initialize the query plan so it sets up the parallel scans
-              queryPlan.iterator(MapReduceParallelScanGrouper.getInstance());
               return queryPlan;
             }
         } catch (Exception exception) {
@@ -208,4 +207,30 @@ public class PhoenixInputFormat<T extends DBWritable> extends InputFormat<NullWr
         }
     }
 
+    void setupParallelScansFromQueryPlan(QueryPlan queryPlan) {
+        setupParallelScansWithScanGrouper(queryPlan, MapReduceParallelScanGrouper.getInstance());
+    }
+
+    RecordReader<NullWritable,T> getPhoenixRecordReader(Class<T> inputClass,
+            Configuration configuration, QueryPlan queryPlan) {
+        return new PhoenixRecordReader<>(inputClass , configuration, queryPlan,
+                MapReduceParallelScanGrouper.getInstance());
+    }
+
+    /**
+     * Initialize the query plan so it sets up the parallel scans
+     * @param queryPlan Query plan corresponding to the select query
+     * @param scanGrouper Parallel scan grouper
+     */
+    void setupParallelScansWithScanGrouper(QueryPlan queryPlan, ParallelScanGrouper scanGrouper) {
+        Preconditions.checkNotNull(queryPlan);
+        try {
+            queryPlan.iterator(scanGrouper);
+        } catch (SQLException e) {
+            LOGGER.error(String.format("Setting up parallel scans for the query plan failed "
+                    + "with error [%s]", e.getMessage()));
+            throw new RuntimeException(e);
+        }
+    }
+
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
index b7e1373..5a4bdb4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
@@ -37,7 +37,7 @@ import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.iterate.ConcatResultIterator;
 import org.apache.phoenix.iterate.LookAheadResultIterator;
-import org.apache.phoenix.iterate.MapReduceParallelScanGrouper;
+import org.apache.phoenix.iterate.ParallelScanGrouper;
 import org.apache.phoenix.iterate.PeekingResultIterator;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.iterate.RoundRobinResultIterator;
@@ -62,18 +62,22 @@ public class PhoenixRecordReader<T extends DBWritable> extends RecordReader<Null
     private static final Log LOG = LogFactory.getLog(PhoenixRecordReader.class);
     protected final Configuration  configuration;
     protected final QueryPlan queryPlan;
+    private final ParallelScanGrouper scanGrouper;
     private NullWritable key =  NullWritable.get();
     private T value = null;
     private Class<T> inputClass;
     private ResultIterator resultIterator = null;
     private PhoenixResultSet resultSet;
-    
-    public PhoenixRecordReader(Class<T> inputClass,final Configuration configuration,final QueryPlan queryPlan) {
+
+    PhoenixRecordReader(Class<T> inputClass, final Configuration configuration,
+            final QueryPlan queryPlan, final ParallelScanGrouper scanGrouper) {
         Preconditions.checkNotNull(configuration);
         Preconditions.checkNotNull(queryPlan);
+        Preconditions.checkNotNull(scanGrouper);
         this.inputClass = inputClass;
         this.configuration = configuration;
         this.queryPlan = queryPlan;
+        this.scanGrouper = scanGrouper;
     }
 
     @Override
@@ -138,7 +142,7 @@ public class PhoenixRecordReader<T extends DBWritable> extends RecordReader<Null
                       new TableResultIterator(
                           queryPlan.getContext().getConnection().getMutationState(), scan,
                           scanMetricsHolder, renewScannerLeaseThreshold, queryPlan,
-                          MapReduceParallelScanGrouper.getInstance());
+                          this.scanGrouper);
                   peekingResultIterator = LookAheadResultIterator.wrap(tableResultIterator);
                 }
 
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java
index bab6cee..e82b061 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java
@@ -57,6 +57,26 @@ public final class PhoenixMapReduceUtil {
 
     /**
      *
+     * @param job MR job instance
+     * @param inputClass DBWritable class
+     * @param inputFormatClass  InputFormat class
+     * @param tableName  Input table name
+     * @param conditions Condition clause to be added to the WHERE clause.
+     *                   Can be <tt>null</tt> if there are no conditions.
+     * @param fieldNames fields being projected for the SELECT query.
+     */
+    public static void setInput(final Job job, final Class<? extends DBWritable> inputClass,
+            final Class<? extends InputFormat> inputFormatClass, final String tableName,
+            final String conditions, final String... fieldNames) {
+        final Configuration configuration = setInput(job, inputClass, inputFormatClass, tableName);
+        if(conditions != null) {
+            PhoenixConfigurationUtil.setInputTableConditions(configuration, conditions);
+        }
+        PhoenixConfigurationUtil.setSelectColumnNames(configuration, fieldNames);
+    }
+
+    /**
+     *
      * @param job
      * @param inputClass  DBWritable class
      * @param tableName   Input table name
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/iterate/TestingMapReduceParallelScanGrouper.java b/phoenix-core/src/test/java/org/apache/phoenix/iterate/TestingMapReduceParallelScanGrouper.java
new file mode 100644
index 0000000..77b3a7e
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/iterate/TestingMapReduceParallelScanGrouper.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.phoenix.compile.StatementContext;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * ParallelScanGrouper implementation used for testing Phoenix-MapReduce Integration
+ */
+public class TestingMapReduceParallelScanGrouper extends MapReduceParallelScanGrouper {
+
+    private static final AtomicInteger numCallsToGetRegionBoundaries = new AtomicInteger(0);
+    private static final TestingMapReduceParallelScanGrouper INSTANCE =
+            new TestingMapReduceParallelScanGrouper();
+
+    public static TestingMapReduceParallelScanGrouper getInstance() {
+        return INSTANCE;
+    }
+
+    @Override
+    public List<HRegionLocation> getRegionBoundaries(StatementContext context,
+            byte[] tableName) throws SQLException {
+        List<HRegionLocation> regionLocations = super.getRegionBoundaries(context, tableName);
+        numCallsToGetRegionBoundaries.incrementAndGet();
+        return regionLocations;
+    }
+
+    public static int getNumCallsToGetRegionBoundaries() {
+        return numCallsToGetRegionBoundaries.get();
+    }
+
+    public static void clearNumCallsToGetRegionBoundaries() {
+        numCallsToGetRegionBoundaries.set(0);
+    }
+}
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/PhoenixTestingInputFormat.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/PhoenixTestingInputFormat.java
new file mode 100644
index 0000000..c0b4bea
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/PhoenixTestingInputFormat.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.mapreduce;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.iterate.TestingMapReduceParallelScanGrouper;
+
+/**
+ * InputFormat implementation used for testing Phoenix-MapReduce Integration
+ */
+public class PhoenixTestingInputFormat<T extends DBWritable> extends PhoenixInputFormat<T> {
+
+    @Override
+    void setupParallelScansFromQueryPlan(QueryPlan queryPlan) {
+        setupParallelScansWithScanGrouper(queryPlan,
+                TestingMapReduceParallelScanGrouper.getInstance());
+    }
+
+    @Override
+    RecordReader<NullWritable,T> getPhoenixRecordReader(Class<T> inputClass,
+            Configuration configuration, QueryPlan queryPlan) {
+        return new PhoenixRecordReader<>(inputClass , configuration, queryPlan,
+                TestingMapReduceParallelScanGrouper.getInstance());
+    }
+
+}
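
Taken together, the testing hooks above let an integration test verify that region locations are fetched from .META only once, by the driver, rather than once per mapper. A minimal assertion sketch in the style of MapReduceIT (job setup elided; the names follow the test code above):

    TestingMapReduceParallelScanGrouper.clearNumCallsToGetRegionBoundaries();
    PhoenixMapReduceUtil.setInput(job, StockWritable.class, PhoenixTestingInputFormat.class,
            stockTableName, null, STOCK_NAME, RECORDING_YEAR);
    assertTrue("Job didn't complete successfully!", job.waitForCompletion(true));
    // Only the driver's split computation should call getRegionBoundaries;
    // mappers reuse the computed splits instead of re-reading .META.
    assertEquals(1, TestingMapReduceParallelScanGrouper.getNumCallsToGetRegionBoundaries());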
