PHOENIX-2676 Cannot support join operations in scans with limit (Maryann Xue)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/c316d910 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/c316d910 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/c316d910 Branch: refs/heads/calcite Commit: c316d91044e2e92030e3fd9e9b5fccbf5cfd5e17 Parents: 818683a Author: James Taylor <jtay...@salesforce.com> Authored: Wed Feb 17 11:53:11 2016 -0800 Committer: James Taylor <jtay...@salesforce.com> Committed: Wed Feb 17 12:19:57 2016 -0800 ---------------------------------------------------------------------- .../coprocessor/HashJoinRegionScanner.java | 72 +++++++++++++------- 1 file changed, 48 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/c316d910/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java index 1e34d96..8f64b55 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.regionserver.NoLimitScannerContext; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.ScannerContext; import org.apache.hadoop.hbase.util.Bytes; @@ -46,6 +47,7 @@ import org.apache.phoenix.schema.KeyValueSchema; import org.apache.phoenix.schema.ValueBitSet; import org.apache.phoenix.schema.tuple.ResultTuple; import org.apache.phoenix.schema.tuple.Tuple; +import org.apache.phoenix.util.ServerUtil; import org.apache.phoenix.util.TupleUtil; public class HashJoinRegionScanner implements RegionScanner { @@ -53,6 +55,7 @@ public class HashJoinRegionScanner implements RegionScanner { private final RegionScanner scanner; private final TupleProjector projector; private final HashJoinInfo joinInfo; + private final RegionCoprocessorEnvironment env; private Queue<Tuple> resultQueue; private boolean hasMore; private long count; @@ -64,6 +67,7 @@ public class HashJoinRegionScanner implements RegionScanner { @SuppressWarnings("unchecked") public HashJoinRegionScanner(RegionScanner scanner, TupleProjector projector, HashJoinInfo joinInfo, ImmutableBytesWritable tenantId, RegionCoprocessorEnvironment env) throws IOException { + this.env = env; this.scanner = scanner; this.projector = projector; this.joinInfo = joinInfo; @@ -250,25 +254,35 @@ public class HashJoinRegionScanner implements RegionScanner { @Override public boolean nextRaw(List<Cell> result) throws IOException { - while (shouldAdvance()) { - hasMore = scanner.nextRaw(result); - processResults(result, false); - result.clear(); + try { + while (shouldAdvance()) { + hasMore = scanner.nextRaw(result); + processResults(result, false); + result.clear(); + } + + return nextInQueue(result); + } catch (Throwable t) { + ServerUtil.throwIOException(env.getRegion().getRegionInfo().getRegionNameAsString(), t); + return false; // impossible } - - return nextInQueue(result); } @Override public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) throws IOException { - while (shouldAdvance()) { - hasMore = scanner.nextRaw(result, scannerContext); - processResults(result, false); // TODO fix honoring the limit - result.clear(); + try { + while (shouldAdvance()) { + hasMore = scanner.nextRaw(result, scannerContext); + processResults(result, scannerContext != NoLimitScannerContext.getInstance()); + result.clear(); + } + + return nextInQueue(result); + } catch (Throwable t) { + ServerUtil.throwIOException(env.getRegion().getRegionInfo().getRegionNameAsString(), t); + return false; // impossible } - - return nextInQueue(result); } @Override @@ -283,24 +297,34 @@ public class HashJoinRegionScanner implements RegionScanner { @Override public boolean next(List<Cell> result) throws IOException { - while (shouldAdvance()) { - hasMore = scanner.next(result); - processResults(result, false); - result.clear(); + try { + while (shouldAdvance()) { + hasMore = scanner.next(result); + processResults(result, false); + result.clear(); + } + + return nextInQueue(result); + } catch (Throwable t) { + ServerUtil.throwIOException(env.getRegion().getRegionInfo().getRegionNameAsString(), t); + return false; // impossible } - - return nextInQueue(result); } @Override public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException { - while (shouldAdvance()) { - hasMore = scanner.next(result, scannerContext); - processResults(result, false); // TODO honoring the limit - result.clear(); + try { + while (shouldAdvance()) { + hasMore = scanner.next(result, scannerContext); + processResults(result, scannerContext != NoLimitScannerContext.getInstance()); + result.clear(); + } + + return nextInQueue(result); + } catch (Throwable t) { + ServerUtil.throwIOException(env.getRegion().getRegionInfo().getRegionNameAsString(), t); + return false; // impossible } - - return nextInQueue(result); } @Override