This is an automated email from the ASF dual-hosted git repository.

stoty pushed a commit to branch 5.1 in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/5.1 by this push: new f6f0016c43 PHOENIX-6944 Randomize mapper task ordering for Index MR tools f6f0016c43 is described below commit f6f0016c439b4df39a01622eccf5d898c1a4ad96 Author: Istvan Toth <st...@apache.org> AuthorDate: Fri Apr 28 08:16:35 2023 +0200 PHOENIX-6944 Randomize mapper task ordering for Index MR tools --- .../phoenix/mapreduce/PhoenixInputFormat.java | 129 +++++++++++++-------- .../phoenix/mapreduce/PhoenixInputSplit.java | 21 ++-- .../phoenix/mapreduce/index/IndexScrutinyTool.java | 4 + .../apache/phoenix/mapreduce/index/IndexTool.java | 4 + .../mapreduce/util/PhoenixConfigurationUtil.java | 13 +++ 5 files changed, 114 insertions(+), 57 deletions(-) diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java index c294fede6f..f9c93e0554 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java @@ -55,6 +55,7 @@ import java.sql.Statement; import java.util.Collections; import java.util.List; import java.util.Properties; +import java.util.concurrent.ThreadLocalRandom; /** * {@link InputFormat} implementation from Phoenix. @@ -87,74 +88,106 @@ public class PhoenixInputFormat<T extends DBWritable> extends InputFormat<NullWr return generateSplits(queryPlan, configuration); } - private List<InputSplit> generateSplits(final QueryPlan qplan, Configuration config) throws IOException { + /** + * Randomise the length parameter of the splits to ensure random execution order. + * Yarn orders splits by size before execution. 
+ * + * @param splits + */ + protected void randomizeSplitLength(List<InputSplit> splits) { + LOGGER.info("Randomizing split size"); + if (splits.size() == 0) { + return; + } + double defaultLength = 1000000d; + double totalLength = splits.stream().mapToDouble(s -> { + try { + return (double) s.getLength(); + } catch (IOException | InterruptedException e1) { + return defaultLength; + } + }).sum(); + long avgLength = (long) (totalLength / splits.size()); + splits.stream().forEach(s -> ((PhoenixInputSplit) s) + .setLength(avgLength + ThreadLocalRandom.current().nextInt(10000))); + } + + protected List<InputSplit> generateSplits(final QueryPlan qplan, Configuration config) + throws IOException { // We must call this in order to initialize the scans and splits from the query plan setupParallelScansFromQueryPlan(qplan); final List<KeyRange> splits = qplan.getSplits(); Preconditions.checkNotNull(splits); // Get the RegionSizeCalculator - try(org.apache.hadoop.hbase.client.Connection connection = - HBaseFactoryProvider.getHConnectionFactory().createConnection(config)) { - RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf(qplan - .getTableRef().getTable().getPhysicalName().toString())); - RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(regionLocator, connection - .getAdmin()); + try (org.apache.hadoop.hbase.client.Connection connection = + HBaseFactoryProvider.getHConnectionFactory().createConnection(config)) { + RegionLocator regionLocator = + connection.getRegionLocator(TableName + .valueOf(qplan.getTableRef().getTable().getPhysicalName().toString())); + RegionSizeCalculator sizeCalculator = + new RegionSizeCalculator(regionLocator, connection.getAdmin()); - final List<InputSplit> psplits = Lists.newArrayListWithExpectedSize(splits.size()); - for (List<Scan> scans : qplan.getScans()) { - // Get the region location - HRegionLocation location = regionLocator.getRegionLocation( - scans.get(0).getStartRow(), - false - ); + final 
List<InputSplit> psplits = Lists.newArrayListWithExpectedSize(splits.size()); + for (List<Scan> scans : qplan.getScans()) { + // Get the region location + HRegionLocation location = + regionLocator.getRegionLocation(scans.get(0).getStartRow(), false); - String regionLocation = location.getHostname(); + String regionLocation = location.getHostname(); - // Get the region size - long regionSize = sizeCalculator.getRegionSize( - location.getRegion().getRegionName() - ); + // Get the region size + long regionSize = + sizeCalculator.getRegionSize(location.getRegion().getRegionName()); - // Generate splits based off statistics, or just region splits? - boolean splitByStats = PhoenixConfigurationUtil.getSplitByStats(config); + // Generate splits based off statistics, or just region splits? + boolean splitByStats = PhoenixConfigurationUtil.getSplitByStats(config); - if (splitByStats) { - for (Scan aScan : scans) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Split for scan : " + aScan + "with scanAttribute : " + aScan - .getAttributesMap() + " [scanCache, cacheBlock, scanBatch] : [" + - aScan.getCaching() + ", " + aScan.getCacheBlocks() + ", " + aScan - .getBatch() + "] and regionLocation : " + regionLocation); - } + if (splitByStats) { + for (Scan aScan : scans) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Split for scan : " + aScan + "with scanAttribute : " + + aScan.getAttributesMap() + + " [scanCache, cacheBlock, scanBatch] : [" + aScan.getCaching() + + ", " + aScan.getCacheBlocks() + ", " + aScan.getBatch() + + "] and regionLocation : " + regionLocation); + } - psplits.add(new PhoenixInputSplit(Collections.singletonList(aScan), regionSize, regionLocation)); - } + // The size is bogus, but it's not a problem + psplits.add(new PhoenixInputSplit(Collections.singletonList(aScan), + regionSize, regionLocation)); + } } else { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Scan count[" + scans.size() + "] : " + Bytes.toStringBinary(scans - .get(0).getStartRow()) + 
" ~ " + Bytes.toStringBinary(scans.get(scans - .size() - 1).getStopRow())); - LOGGER.debug("First scan : " + scans.get(0) + "with scanAttribute : " + scans - .get(0).getAttributesMap() + " [scanCache, cacheBlock, scanBatch] : " + - "[" + scans.get(0).getCaching() + ", " + scans.get(0).getCacheBlocks() - + ", " + scans.get(0).getBatch() + "] and regionLocation : " + - regionLocation); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Scan count[" + scans.size() + "] : " + + Bytes.toStringBinary(scans.get(0).getStartRow()) + " ~ " + + Bytes.toStringBinary(scans.get(scans.size() - 1).getStopRow())); + LOGGER.debug("First scan : " + scans.get(0) + "with scanAttribute : " + + scans.get(0).getAttributesMap() + + " [scanCache, cacheBlock, scanBatch] : " + "[" + + scans.get(0).getCaching() + ", " + scans.get(0).getCacheBlocks() + + ", " + scans.get(0).getBatch() + "] and regionLocation : " + + regionLocation); - for (int i = 0, limit = scans.size(); i < limit; i++) { - LOGGER.debug("EXPECTED_UPPER_REGION_KEY[" + i + "] : " + Bytes - .toStringBinary(scans.get(i).getAttribute - (BaseScannerRegionObserver.EXPECTED_UPPER_REGION_KEY))); + for (int i = 0, limit = scans.size(); i < limit; i++) { + LOGGER.debug("EXPECTED_UPPER_REGION_KEY[" + i + "] : " + + Bytes.toStringBinary(scans.get(i).getAttribute( + BaseScannerRegionObserver.EXPECTED_UPPER_REGION_KEY))); + } } + + psplits.add(new PhoenixInputSplit(scans, regionSize, regionLocation)); } + } - psplits.add(new PhoenixInputSplit(scans, regionSize, regionLocation)); + if (PhoenixConfigurationUtil.isMRRandomizeMapperExecutionOrder(config)) { + randomizeSplitLength(psplits); } + + return psplits; } - return psplits; } - } - + /** * Returns the query plan associated with the select query. 
* @param context diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputSplit.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputSplit.java index 7819c5318c..a4dc1b789e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputSplit.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputSplit.java @@ -41,7 +41,7 @@ public class PhoenixInputSplit extends InputSplit implements Writable { private List<Scan> scans; private KeyRange keyRange; private String regionLocation = null; - private long regionSize = 0; + private long splitSize = 0; /** * No Arg constructor @@ -57,11 +57,11 @@ public class PhoenixInputSplit extends InputSplit implements Writable { this(scans, 0, null); } - public PhoenixInputSplit(final List<Scan> scans, long regionSize, String regionLocation) { + public PhoenixInputSplit(final List<Scan> scans, long splitSize, String regionLocation) { Preconditions.checkNotNull(scans); Preconditions.checkState(!scans.isEmpty()); this.scans = scans; - this.regionSize = regionSize; + this.splitSize = splitSize; this.regionLocation = regionLocation; init(); } @@ -81,7 +81,7 @@ public class PhoenixInputSplit extends InputSplit implements Writable { @Override public void readFields(DataInput input) throws IOException { regionLocation = WritableUtils.readString(input); - regionSize = WritableUtils.readVLong(input); + splitSize = WritableUtils.readVLong(input); int count = WritableUtils.readVInt(input); scans = Lists.newArrayListWithExpectedSize(count); for (int i = 0; i < count; i++) { @@ -97,7 +97,7 @@ public class PhoenixInputSplit extends InputSplit implements Writable { @Override public void write(DataOutput output) throws IOException { WritableUtils.writeString(output, regionLocation); - WritableUtils.writeVLong(output, regionSize); + WritableUtils.writeVLong(output, splitSize); Preconditions.checkNotNull(scans); WritableUtils.writeVInt(output, scans.size()); @@ 
-111,15 +111,14 @@ public class PhoenixInputSplit extends InputSplit implements Writable { @Override public long getLength() throws IOException, InterruptedException { - return regionSize; + return splitSize; } @Override public String[] getLocations() throws IOException, InterruptedException { - if(regionLocation == null) { + if (regionLocation == null) { return new String[]{}; - } - else { + } else { return new String[]{regionLocation}; } } @@ -144,4 +143,8 @@ public class PhoenixInputSplit extends InputSplit implements Writable { return true; } + public void setLength(long length) { + this.splitSize = length; + } + } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTool.java index 1f775d6370..151fac2a54 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTool.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyTool.java @@ -264,6 +264,10 @@ public class IndexScrutinyTool extends Configured implements Tool { final PTable pdataTable = PhoenixRuntime.getTable(connection, qDataTable); final PTable pindexTable = PhoenixRuntime.getTable(connection, qIndexTable); + // Randomize execution order, unless explicitly set + configuration.setBooleanIfUnset( + PhoenixConfigurationUtil.MAPREDUCE_RANDOMIZE_MAPPER_EXECUTION_ORDER, true); + // set CURRENT_SCN for our scan so that incoming writes don't throw off scrutiny configuration.set(PhoenixConfigurationUtil.CURRENT_SCN_VALUE, Long.toString(ts)); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java index 9dd384a024..bc7997c5fa 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java @@ -723,6 +723,10 @@ 
public class IndexTool extends Configured implements Tool { Long.toString(indexRebuildRpcRetriesCounter)); configuration.set("mapreduce.task.timeout", Long.toString(indexRebuildQueryTimeoutMs)); + // Randomize execution order, unless explicitly set + configuration.setBooleanIfUnset( + PhoenixConfigurationUtil.MAPREDUCE_RANDOMIZE_MAPPER_EXECUTION_ORDER, true); + PhoenixConfigurationUtil.setIndexToolDataTableName(configuration, dataTableWithSchema); PhoenixConfigurationUtil.setIndexToolIndexTableName(configuration, qIndexTable); PhoenixConfigurationUtil.setIndexToolSourceTable(configuration, sourceTable); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java index f17510e68d..1fa590829b 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java @@ -201,6 +201,13 @@ public final class PhoenixConfigurationUtil { // by default MR snapshot restore is handled internally by phoenix public static final boolean DEFAULT_MAPREDUCE_EXTERNAL_SNAPSHOT_RESTORE = false; + // Randomize mapper execution order + public static final String MAPREDUCE_RANDOMIZE_MAPPER_EXECUTION_ORDER = + "phoenix.mapreduce.randomize.mapper.execution.order"; + + // non-index jobs benefit less from this + public static final boolean DEFAULT_MAPREDUCE_RANDOMIZE_MAPPER_EXECUTION_ORDER = false; + /** * Determines type of Phoenix Map Reduce job. * 1. 
QUERY allows running arbitrary queries without aggregates @@ -904,4 +911,10 @@ public final class PhoenixConfigurationUtil { return isSnapshotRestoreManagedExternally; } + public static boolean isMRRandomizeMapperExecutionOrder(final Configuration configuration) { + Preconditions.checkNotNull(configuration); + return configuration.getBoolean(MAPREDUCE_RANDOMIZE_MAPPER_EXECUTION_ORDER, + DEFAULT_MAPREDUCE_RANDOMIZE_MAPPER_EXECUTION_ORDER); + } + }