This is an automated email from the ASF dual-hosted git repository. vjasani pushed a commit to branch 5.1 in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/5.1 by this push: new 8f1508d2a8 PHOENIX-6907 (ADDENDUM) Explain Plan should output region locations with servers (#1650) 8f1508d2a8 is described below commit 8f1508d2a8ee78bafba6b7b0f1fe5ce27e66f867 Author: Viraj Jasani <vjas...@apache.org> AuthorDate: Mon Jul 31 21:24:45 2023 -0700 PHOENIX-6907 (ADDENDUM) Explain Plan should output region locations with servers (#1650) --- .../phoenix/iterate/BaseResultIterators.java | 31 ++++--- .../org/apache/phoenix/iterate/ExplainTable.java | 94 +++++----------------- .../phoenix/iterate/ParallelScansCollector.java | 12 ++- .../phoenix/iterate/ScansWithRegionLocations.java | 47 +++++++++++ 4 files changed, 94 insertions(+), 90 deletions(-) diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java index ae4da8a9a5..1157cec81c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java @@ -154,6 +154,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result private static final int ESTIMATED_GUIDEPOSTS_PER_REGION = 20; private static final int MIN_SEEK_TO_COLUMN_VERSION = VersionUtil.encodeVersion("0", "98", "12"); private final List<List<Scan>> scans; + private final List<HRegionLocation> regionLocations; private final List<KeyRange> splits; private final byte[] physicalTableName; protected final QueryPlan plan; @@ -550,7 +551,9 @@ public abstract class BaseResultIterators extends ExplainTable implements Result initializeScan(plan, perScanLimit, offset, scan); this.useStatsForParallelization = PhoenixConfigurationUtil.getStatsForParallelizationProp(context.getConnection(), table); - this.scans = getParallelScans(); + ScansWithRegionLocations scansWithRegionLocations = getParallelScans(); + this.scans = scansWithRegionLocations.getScans(); + this.regionLocations = scansWithRegionLocations.getRegionLocations(); List<KeyRange> splitRanges = Lists.newArrayListWithExpectedSize(scans.size() * ESTIMATED_GUIDEPOSTS_PER_REGION); for (List<Scan> scanList : scans) { for (Scan aScan : scanList) { @@ -656,7 +659,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result gps.getGuidePostTimestamps()[guideIndex]); } - private List<List<Scan>> getParallelScans() throws SQLException { + private ScansWithRegionLocations getParallelScans() throws SQLException { // If the scan boundaries are not matching with scan in context that means we need to get // parallel scans for the chunk after split/merge. if (!ScanUtil.isContextScan(scan, context)) { @@ -673,7 +676,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result * @return * @throws SQLException */ - private List<List<Scan>> getParallelScans(Scan scan) throws SQLException { + private ScansWithRegionLocations getParallelScans(Scan scan) throws SQLException { List<HRegionLocation> regionLocations = getRegionBoundaries(scanGrouper); List<byte[]> regionBoundaries = toBoundaries(regionLocations); int regionIndex = 0; @@ -707,10 +710,11 @@ public abstract class BaseResultIterators extends ExplainTable implements Result newScan.setAttribute(BaseScannerRegionObserver.SCAN_REGION_SERVER, regionLocation.getServerName().getVersionedBytes()); } - parallelScans.addNewScan(plan, newScan, true); + parallelScans.addNewScan(plan, newScan, true, regionLocation); regionIndex++; } - return parallelScans.getParallelScans(); + return new ScansWithRegionLocations(parallelScans.getParallelScans(), + parallelScans.getRegionLocations()); } private static class GuidePostEstimate { @@ -907,7 +911,8 @@ public abstract class BaseResultIterators extends ExplainTable implements Result * @return list of parallel scans to run for a given query. * @throws SQLException */ - private List<List<Scan>> getParallelScans(byte[] startKey, byte[] stopKey) throws SQLException { + private ScansWithRegionLocations getParallelScans(byte[] startKey, byte[] stopKey) + throws SQLException { ScanRanges scanRanges = context.getScanRanges(); PTable table = getTable(); boolean isLocalIndex = table.getIndexType() == IndexType.LOCAL; @@ -920,7 +925,8 @@ public abstract class BaseResultIterators extends ExplainTable implements Result generateEstimates(scanRanges, table, GuidePostsInfo.NO_GUIDEPOST, GuidePostsInfo.NO_GUIDEPOST.isEmptyGuidePost(), parallelScans, estimates, Long.MAX_VALUE, false); - return parallelScans; + // we don't retrieve region location for the given scan range + return new ScansWithRegionLocations(parallelScans, null); } byte[] sampleProcessedSaltByte = SchemaUtil.processSplit(new byte[] { 0 }, table.getPKColumns()); @@ -1098,7 +1104,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result } boolean lastOfNew = newScanIdx == newScans.size() - 1; parallelScanCollector.addNewScan(plan, newScan, - gpsComparedToEndKey == 0 && lastOfNew); + gpsComparedToEndKey == 0 && lastOfNew, regionLocation); } } if (newScans.size() > 0) { @@ -1146,7 +1152,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result regionLocation.getServerName().getVersionedBytes()); } boolean lastOfNew = newScanIdx == newScans.size() - 1; - parallelScanCollector.addNewScan(plan, newScan, lastOfNew); + parallelScanCollector.addNewScan(plan, newScan, lastOfNew, regionLocation); } if (newScans.size() > 0) { // Boundary case of no GP in region after delaying adding of estimates @@ -1184,7 +1190,8 @@ public abstract class BaseResultIterators extends ExplainTable implements Result if (stream != null) Closeables.closeQuietly(stream); } sampleScans(parallelScanCollector.getParallelScans(),this.plan.getStatement().getTableSamplingRate()); - return parallelScanCollector.getParallelScans(); + return new ScansWithRegionLocations(parallelScanCollector.getParallelScans(), + parallelScanCollector.getRegionLocations()); } private void generateEstimates(ScanRanges scanRanges, PTable table, GuidePostsInfo gps, @@ -1489,7 +1496,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result byte[] startKey = oldScan.getAttribute(SCAN_ACTUAL_START_ROW); byte[] endKey = oldScan.getStopRow(); - List<List<Scan>> newNestedScans = this.getParallelScans(startKey, endKey); + List<List<Scan>> newNestedScans = this.getParallelScans(startKey, endKey).getScans(); // Add any concatIterators that were successful so far // as we need these to be in order addIterator(iterators, concatIterators); @@ -1688,7 +1695,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result } } - explain(buf.toString(), planSteps, explainPlanAttributesBuilder, scans); + explain(buf.toString(), planSteps, explainPlanAttributesBuilder, regionLocations); } @Override diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java index 565186b58c..20f5d40ecd 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ExplainTable.java @@ -17,8 +17,6 @@ */ package org.apache.phoenix.iterate; -import java.io.IOException; -import java.sql.SQLException; import java.text.Format; import java.util.ArrayList; import java.util.Arrays; @@ -29,11 +27,9 @@ import java.util.List; import java.util.NoSuchElementException; import java.util.Set; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.client.Consistency; import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; import org.apache.hadoop.hbase.filter.PageFilter; @@ -126,65 +122,10 @@ public abstract class ExplainTable { return buf.toString(); } - /** - * Get regions that represent the given range of start and end key for the given table, and - * all the regions to the regionLocations list. - * - * @param tableName the table name. - * @param startKey the start rowkey. - * @param endKey the end rowkey. - * @param includeEndKey true if end key needs to be included. - * @param reload true if reload from meta is necessary. - * @param regionBoundaries set of region boundaries to get the unique list of region locations. - * @param regionLocations the list of region locations as output. - * @throws IOException if something goes wrong while creating connection or querying region - * locations. - */ - private void getRegionsInRange(final byte[] tableName, - final byte[] startKey, - final byte[] endKey, - final boolean includeEndKey, - final boolean reload, - Set<RegionBoundary> regionBoundaries, - List<HRegionLocation> regionLocations) - throws IOException, SQLException { - final boolean endKeyIsEndOfTable = Bytes.equals(endKey, HConstants.EMPTY_END_ROW); - if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) { - throw new IllegalArgumentException( - "Invalid range: " + Bytes.toStringBinary(startKey) + " > " + - Bytes.toStringBinary(endKey)); - } - byte[] currentKey = startKey; - try (Table table = context.getConnection().getQueryServices().getTable(tableName)) { - // include all regions that include key range from the given start key - // and end key - do { - HRegionLocation regionLocation = - table.getRegionLocator().getRegionLocation(currentKey, reload); - RegionBoundary regionBoundary = - new RegionBoundary(regionLocation.getRegion().getStartKey(), - regionLocation.getRegion().getEndKey()); - if (!regionBoundaries.contains(regionBoundary)) { - regionLocations.add(regionLocation); - regionBoundaries.add(regionBoundary); - } - currentKey = regionLocation.getRegion().getEndKey(); - // condition1 = currentKey != END_ROW_KEY - // condition2 = endKeyIsEndOfTable == true - // condition3 = currentKey < endKey - // condition4 = includeEndKey == true - // condition5 = currentKey == endKey - // while (condition1 && (condition2 || condition3 || (condition4 && condition5))) - } while (!Bytes.equals(currentKey, HConstants.EMPTY_END_ROW) - && (endKeyIsEndOfTable || Bytes.compareTo(currentKey, endKey) < 0 - || (includeEndKey && Bytes.compareTo(currentKey, endKey) == 0))); - } - } - protected void explain(String prefix, List<String> planSteps, ExplainPlanAttributesBuilder explainPlanAttributesBuilder, - List<List<Scan>> scansList) { + List<HRegionLocation> regionLocations) { StringBuilder buf = new StringBuilder(prefix); ScanRanges scanRanges = context.getScanRanges(); Scan scan = context.getScan(); @@ -337,7 +278,7 @@ public abstract class ExplainTable { if (groupByLimitBytes != null) { groupByLimit = (Integer) PInteger.INSTANCE.toObject(groupByLimitBytes); } - getRegionLocations(planSteps, explainPlanAttributesBuilder, scansList); + getRegionLocations(planSteps, explainPlanAttributesBuilder, regionLocations); groupBy.explain(planSteps, groupByLimit, explainPlanAttributesBuilder); if (scan.getAttribute(BaseScannerRegionObserver.SPECIFIC_ARRAY_INDEX) != null) { planSteps.add(" SERVER ARRAY ELEMENT PROJECTION"); @@ -352,13 +293,13 @@ public abstract class ExplainTable { * * @param planSteps list of plan steps to add explain plan output to. * @param explainPlanAttributesBuilder explain plan v2 attributes builder instance. - * @param scansList list of the list of scans, to be used for parallel scans. + * @param regionLocations region locations. */ private void getRegionLocations(List<String> planSteps, ExplainPlanAttributesBuilder explainPlanAttributesBuilder, - List<List<Scan>> scansList) { + List<HRegionLocation> regionLocations) { String regionLocationPlan = getRegionLocationsForExplainPlan(explainPlanAttributesBuilder, - scansList); + regionLocations); if (regionLocationPlan.length() > 0) { planSteps.add(regionLocationPlan); } @@ -370,25 +311,26 @@ public abstract class ExplainTable { * print num of total list size. * * @param explainPlanAttributesBuilder explain plan v2 attributes builder instance. - * @param scansList list of the list of scans, to be used for parallel scans. + * @param regionLocationsFromResultIterator region locations. * @return region locations to be added to the explain plan output. */ private String getRegionLocationsForExplainPlan( ExplainPlanAttributesBuilder explainPlanAttributesBuilder, - List<List<Scan>> scansList) { + List<HRegionLocation> regionLocationsFromResultIterator) { + if (regionLocationsFromResultIterator == null) { + return ""; + } try { StringBuilder buf = new StringBuilder().append(REGION_LOCATIONS); Set<RegionBoundary> regionBoundaries = new HashSet<>(); List<HRegionLocation> regionLocations = new ArrayList<>(); - for (List<Scan> scans : scansList) { - for (Scan eachScan : scans) { - getRegionsInRange(tableRef.getTable().getPhysicalName().getBytes(), - eachScan.getStartRow(), - eachScan.getStopRow(), - true, - false, - regionBoundaries, - regionLocations); + for (HRegionLocation regionLocation : regionLocationsFromResultIterator) { + RegionBoundary regionBoundary = + new RegionBoundary(regionLocation.getRegion().getStartKey(), + regionLocation.getRegion().getEndKey()); + if (!regionBoundaries.contains(regionBoundary)) { + regionLocations.add(regionLocation); + regionBoundaries.add(regionBoundary); } } int maxLimitRegionLoc = context.getConnection().getQueryServices().getConfiguration() @@ -410,7 +352,7 @@ public abstract class ExplainTable { } buf.append(") "); return buf.toString(); - } catch (IOException | SQLException | UnsupportedOperationException e) { + } catch (Exception e) { LOGGER.error("Explain table unable to add region locations.", e); return ""; } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelScansCollector.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelScansCollector.java index e1c99e2836..c9d7147efc 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelScansCollector.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelScansCollector.java @@ -20,6 +20,7 @@ package org.apache.phoenix.iterate; import java.util.ArrayList; import java.util.List; +import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.client.Scan; import org.apache.phoenix.compile.QueryPlan; @@ -33,19 +34,22 @@ public class ParallelScansCollector { private final List<List<Scan>> parallelScans = new ArrayList<>(); private List<Scan> lastBatch = new ArrayList<>(); private Scan lastScan = null; + private final List<HRegionLocation> regionLocations = new ArrayList<>(); public ParallelScansCollector(ParallelScanGrouper grouper) { this.grouper = grouper; parallelScans.add(lastBatch); } - public void addNewScan(QueryPlan plan, Scan newScan, boolean crossesRegionBoundary) { + public void addNewScan(QueryPlan plan, Scan newScan, boolean crossesRegionBoundary, + HRegionLocation regionLocation) { if (grouper.shouldStartNewScan(plan, lastScan, newScan.getStartRow(), - lastScanCrossedRegionBoundary)) { + lastScanCrossedRegionBoundary)) { lastBatch = new ArrayList<>(); parallelScans.add(lastBatch); } lastBatch.add(newScan); + regionLocations.add(regionLocation); lastScanCrossedRegionBoundary = crossesRegionBoundary; lastScan = newScan; @@ -54,4 +58,8 @@ public class ParallelScansCollector { public List<List<Scan>> getParallelScans() { return parallelScans; } + + public List<HRegionLocation> getRegionLocations() { + return regionLocations; + } } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScansWithRegionLocations.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScansWithRegionLocations.java new file mode 100644 index 0000000000..1bd3f45c56 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScansWithRegionLocations.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.phoenix.iterate; + +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.client.Scan; + +import java.util.List; + +/** + * Scan list to be retrieved for the BaseResultIterator with the list of region locations the scans + * would be served from. + */ +public class ScansWithRegionLocations { + + private final List<List<Scan>> scans; + private final List<HRegionLocation> regionLocations; + + public ScansWithRegionLocations(List<List<Scan>> scans, + List<HRegionLocation> regionLocations) { + this.scans = scans; + this.regionLocations = regionLocations; + } + + public List<List<Scan>> getScans() { + return scans; + } + + public List<HRegionLocation> getRegionLocations() { + return regionLocations; + } +}