PHOENIX-2676 Cannot support join operations in scans with limit (Maryann Xue)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/c316d910
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/c316d910
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/c316d910

Branch: refs/heads/calcite
Commit: c316d91044e2e92030e3fd9e9b5fccbf5cfd5e17
Parents: 818683a
Author: James Taylor <jtay...@salesforce.com>
Authored: Wed Feb 17 11:53:11 2016 -0800
Committer: James Taylor <jtay...@salesforce.com>
Committed: Wed Feb 17 12:19:57 2016 -0800

----------------------------------------------------------------------
 .../coprocessor/HashJoinRegionScanner.java      | 72 +++++++++++++-------
 1 file changed, 48 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/c316d910/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
index 1e34d96..8f64b55 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.NoLimitScannerContext;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -46,6 +47,7 @@ import org.apache.phoenix.schema.KeyValueSchema;
 import org.apache.phoenix.schema.ValueBitSet;
 import org.apache.phoenix.schema.tuple.ResultTuple;
 import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.ServerUtil;
 import org.apache.phoenix.util.TupleUtil;
 
 public class HashJoinRegionScanner implements RegionScanner {
@@ -53,6 +55,7 @@ public class HashJoinRegionScanner implements RegionScanner {
     private final RegionScanner scanner;
     private final TupleProjector projector;
     private final HashJoinInfo joinInfo;
+    private final RegionCoprocessorEnvironment env;
     private Queue<Tuple> resultQueue;
     private boolean hasMore;
     private long count;
@@ -64,6 +67,7 @@ public class HashJoinRegionScanner implements RegionScanner {
 
     @SuppressWarnings("unchecked")
     public HashJoinRegionScanner(RegionScanner scanner, TupleProjector 
projector, HashJoinInfo joinInfo, ImmutableBytesWritable tenantId, 
RegionCoprocessorEnvironment env) throws IOException {
+        this.env = env;
         this.scanner = scanner;
         this.projector = projector;
         this.joinInfo = joinInfo;
@@ -250,25 +254,35 @@ public class HashJoinRegionScanner implements 
RegionScanner {
 
     @Override
     public boolean nextRaw(List<Cell> result) throws IOException {
-        while (shouldAdvance()) {
-            hasMore = scanner.nextRaw(result);
-            processResults(result, false);
-            result.clear();
+        try {
+            while (shouldAdvance()) {
+                hasMore = scanner.nextRaw(result);
+                processResults(result, false);
+                result.clear();
+            }
+            
+            return nextInQueue(result);
+        } catch (Throwable t) {
+            
ServerUtil.throwIOException(env.getRegion().getRegionInfo().getRegionNameAsString(),
 t);
+            return false; // impossible
         }
-
-        return nextInQueue(result);
     }
 
     @Override
     public boolean nextRaw(List<Cell> result, ScannerContext scannerContext)
             throws IOException {
-        while (shouldAdvance()) {
-            hasMore = scanner.nextRaw(result, scannerContext);
-            processResults(result, false); // TODO fix honoring the limit
-            result.clear();
+        try {
+            while (shouldAdvance()) {
+                hasMore = scanner.nextRaw(result, scannerContext);
+                processResults(result, scannerContext != 
NoLimitScannerContext.getInstance());
+                result.clear();
+            }
+            
+            return nextInQueue(result);
+        } catch (Throwable t) {
+            
ServerUtil.throwIOException(env.getRegion().getRegionInfo().getRegionNameAsString(),
 t);
+            return false; // impossible
         }
-
-        return nextInQueue(result);
     }
 
     @Override
@@ -283,24 +297,34 @@ public class HashJoinRegionScanner implements 
RegionScanner {
 
     @Override
     public boolean next(List<Cell> result) throws IOException {
-        while (shouldAdvance()) {
-            hasMore = scanner.next(result);
-            processResults(result, false);
-            result.clear();
+        try {
+            while (shouldAdvance()) {
+                hasMore = scanner.next(result);
+                processResults(result, false);
+                result.clear();
+            }
+            
+            return nextInQueue(result);
+        } catch (Throwable t) {
+            
ServerUtil.throwIOException(env.getRegion().getRegionInfo().getRegionNameAsString(),
 t);
+            return false; // impossible
         }
-
-        return nextInQueue(result);
     }
 
     @Override
     public boolean next(List<Cell> result, ScannerContext scannerContext) 
throws IOException {
-        while (shouldAdvance()) {
-            hasMore = scanner.next(result, scannerContext);
-            processResults(result, false); // TODO honoring the limit
-            result.clear();
+        try {
+            while (shouldAdvance()) {
+                hasMore = scanner.next(result, scannerContext);
+                processResults(result, scannerContext != 
NoLimitScannerContext.getInstance());
+                result.clear();
+            }
+            
+            return nextInQueue(result);
+        } catch (Throwable t) {
+            
ServerUtil.throwIOException(env.getRegion().getRegionInfo().getRegionNameAsString(),
 t);
+            return false; // impossible
         }
-
-      return nextInQueue(result);
     }
 
     @Override

Reply via email to