PHOENIX-2665 Index split while running a group by query returns duplicate results (Rajeshbabu)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/c48fee04 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/c48fee04 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/c48fee04 Branch: refs/heads/calcite Commit: c48fee04e75fc9d08af981f1a2cc257e6cecdbdc Parents: c485a40 Author: Rajeshbabu Chintaguntla <rajeshb...@apache.org> Authored: Thu Feb 11 02:38:48 2016 +0530 Committer: Rajeshbabu Chintaguntla <rajeshb...@apache.org> Committed: Thu Feb 11 02:38:48 2016 +0530 ---------------------------------------------------------------------- .../java/org/apache/phoenix/compile/ScanRanges.java | 2 ++ .../phoenix/coprocessor/BaseScannerRegionObserver.java | 4 +++- .../org/apache/phoenix/iterate/BaseResultIterators.java | 12 +++++++----- 3 files changed, 12 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/c48fee04/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java index 4d343f3..719970a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/ScanRanges.java @@ -17,6 +17,7 @@ */ package org.apache.phoenix.compile; +import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_ACTUAL_START_ROW; import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.STARTKEY_OFFSET; import java.io.IOException; @@ -384,6 +385,7 @@ public class ScanRanges { if (scanStopKey.length > 0 && Bytes.compareTo(scanStartKey, scanStopKey) >= 0) { return null; } + newScan.setAttribute(SCAN_ACTUAL_START_ROW, scanStartKey); newScan.setStartRow(scanStartKey); 
newScan.setStopRow(scanStopKey); if(keyOffset > 0) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/c48fee04/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java index a363459..9487b36 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java @@ -97,6 +97,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { public static final String RUN_UPDATE_STATS_ASYNC_ATTRIB = "_RunUpdateStatsAsync"; public static final String SKIP_REGION_BOUNDARY_CHECK = "_SKIP_REGION_BOUNDARY_CHECK"; public static final String TX_SCN = "_TxScn"; + public static final String SCAN_ACTUAL_START_ROW = "_ScanActualStartRow"; /** * Attribute name used to pass custom annotations in Scans and Mutations (later). 
Custom annotations @@ -137,7 +138,8 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { Bytes.compareTo(upperExclusiveRegionKey, expectedUpperRegionKey) != 0; } else { isStaleRegionBoundaries = Bytes.compareTo(lowerInclusiveScanKey, lowerInclusiveRegionKey) < 0 || - ( Bytes.compareTo(upperExclusiveScanKey, upperExclusiveRegionKey) > 0 && upperExclusiveRegionKey.length != 0); + ( Bytes.compareTo(upperExclusiveScanKey, upperExclusiveRegionKey) > 0 && upperExclusiveRegionKey.length != 0) || + (upperExclusiveRegionKey.length != 0 && upperExclusiveScanKey.length == 0); } if (isStaleRegionBoundaries) { Exception cause = new StaleRegionBoundaryCacheException(region.getRegionInfo().getTable().getNameAsString()); http://git-wip-us.apache.org/repos/asf/phoenix/blob/c48fee04/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java index b3235e2..3a3d1f2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java @@ -21,6 +21,7 @@ import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EXPECTED_ import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_FAILED_QUERY_COUNTER; import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_QUERY_TIMEOUT_COUNTER; import static org.apache.phoenix.util.ByteUtil.EMPTY_BYTE_ARRAY; +import static org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_ACTUAL_START_ROW; import java.io.ByteArrayInputStream; import java.io.DataInput; @@ -471,7 +472,8 @@ public abstract class BaseResultIterators extends ExplainTable implements Result startKey = scanStartRow; } byte[] scanStopRow = 
scan.getStopRow(); - if (stopKey.length == 0 || Bytes.compareTo(scanStopRow, stopKey) < 0) { + if (stopKey.length == 0 + || (scanStopRow.length != 0 && Bytes.compareTo(scanStopRow, stopKey) < 0)) { stopKey = scanStopRow; } } @@ -632,16 +634,15 @@ public abstract class BaseResultIterators extends ExplainTable implements Result throw new SQLExceptionInfo.Builder(SQLExceptionCode.OPERATION_TIMED_OUT).setMessage(". Query couldn't be completed in the alloted time: " + queryTimeOut + " ms").build().buildException(); } if (isLocalIndex && previousScan != null && previousScan.getScan() != null - && ((!isReverse && Bytes.compareTo(scanPair.getFirst().getStartRow(), + && ((!isReverse && Bytes.compareTo(scanPair.getFirst().getAttribute(SCAN_ACTUAL_START_ROW), previousScan.getScan().getStopRow()) < 0) - || (isReverse && Bytes.compareTo(scanPair.getFirst().getStartRow(), + || (isReverse && Bytes.compareTo(scanPair.getFirst().getAttribute(SCAN_ACTUAL_START_ROW), previousScan.getScan().getStopRow()) > 0) || (scanPair.getFirst().getAttribute(EXPECTED_UPPER_REGION_KEY) != null && previousScan.getScan().getAttribute(EXPECTED_UPPER_REGION_KEY) != null && Bytes.compareTo(scanPair.getFirst().getAttribute(EXPECTED_UPPER_REGION_KEY), previousScan.getScan() .getAttribute(EXPECTED_UPPER_REGION_KEY)) == 0))) { - continue; } PeekingResultIterator iterator = scanPair.getSecond().get(timeOutForScan, TimeUnit.MILLISECONDS); @@ -658,11 +659,12 @@ public abstract class BaseResultIterators extends ExplainTable implements Result } // Resubmit just this portion of work again Scan oldScan = scanPair.getFirst(); - byte[] startKey = oldScan.getStartRow(); + byte[] startKey = oldScan.getAttribute(SCAN_ACTUAL_START_ROW); byte[] endKey = oldScan.getStopRow(); if (isLocalIndex) { endKey = oldScan.getAttribute(EXPECTED_UPPER_REGION_KEY); } + List<List<Scan>> newNestedScans = this.getParallelScans(startKey, endKey); // Add any concatIterators that were successful so far // as we need these to be in order