PHOENIX-3600 Core MapReduce classes don't provide location info. This mostly just ports the same functionality in the phoenix-hive MR classes to the main classes. Adds a new configuration parameter, 'phoenix.mapreduce.split.by.stats' (defaulting to true), to create input splits based on the scans provided by statistics, not just the region locations.
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/e1b1cd87 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/e1b1cd87 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/e1b1cd87 Branch: refs/heads/encodecolumns2 Commit: e1b1cd8733d7adfca3a17899630c73881af187f1 Parents: 44dc576 Author: Josh Mahonin <jmaho...@gmail.com> Authored: Mon Feb 13 10:55:06 2017 -0500 Committer: Josh Mahonin <jmaho...@gmail.com> Committed: Mon Feb 13 11:04:40 2017 -0500 ---------------------------------------------------------------------- .../phoenix/mapreduce/PhoenixInputFormat.java | 69 ++++++++++++++++++-- .../phoenix/mapreduce/PhoenixInputSplit.java | 23 ++++++- .../util/PhoenixConfigurationUtil.java | 11 ++++ 3 files changed, 96 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1b1cd87/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java index df96c7b..14f7b94 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java @@ -21,14 +21,18 @@ import java.io.IOException; import java.sql.Connection; import java.sql.SQLException; import java.sql.Statement; +import java.util.Collections; import java.util.List; import java.util.Properties; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.TableName; +import 
org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.RegionSizeCalculator; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.InputSplit; @@ -42,6 +46,7 @@ import org.apache.phoenix.iterate.MapReduceParallelScanGrouper; import org.apache.phoenix.jdbc.PhoenixStatement; import org.apache.phoenix.mapreduce.util.ConnectionUtil; import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; +import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil; import org.apache.phoenix.query.KeyRange; import org.apache.phoenix.util.PhoenixRuntime; @@ -80,16 +85,72 @@ public class PhoenixInputFormat<T extends DBWritable> extends InputFormat<NullWr final Configuration configuration = context.getConfiguration(); final QueryPlan queryPlan = getQueryPlan(context,configuration); final List<KeyRange> allSplits = queryPlan.getSplits(); - final List<InputSplit> splits = generateSplits(queryPlan,allSplits); + final List<InputSplit> splits = generateSplits(queryPlan, allSplits, configuration); return splits; } - private List<InputSplit> generateSplits(final QueryPlan qplan, final List<KeyRange> splits) throws IOException { + private List<InputSplit> generateSplits(final QueryPlan qplan, final List<KeyRange> splits, Configuration config) throws IOException { Preconditions.checkNotNull(qplan); Preconditions.checkNotNull(splits); + + // Get the RegionSizeCalculator + org.apache.hadoop.hbase.client.Connection connection = ConnectionFactory.createConnection(config); + RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf(qplan + .getTableRef().getTable().getPhysicalName().toString())); + RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(regionLocator, connection + .getAdmin()); + + final List<InputSplit> psplits = Lists.newArrayListWithExpectedSize(splits.size()); for (List<Scan> scans : qplan.getScans()) { - 
psplits.add(new PhoenixInputSplit(scans)); + // Get the region location + HRegionLocation location = regionLocator.getRegionLocation( + scans.get(0).getStartRow(), + false + ); + + String regionLocation = location.getHostname(); + + // Get the region size + long regionSize = sizeCalculator.getRegionSize( + location.getRegionInfo().getRegionName() + ); + + // Generate splits based off statistics, or just region splits? + boolean splitByStats = PhoenixConfigurationUtil.getSplitByStats(config); + + if(splitByStats) { + for(Scan aScan: scans) { + if (LOG.isDebugEnabled()) { + LOG.debug("Split for scan : " + aScan + "with scanAttribute : " + aScan + .getAttributesMap() + " [scanCache, cacheBlock, scanBatch] : [" + + aScan.getCaching() + ", " + aScan.getCacheBlocks() + ", " + aScan + .getBatch() + "] and regionLocation : " + regionLocation); + } + + psplits.add(new PhoenixInputSplit(Collections.singletonList(aScan), regionSize, regionLocation)); + } + } + else { + if (LOG.isDebugEnabled()) { + LOG.debug("Scan count[" + scans.size() + "] : " + Bytes.toStringBinary(scans + .get(0).getStartRow()) + " ~ " + Bytes.toStringBinary(scans.get(scans + .size() - 1).getStopRow())); + LOG.debug("First scan : " + scans.get(0) + "with scanAttribute : " + scans + .get(0).getAttributesMap() + " [scanCache, cacheBlock, scanBatch] : " + + "[" + scans.get(0).getCaching() + ", " + scans.get(0).getCacheBlocks() + + ", " + scans.get(0).getBatch() + "] and regionLocation : " + + regionLocation); + + for (int i = 0, limit = scans.size(); i < limit; i++) { + LOG.debug("EXPECTED_UPPER_REGION_KEY[" + i + "] : " + Bytes + .toStringBinary(scans.get(i).getAttribute + (BaseScannerRegionObserver.EXPECTED_UPPER_REGION_KEY))); + } + } + + psplits.add(new PhoenixInputSplit(scans, regionSize, regionLocation)); + } } return psplits; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1b1cd87/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputSplit.java 
---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputSplit.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputSplit.java index caee3cd..6d3c5e1 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputSplit.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputSplit.java @@ -25,7 +25,6 @@ import java.util.List; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; -import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.mapreduce.InputSplit; @@ -41,6 +40,8 @@ public class PhoenixInputSplit extends InputSplit implements Writable { private List<Scan> scans; private KeyRange keyRange; + private String regionLocation = null; + private long regionSize = 0; /** * No Arg constructor @@ -53,9 +54,15 @@ public class PhoenixInputSplit extends InputSplit implements Writable { * @param keyRange */ public PhoenixInputSplit(final List<Scan> scans) { + this(scans, 0, null); + } + + public PhoenixInputSplit(final List<Scan> scans, long regionSize, String regionLocation) { Preconditions.checkNotNull(scans); Preconditions.checkState(!scans.isEmpty()); this.scans = scans; + this.regionSize = regionSize; + this.regionLocation = regionLocation; init(); } @@ -73,6 +80,8 @@ public class PhoenixInputSplit extends InputSplit implements Writable { @Override public void readFields(DataInput input) throws IOException { + regionLocation = WritableUtils.readString(input); + regionSize = WritableUtils.readVLong(input); int count = WritableUtils.readVInt(input); scans = Lists.newArrayListWithExpectedSize(count); for (int i = 0; i < count; i++) { @@ -87,6 +96,9 @@ public class PhoenixInputSplit extends InputSplit implements 
Writable { @Override public void write(DataOutput output) throws IOException { + WritableUtils.writeString(output, regionLocation); + WritableUtils.writeVLong(output, regionSize); + Preconditions.checkNotNull(scans); WritableUtils.writeVInt(output, scans.size()); for (Scan scan : scans) { @@ -99,12 +111,17 @@ public class PhoenixInputSplit extends InputSplit implements Writable { @Override public long getLength() throws IOException, InterruptedException { - return 0; + return regionSize; } @Override public String[] getLocations() throws IOException, InterruptedException { - return new String[]{}; + if(regionLocation == null) { + return new String[]{}; + } + else { + return new String[]{regionLocation}; + } } @Override http://git-wip-us.apache.org/repos/asf/phoenix/blob/e1b1cd87/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java index f3e4450..1d2cbbe 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java @@ -106,6 +106,11 @@ public final class PhoenixConfigurationUtil { public static final String DISABLED_INDEXES = "phoenix.mr.index.disabledIndexes"; + // Generate splits based on scans from stats, or just from region splits + public static final String MAPREDUCE_SPLIT_BY_STATS = "phoenix.mapreduce.split.by.stats"; + + public static final boolean DEFAULT_SPLIT_BY_STATS = true; + public enum SchemaType { TABLE, QUERY; @@ -459,4 +464,10 @@ public final class PhoenixConfigurationUtil { Preconditions.checkNotNull(configuration); return configuration.get(DISABLED_INDEXES); } + + public static boolean getSplitByStats(final 
Configuration configuration) { + Preconditions.checkNotNull(configuration); + boolean split = configuration.getBoolean(MAPREDUCE_SPLIT_BY_STATS, DEFAULT_SPLIT_BY_STATS); + return split; + } }