Repository: phoenix
Updated Branches:
  refs/heads/master de9a2c7b0 -> a31b70179


http://git-wip-us.apache.org/repos/asf/phoenix/blob/a31b7017/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
index 3237882..77f4411 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
@@ -39,7 +39,6 @@ import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
@@ -64,6 +63,7 @@ import org.apache.phoenix.join.HashJoinInfo;
 import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.StaleRegionBoundaryCacheException;
 import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
 import org.apache.phoenix.schema.types.PInteger;
 import org.apache.phoenix.util.Closeables;
@@ -402,8 +402,10 @@ public class GroupedAggregateRegionObserver extends 
BaseScannerRegionObserver {
             }
 
             Region region = c.getEnvironment().getRegion();
-            region.startRegionOperation();
+            boolean aquiredLock = false;
             try {
+                region.startRegionOperation();
+                aquiredLock = true;
                 synchronized (scanner) {
                     do {
                         List<Cell> results = new ArrayList<Cell>();
@@ -423,8 +425,8 @@ public class GroupedAggregateRegionObserver extends 
BaseScannerRegionObserver {
                         }
                     } while (hasMore && groupByCache.size() < limit);
                 }
-            } finally {
-                region.closeRegionOperation();
+            }  finally {
+                if (aquiredLock) region.closeRegionOperation();
             }
 
             RegionScanner regionScanner = groupByCache.getScanner(scanner);
@@ -472,8 +474,10 @@ public class GroupedAggregateRegionObserver extends 
BaseScannerRegionObserver {
                 // start of a new row. Otherwise, we have to wait until an agg
                 int countOffset = rowAggregators.length == 0 ? 1 : 0;
                 Region region = c.getEnvironment().getRegion();
-                region.startRegionOperation();
+                boolean aquiredLock = false;
                 try {
+                    region.startRegionOperation();
+                    aquiredLock = true;
                     synchronized (scanner) {
                         do {
                             List<Cell> kvs = new ArrayList<Cell>();
@@ -505,7 +509,7 @@ public class GroupedAggregateRegionObserver extends 
BaseScannerRegionObserver {
                         } while (hasMore && !aggBoundary && !atLimit);
                     }
                 } finally {
-                    region.closeRegionOperation();
+                    if (aquiredLock) region.closeRegionOperation();
                 }
 
                 if (currentKey != null) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a31b7017/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 77b8b3e..7e052f5 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -86,6 +86,7 @@ import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTableImpl;
 import org.apache.phoenix.schema.RowKeySchema;
 import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.StaleRegionBoundaryCacheException;
 import org.apache.phoenix.schema.ValueSchema.Field;
 import org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker;
 import org.apache.phoenix.schema.stats.StatisticsCollector;
@@ -288,8 +289,10 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
         }
         long rowCount = 0;
         final RegionScanner innerScanner = theScanner;
-        region.startRegionOperation();
+        boolean aquiredLock = false;
         try {
+            region.startRegionOperation();
+            aquiredLock = true;
             synchronized (innerScanner) {
                 do {
                     List<Cell> results = new ArrayList<Cell>();
@@ -529,7 +532,7 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
             try {
                 innerScanner.close();
             } finally {
-                region.closeRegionOperation();
+                if (aquiredLock) region.closeRegionOperation();
             }
         }
         if (logger.isDebugEnabled()) {
@@ -608,7 +611,6 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
         InternalScanner internalScanner = scanner;
         if (scanType.equals(ScanType.COMPACT_DROP_DELETES)) {
             try {
-                Pair<HRegionInfo, HRegionInfo> mergeRegions = null;
                 long clientTimeStamp = TimeKeeper.SYSTEM.getCurrentTime();
                 StatisticsCollector stats = 
StatisticsCollectorFactory.createStatisticsCollector(
                         c.getEnvironment(), table.getNameAsString(), 
clientTimeStamp,

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a31b7017/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
index b125ecc..e70967c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
@@ -25,6 +25,7 @@ import java.util.List;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
 import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
+import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.compile.RowProjector;
 import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
@@ -121,7 +122,7 @@ public class AggregatePlan extends BaseQueryPlan {
             this.services = services;
         }
         @Override
-        public PeekingResultIterator newIterator(StatementContext context, 
ResultIterator scanner, Scan scan, String tableName) throws SQLException {
+        public PeekingResultIterator newIterator(StatementContext context, 
ResultIterator scanner, Scan scan, String tableName, QueryPlan plan) throws 
SQLException {
             Expression expression = RowKeyExpression.INSTANCE;
             OrderByExpression orderByExpression = new 
OrderByExpression(expression, false, true);
             int threshold = 
services.getProps().getInt(QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, 
QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
@@ -138,9 +139,9 @@ public class AggregatePlan extends BaseQueryPlan {
             this.outerFactory = outerFactory;
         }
         @Override
-        public PeekingResultIterator newIterator(StatementContext context, 
ResultIterator scanner, Scan scan, String tableName) throws SQLException {
-            PeekingResultIterator iterator = innerFactory.newIterator(context, 
scanner, scan, tableName);
-            return outerFactory.newIterator(context, iterator, scan, 
tableName);
+        public PeekingResultIterator newIterator(StatementContext context, 
ResultIterator scanner, Scan scan, String tableName, QueryPlan plan) throws 
SQLException {
+            PeekingResultIterator iterator = innerFactory.newIterator(context, 
scanner, scan, tableName, plan);
+            return outerFactory.newIterator(context, iterator, scan, 
tableName, plan);
         }
     }
 
@@ -164,12 +165,12 @@ public class AggregatePlan extends BaseQueryPlan {
     }
     
     @Override
-    protected ResultIterator newIterator(ParallelScanGrouper scanGrouper) 
throws SQLException {
+    protected ResultIterator newIterator(ParallelScanGrouper scanGrouper, Scan 
scan) throws SQLException {
         if (groupBy.isEmpty()) {
-            
UngroupedAggregateRegionObserver.serializeIntoScan(context.getScan());
+            UngroupedAggregateRegionObserver.serializeIntoScan(scan);
         } else {
             // Set attribute with serialized expressions for coprocessor
-            
GroupedAggregateRegionObserver.serializeIntoScan(context.getScan(), 
groupBy.getScanAttribName(), groupBy.getKeyExpressions());
+            GroupedAggregateRegionObserver.serializeIntoScan(scan, 
groupBy.getScanAttribName(), groupBy.getKeyExpressions());
             if (limit != null && orderBy.getOrderByExpressions().isEmpty() && 
having == null
                     && (  (   statement.isDistinct() && ! 
statement.isAggregate() )
                             || ( ! statement.isDistinct() && (   
context.getAggregationManager().isEmpty()
@@ -200,8 +201,8 @@ public class AggregatePlan extends BaseQueryPlan {
             }
         }
         BaseResultIterators iterators = 
statement.getHint().hasHint(HintNode.Hint.SERIAL)
-                ? new SerialIterators(this, null, null, 
wrapParallelIteratorFactory(), scanGrouper)
-                : new ParallelIterators(this, null, 
wrapParallelIteratorFactory());
+                ? new SerialIterators(this, null, null, 
wrapParallelIteratorFactory(), scanGrouper, scan)
+                : new ParallelIterators(this, null, 
wrapParallelIteratorFactory(), scan);
 
         splits = iterators.getSplits();
         scans = iterators.getScans();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a31b7017/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
index cedd23e..83e55ee 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
@@ -203,26 +203,30 @@ public abstract class BaseQueryPlan implements QueryPlan {
     
     @Override
     public final ResultIterator iterator(ParallelScanGrouper scanGrouper) 
throws SQLException {
-        return iterator(Collections.<SQLCloseable>emptyList(), scanGrouper);
+        return iterator(Collections.<SQLCloseable>emptyList(), scanGrouper, 
this.context.getScan());
     }
-    
+
+    @Override
+    public final ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan 
scan) throws SQLException {
+        return iterator(Collections.<SQLCloseable>emptyList(), scanGrouper, 
scan);
+    }
+
     @Override
     public final ResultIterator iterator() throws SQLException {
-        return iterator(Collections.<SQLCloseable>emptyList(), 
DefaultParallelScanGrouper.getInstance());
+        return iterator(Collections.<SQLCloseable>emptyList(), 
DefaultParallelScanGrouper.getInstance(), this.context.getScan());
     }
 
-    public final ResultIterator iterator(final List<? extends SQLCloseable> 
dependencies, ParallelScanGrouper scanGrouper) throws SQLException {
+    public final ResultIterator iterator(final List<? extends SQLCloseable> 
dependencies, ParallelScanGrouper scanGrouper, Scan scan) throws SQLException {
         if (context.getScanRanges() == ScanRanges.NOTHING) {
             return ResultIterator.EMPTY_ITERATOR;
         }
         
         if (tableRef == TableRef.EMPTY_TABLE_REF) {
-            return newIterator(scanGrouper);
+            return newIterator(scanGrouper, scan);
         }
         
         // Set miscellaneous scan attributes. This is the last chance to set 
them before we
         // clone the scan for each parallelized chunk.
-        Scan scan = context.getScan();
         TableRef tableRef = context.getCurrentTable();
         PTable table = tableRef.getTable();
         
@@ -319,7 +323,7 @@ public abstract class BaseQueryPlan implements QueryPlan {
                LOG.debug(LogUtil.addCustomAnnotations("Scan ready for 
iteration: " + scan, connection));
         }
         
-        ResultIterator iterator = newIterator(scanGrouper);
+        ResultIterator iterator = newIterator(scanGrouper, scan);
         iterator = dependencies.isEmpty() ?
                 iterator : new DelegateResultIterator(iterator) {
             @Override
@@ -448,7 +452,7 @@ public abstract class BaseQueryPlan implements QueryPlan {
         }
     }
 
-    abstract protected ResultIterator newIterator(ParallelScanGrouper 
scanGrouper) throws SQLException;
+    abstract protected ResultIterator newIterator(ParallelScanGrouper 
scanGrouper, Scan scan) throws SQLException;
     
     @Override
     public long getEstimatedSize() {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a31b7017/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
index f4e374e..7871f9e 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
@@ -26,6 +26,7 @@ import java.util.List;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.phoenix.compile.ExplainPlan;
 import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
@@ -83,7 +84,12 @@ public class ClientAggregatePlan extends 
ClientProcessingPlan {
 
     @Override
     public ResultIterator iterator(ParallelScanGrouper scanGrouper) throws 
SQLException {
-        ResultIterator iterator = delegate.iterator(scanGrouper);
+        return iterator(scanGrouper, delegate.getContext().getScan());
+    }
+
+    @Override
+    public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) 
throws SQLException {
+        ResultIterator iterator = delegate.iterator(scanGrouper, scan);
         if (where != null) {
             iterator = new FilterResultIterator(iterator, where);
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a31b7017/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java
index 003c995..4e43225 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.execute;
 import java.sql.SQLException;
 import java.util.List;
 
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.phoenix.compile.ExplainPlan;
 import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
 import org.apache.phoenix.compile.QueryPlan;
@@ -50,7 +51,11 @@ public class ClientScanPlan extends ClientProcessingPlan {
 
     @Override
     public ResultIterator iterator(ParallelScanGrouper scanGrouper) throws 
SQLException {
-        ResultIterator iterator = delegate.iterator(scanGrouper);
+        return iterator(scanGrouper, delegate.getContext().getScan());
+    }
+    @Override
+    public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) 
throws SQLException {
+        ResultIterator iterator = delegate.iterator(scanGrouper, scan);
         if (where != null) {
             iterator = new FilterResultIterator(iterator, where);
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a31b7017/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java
index 1b0af8c..fc5a04d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/CorrelatePlan.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.sql.SQLException;
 import java.util.List;
 
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.phoenix.compile.ExplainPlan;
 import org.apache.phoenix.compile.QueryPlan;
@@ -103,7 +104,12 @@ public class CorrelatePlan extends DelegateQueryPlan {
     }
 
     @Override
-    public ResultIterator iterator(ParallelScanGrouper scanGrouper)
+    public ResultIterator iterator(ParallelScanGrouper scanGrouper) 
+                throws SQLException {
+        return iterator(scanGrouper, null);
+    }
+    @Override
+    public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan)
             throws SQLException {
         return new ResultIterator() {
             private final ValueBitSet destBitSet = 
ValueBitSet.newInstance(joinedSchema);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a31b7017/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
index 36b725e..5887ff3 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/DegenerateQueryPlan.java
@@ -52,7 +52,7 @@ public class DegenerateQueryPlan extends BaseQueryPlan {
     }
 
     @Override
-    protected ResultIterator newIterator(ParallelScanGrouper scanGrouper) 
throws SQLException {
+    protected ResultIterator newIterator(ParallelScanGrouper scanGrouper, Scan 
scan) throws SQLException {
         return null;
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a31b7017/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
index 5fdec46..e3d9721 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
@@ -140,6 +140,11 @@ public class HashJoinPlan extends DelegateQueryPlan {
 
     @Override
     public ResultIterator iterator(ParallelScanGrouper scanGrouper) throws 
SQLException {
+        return iterator(scanGrouper, this.delegate.getContext().getScan());
+    }
+        
+    @Override
+    public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) 
throws SQLException {
         int count = subPlans.length;
         PhoenixConnection connection = getContext().getConnection();
         ConnectionQueryServices services = connection.getQueryServices();
@@ -216,11 +221,10 @@ public class HashJoinPlan extends DelegateQueryPlan {
         }
 
         if (joinInfo != null) {
-            Scan scan = delegate.getContext().getScan();
             HashJoinInfo.serializeHashJoinIntoScan(scan, joinInfo);
         }
         
-        ResultIterator iterator = joinInfo == null ? 
delegate.iterator(scanGrouper) : ((BaseQueryPlan) 
delegate).iterator(dependencies, scanGrouper);
+        ResultIterator iterator = joinInfo == null ? 
delegate.iterator(scanGrouper) : ((BaseQueryPlan) 
delegate).iterator(dependencies, scanGrouper, scan);
         if (statement.getInnerSelectStatement() != null && postFilter != null) 
{
             iterator = new FilterResultIterator(iterator, postFilter);
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a31b7017/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
index fe767d9..db99964 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/LiteralResultIterationPlan.java
@@ -71,7 +71,7 @@ public class LiteralResultIterationPlan extends BaseQueryPlan 
{
     }
 
     @Override
-    protected ResultIterator newIterator(ParallelScanGrouper scanGrouper)
+    protected ResultIterator newIterator(ParallelScanGrouper scanGrouper, Scan 
scan)
             throws SQLException {
         ResultIterator scanner = new ResultIterator() {
             private final Iterator<Tuple> tupleIterator = tuples.iterator();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a31b7017/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
index 93ae5d6..1f18ddd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
@@ -188,9 +188,8 @@ public class ScanPlan extends BaseQueryPlan {
     }
 
     @Override
-    protected ResultIterator newIterator(ParallelScanGrouper scanGrouper) 
throws SQLException {
+    protected ResultIterator newIterator(ParallelScanGrouper scanGrouper, Scan 
scan) throws SQLException {
         // Set any scan attributes before creating the scanner, as it will be 
too late afterwards
-       Scan scan = context.getScan();
         scan.setAttribute(BaseScannerRegionObserver.NON_AGGREGATE_QUERY, 
QueryConstants.TRUE);
         ResultIterator scanner;
         TableRef tableRef = this.getTableRef();
@@ -208,11 +207,11 @@ public class ScanPlan extends BaseQueryPlan {
         BaseResultIterators iterators;
         boolean isOffsetOnServer = isOffsetPossibleOnServer(context, orderBy, 
offset, isSalted, table.getIndexType());
         if (isOffsetOnServer) {
-            iterators = new SerialIterators(this, perScanLimit, offset, 
parallelIteratorFactory, scanGrouper);
+            iterators = new SerialIterators(this, perScanLimit, offset, 
parallelIteratorFactory, scanGrouper, scan);
         } else if (isSerial) {
-            iterators = new SerialIterators(this, perScanLimit, null, 
parallelIteratorFactory, scanGrouper);
+            iterators = new SerialIterators(this, perScanLimit, null, 
parallelIteratorFactory, scanGrouper, scan);
         } else {
-            iterators = new ParallelIterators(this, perScanLimit, 
parallelIteratorFactory, scanGrouper);
+            iterators = new ParallelIterators(this, perScanLimit, 
parallelIteratorFactory, scanGrouper, scan);
         }
         splits = iterators.getSplits();
         scans = iterators.getScans();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a31b7017/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
index e181e80..8e0e6e2 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
@@ -131,7 +131,12 @@ public class SortMergeJoinPlan implements QueryPlan {
     }
 
     @Override
-    public ResultIterator iterator(ParallelScanGrouper scanGrouper) throws 
SQLException {        
+    public ResultIterator iterator(ParallelScanGrouper scanGrouper) throws 
SQLException {
+        return iterator(scanGrouper, null);
+    }
+
+    @Override
+    public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) 
throws SQLException {        
         return type == JoinType.Semi || type == JoinType.Anti ? 
                 new SemiAntiJoinIterator(lhsPlan.iterator(scanGrouper), 
rhsPlan.iterator(scanGrouper)) :
                 new BasicJoinIterator(lhsPlan.iterator(scanGrouper), 
rhsPlan.iterator(scanGrouper));

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a31b7017/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java
index e8d9af0..0ba0cc1 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/TupleProjectionPlan.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.execute;
 import java.sql.SQLException;
 import java.util.List;
 
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.phoenix.compile.ExplainPlan;
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.expression.Expression;
@@ -60,7 +61,12 @@ public class TupleProjectionPlan extends DelegateQueryPlan {
 
     @Override
     public ResultIterator iterator(ParallelScanGrouper scanGrouper) throws 
SQLException {
-        ResultIterator iterator = new 
DelegateResultIterator(delegate.iterator(scanGrouper)) {
+        return iterator(scanGrouper, delegate.getContext().getScan());
+    }
+
+    @Override
+    public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) 
throws SQLException {
+        ResultIterator iterator = new 
DelegateResultIterator(delegate.iterator(scanGrouper, scan)) {
             
             @Override
             public Tuple next() throws SQLException {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a31b7017/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
index 808141e..cf95b5b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnionPlan.java
@@ -139,7 +139,11 @@ public class UnionPlan implements QueryPlan {
     public final ResultIterator iterator(ParallelScanGrouper scanGrouper) 
throws SQLException {
         return iterator(Collections.<SQLCloseable>emptyList());
     }
-    
+
+    @Override
+    public final ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan 
scan) throws SQLException {
+        return iterator(Collections.<SQLCloseable>emptyList());
+    }
     @Override
     public final ResultIterator iterator() throws SQLException {
         return iterator(Collections.<SQLCloseable>emptyList());

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a31b7017/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java
index 8905eef..94c59df 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/UnnestArrayPlan.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.execute;
 import java.sql.SQLException;
 import java.util.List;
 
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.phoenix.compile.ExplainPlan;
 import org.apache.phoenix.compile.QueryPlan;
@@ -53,7 +54,12 @@ public class UnnestArrayPlan extends DelegateQueryPlan {
 
     @Override
     public ResultIterator iterator(ParallelScanGrouper scanGrouper) throws 
SQLException {
-        return new UnnestArrayResultIterator(delegate.iterator(scanGrouper));
+        return new UnnestArrayResultIterator(delegate.iterator(scanGrouper, 
delegate.getContext().getScan()));
+    }
+
+    @Override
+    public ResultIterator iterator(ParallelScanGrouper scanGrouper, Scan scan) 
throws SQLException {
+        return new UnnestArrayResultIterator(delegate.iterator(scanGrouper, 
scan));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a31b7017/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index 0299f18..0cf2faa 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -17,8 +17,10 @@
  */
 package org.apache.phoenix.iterate;
 
-import static 
org.apache.phoenix.coprocessor.BaseScannerRegionObserver.EXPECTED_UPPER_REGION_KEY;
 import static 
org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_ACTUAL_START_ROW;
+import static 
org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_START_ROW_SUFFIX;
+import static 
org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_STOP_ROW_SUFFIX;
+
 import static 
org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_FAILED_QUERY_COUNTER;
 import static 
org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_QUERY_TIMEOUT_COUNTER;
 import static org.apache.phoenix.util.ByteUtil.EMPTY_BYTE_ARRAY;
@@ -116,7 +118,7 @@ public abstract class BaseResultIterators extends 
ExplainTable implements Result
     private final List<KeyRange> splits;
     private final PTableStats tableStats;
     private final byte[] physicalTableName;
-    private final QueryPlan plan;
+    protected final QueryPlan plan;
     protected final String scanId;
     protected final MutationState mutationState;
     private final ParallelScanGrouper scanGrouper;
@@ -125,6 +127,7 @@ public abstract class BaseResultIterators extends 
ExplainTable implements Result
     private Long estimatedRows;
     private Long estimatedSize;
     private boolean hasGuidePosts;
+    private Scan scan;
     
     static final Function<HRegionLocation, KeyRange> TO_KEY_RANGE = new 
Function<HRegionLocation, KeyRange>() {
         @Override
@@ -138,7 +141,6 @@ public abstract class BaseResultIterators extends 
ExplainTable implements Result
     }
     
     private boolean useStats() {
-        Scan scan = context.getScan();
         boolean isPointLookup = context.getScanRanges().isPointLookup();
         /*
          *  Don't use guide posts if:
@@ -153,11 +155,10 @@ public abstract class BaseResultIterators extends 
ExplainTable implements Result
         return true;
     }
     
-    private static void initializeScan(QueryPlan plan, Integer perScanLimit, 
Integer offset) {
+    private static void initializeScan(QueryPlan plan, Integer perScanLimit, 
Integer offset, Scan scan) {
         StatementContext context = plan.getContext();
         TableRef tableRef = plan.getTableRef();
         PTable table = tableRef.getTable();
-        Scan scan = context.getScan();
 
         Map<byte [], NavigableSet<byte []>> familyMap = scan.getFamilyMap();
         // Hack for PHOENIX-2067 to force raw scan over all KeyValues to fix 
their row keys
@@ -331,10 +332,11 @@ public abstract class BaseResultIterators extends 
ExplainTable implements Result
         }
     }
     
-    public BaseResultIterators(QueryPlan plan, Integer perScanLimit, Integer 
offset, ParallelScanGrouper scanGrouper) throws SQLException {
+    public BaseResultIterators(QueryPlan plan, Integer perScanLimit, Integer 
offset, ParallelScanGrouper scanGrouper, Scan scan) throws SQLException {
         super(plan.getContext(), plan.getTableRef(), plan.getGroupBy(), 
plan.getOrderBy(),
                 plan.getStatement().getHint(), plan.getLimit(), plan 
instanceof ScanPlan ? plan.getOffset() : null);
         this.plan = plan;
+        this.scan = scan;
         this.scanGrouper = scanGrouper;
         StatementContext context = plan.getContext();
         // Clone MutationState as the one on the connection will change if 
auto commit is on
@@ -347,7 +349,7 @@ public abstract class BaseResultIterators extends 
ExplainTable implements Result
         // Used to tie all the scans together during logging
         scanId = UUID.randomUUID().toString();
         
-        initializeScan(plan, perScanLimit, offset);
+        initializeScan(plan, perScanLimit, offset, scan);
         
         this.scans = getParallelScans();
         List<KeyRange> splitRanges = 
Lists.newArrayListWithExpectedSize(scans.size() * 
ESTIMATED_GUIDEPOSTS_PER_REGION);
@@ -471,10 +473,69 @@ public abstract class BaseResultIterators extends 
ExplainTable implements Result
     }
 
     private List<List<Scan>> getParallelScans() throws SQLException {
+        // If the scan boundaries do not match the scan in the context, we 
need to get
+        // parallel scans for the chunk after a split/merge.
+        if (!ScanUtil.isConextScan(scan, context)) {
+            return getParallelScans(scan);
+        }
         return getParallelScans(EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY);
     }
 
     /**
+     * Get parallel scans for the specified scan boundaries. This can be 
used for getting parallel
+     * scans when there are splits/merges while scanning a chunk. In this 
case we need not go by all
+     * the regions or guideposts.
+     * @param scan scan defining the boundaries to parallelize
+     * @return the parallel scans covering the given scan boundaries
+     * @throws SQLException if the table region locations cannot be fetched
+     */
+    private List<List<Scan>> getParallelScans(Scan scan) throws SQLException {
+        List<HRegionLocation> regionLocations = 
context.getConnection().getQueryServices()
+                .getAllTableRegions(physicalTableName);
+        List<byte[]> regionBoundaries = toBoundaries(regionLocations);
+        int regionIndex = 0;
+        int stopIndex = regionBoundaries.size();
+        if (scan.getStartRow().length > 0) {
+            regionIndex = getIndexContainingInclusive(regionBoundaries, 
scan.getStartRow());
+        }
+        if (scan.getStopRow().length > 0) {
+            stopIndex = Math.min(stopIndex, regionIndex + 
getIndexContainingExclusive(regionBoundaries.subList(regionIndex, stopIndex), 
scan.getStopRow()));
+        }
+        List<List<Scan>> parallelScans = 
Lists.newArrayListWithExpectedSize(stopIndex - regionIndex + 1);
+        List<Scan> scans = Lists.newArrayListWithExpectedSize(2);
+        while (regionIndex <= stopIndex) {
+            HRegionLocation regionLocation = regionLocations.get(regionIndex);
+            HRegionInfo regionInfo = regionLocation.getRegionInfo();
+            Scan newScan = ScanUtil.newScan(scan);
+            byte[] endKey;
+            if (regionIndex == stopIndex) {
+                endKey = scan.getStopRow();
+            } else {
+                endKey = regionBoundaries.get(regionIndex);
+            }
+            if(ScanUtil.isLocalIndex(scan)) {
+                ScanUtil.setLocalIndexAttributes(newScan, 0, 
regionInfo.getStartKey(),
+                    regionInfo.getEndKey(), 
newScan.getAttribute(SCAN_START_ROW_SUFFIX),
+                    newScan.getAttribute(SCAN_STOP_ROW_SUFFIX));
+            } else {
+                if(Bytes.compareTo(scan.getStartRow(), 
regionInfo.getStartKey())<=0) {
+                    newScan.setAttribute(SCAN_ACTUAL_START_ROW, 
regionInfo.getStartKey());
+                    newScan.setStartRow(regionInfo.getStartKey());
+                }
+                if(scan.getStopRow().length == 0 || 
(regionInfo.getEndKey().length != 0 && Bytes.compareTo(scan.getStopRow(), 
regionInfo.getEndKey())>0)) {
+                    newScan.setStopRow(regionInfo.getEndKey());
+                }
+            }
+            scans = addNewScan(parallelScans, scans, newScan, endKey, true, 
regionLocation);
+            regionIndex++;
+        }
+        if (!scans.isEmpty()) { // Add any remaining scans
+            parallelScans.add(scans);
+        }
+        return parallelScans;
+    }
+
+    /**
      * Compute the list of parallel scans to run for a given query. The inner 
scans
      * may be concatenated together directly, while the other ones may need to 
be
      * merge sorted, depending on the query.
@@ -482,7 +543,6 @@ public abstract class BaseResultIterators extends 
ExplainTable implements Result
      * @throws SQLException
      */
     private List<List<Scan>> getParallelScans(byte[] startKey, byte[] stopKey) 
throws SQLException {
-        Scan scan = context.getScan();
         List<HRegionLocation> regionLocations = 
context.getConnection().getQueryServices()
                 .getAllTableRegions(physicalTableName);
         List<byte[]> regionBoundaries = toBoundaries(regionLocations);
@@ -555,6 +615,8 @@ public abstract class BaseResultIterators extends 
ExplainTable implements Result
     
             // Merge bisect with guideposts for all but the last region
             while (regionIndex <= stopIndex) {
+                HRegionLocation regionLocation = 
regionLocations.get(regionIndex);
+                HRegionInfo regionInfo = regionLocation.getRegionInfo();
                 byte[] currentGuidePostBytes = currentGuidePost.copyBytes();
                 byte[] endKey, endRegionKey = EMPTY_BYTE_ARRAY;
                 if (regionIndex == stopIndex) {
@@ -562,9 +624,7 @@ public abstract class BaseResultIterators extends 
ExplainTable implements Result
                 } else {
                     endKey = regionBoundaries.get(regionIndex);
                 }
-                HRegionLocation regionLocation = 
regionLocations.get(regionIndex);
                 if (isLocalIndex) {
-                    HRegionInfo regionInfo = regionLocation.getRegionInfo();
                     endRegionKey = regionInfo.getEndKey();
                     keyOffset = 
ScanUtil.getRowKeyOffset(regionInfo.getStartKey(), endRegionKey);
                 }
@@ -572,6 +632,10 @@ public abstract class BaseResultIterators extends 
ExplainTable implements Result
                     while (guideIndex < gpsSize && 
(currentGuidePost.compareTo(endKey) <= 0 || endKey.length == 0)) {
                         Scan newScan = scanRanges.intersectScan(scan, 
currentKeyBytes, currentGuidePostBytes, keyOffset,
                                 false);
+                        if(newScan != null) {
+                            ScanUtil.setLocalIndexAttributes(newScan, 
keyOffset, regionInfo.getStartKey(),
+                                regionInfo.getEndKey(), newScan.getStartRow(), 
newScan.getStopRow());
+                        }
                         if (newScan != null) {
                             estimatedRows += 
gps.getRowCounts().get(guideIndex);
                             estimatedSize += 
gps.getByteCounts().get(guideIndex);
@@ -584,12 +648,9 @@ public abstract class BaseResultIterators extends 
ExplainTable implements Result
                     }
                 } catch (EOFException e) {}
                 Scan newScan = scanRanges.intersectScan(scan, currentKeyBytes, 
endKey, keyOffset, true);
-                if (isLocalIndex) {
-                    if (newScan != null) {
-                        newScan.setAttribute(EXPECTED_UPPER_REGION_KEY, 
endRegionKey);
-                    } else if (!scans.isEmpty()) {
-                        
scans.get(scans.size()-1).setAttribute(EXPECTED_UPPER_REGION_KEY, endRegionKey);
-                    }
+                if(newScan != null) {
+                    ScanUtil.setLocalIndexAttributes(newScan, keyOffset, 
regionInfo.getStartKey(),
+                        regionInfo.getEndKey(), newScan.getStartRow(), 
newScan.getStopRow());
                 }
                 scans = addNewScan(parallelScans, scans, newScan, endKey, 
true, regionLocation);
                 currentKeyBytes = endKey;
@@ -628,7 +689,6 @@ public abstract class BaseResultIterators extends 
ExplainTable implements Result
      */
     @Override
     public List<PeekingResultIterator> getIterators() throws SQLException {
-        Scan scan = context.getScan();
         if (logger.isDebugEnabled()) {
             logger.debug(LogUtil.addCustomAnnotations("Getting iterators for " 
+ this,
                     ScanUtil.getCustomAnnotations(scan)));
@@ -676,7 +736,7 @@ public abstract class BaseResultIterators extends 
ExplainTable implements Result
         SQLException toThrow = null;
         int queryTimeOut = context.getStatement().getQueryTimeoutInMillis();
         try {
-            submitWork(scan, futures, allIterators, splitSize);
+            submitWork(scan, futures, allIterators, splitSize, 
this.scanGrouper);
             boolean clearedCache = false;
             for (List<Pair<Scan,Future<PeekingResultIterator>>> future : 
reverseIfNecessary(futures,isReverse)) {
                 List<PeekingResultIterator> concatIterators = 
Lists.newArrayListWithExpectedSize(future.size());
@@ -691,11 +751,8 @@ public abstract class BaseResultIterators extends 
ExplainTable implements Result
                                         previousScan.getScan().getStopRow()) < 
0)
                                 || (isReverse && 
Bytes.compareTo(scanPair.getFirst().getAttribute(SCAN_ACTUAL_START_ROW),
                                         previousScan.getScan().getStopRow()) > 
0)
-                                || 
(scanPair.getFirst().getAttribute(EXPECTED_UPPER_REGION_KEY) != null
-                                        && 
previousScan.getScan().getAttribute(EXPECTED_UPPER_REGION_KEY) != null
-                                        && 
Bytes.compareTo(scanPair.getFirst().getAttribute(EXPECTED_UPPER_REGION_KEY),
-                                                previousScan.getScan()
-                                                        
.getAttribute(EXPECTED_UPPER_REGION_KEY)) == 0))) {
+                                || 
(Bytes.compareTo(scanPair.getFirst().getStopRow(),
+                                                
previousScan.getScan().getStopRow()) == 0))) {
                             continue;
                         }
                         PeekingResultIterator iterator = 
scanPair.getSecond().get(timeOutForScan, TimeUnit.MILLISECONDS);
@@ -714,9 +771,6 @@ public abstract class BaseResultIterators extends 
ExplainTable implements Result
                             Scan oldScan = scanPair.getFirst();
                             byte[] startKey = 
oldScan.getAttribute(SCAN_ACTUAL_START_ROW);
                             byte[] endKey = oldScan.getStopRow();
-                            if (isLocalIndex) {
-                                endKey = 
oldScan.getAttribute(EXPECTED_UPPER_REGION_KEY);
-                            }
                             
                             List<List<Scan>> newNestedScans = 
this.getParallelScans(startKey, endKey);
                             // Add any concatIterators that were successful so 
far
@@ -868,7 +922,7 @@ public abstract class BaseResultIterators extends 
ExplainTable implements Result
 
     abstract protected String getName();    
     abstract protected void submitWork(List<List<Scan>> nestedScans, 
List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures,
-            Queue<PeekingResultIterator> allIterators, int estFlattenedSize) 
throws SQLException;
+            Queue<PeekingResultIterator> allIterators, int estFlattenedSize, 
ParallelScanGrouper scanGrouper) throws SQLException;
     
     @Override
     public int size() {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a31b7017/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
index a78565d..c4e52f4 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
@@ -18,7 +18,7 @@
 
 package org.apache.phoenix.iterate;
 
-import static 
org.apache.phoenix.coprocessor.BaseScannerRegionObserver.STARTKEY_OFFSET;
+import static 
org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_START_ROW_SUFFIX;
 import static org.apache.phoenix.monitoring.MetricType.SCAN_BYTES;
 
 import java.sql.SQLException;
@@ -27,6 +27,7 @@ import java.util.List;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.query.QueryServices;
@@ -56,6 +57,7 @@ public class ChunkedResultIterator implements 
PeekingResultIterator {
     private final MutationState mutationState;
     private Scan scan;
     private PeekingResultIterator resultIterator;
+    private QueryPlan plan;
 
     public static class ChunkedResultIteratorFactory implements 
ParallelIteratorFactory {
 
@@ -73,30 +75,31 @@ public class ChunkedResultIterator implements 
PeekingResultIterator {
         }
 
         @Override
-        public PeekingResultIterator newIterator(StatementContext context, 
ResultIterator scanner, Scan scan, String tableName) throws SQLException {
+        public PeekingResultIterator newIterator(StatementContext context, 
ResultIterator scanner, Scan scan, String tableName, QueryPlan plan) throws 
SQLException {
             if (logger.isDebugEnabled()) 
logger.debug(LogUtil.addCustomAnnotations("ChunkedResultIteratorFactory.newIterator
 over " + tableRef.getTable().getPhysicalName().getString() + " with " + scan, 
ScanUtil.getCustomAnnotations(scan)));
             return new ChunkedResultIterator(delegateFactory, mutationState, 
context, tableRef, scan, 
                     
mutationState.getConnection().getQueryServices().getProps().getLong(
                                 QueryServices.SCAN_RESULT_CHUNK_SIZE,
-                                
QueryServicesOptions.DEFAULT_SCAN_RESULT_CHUNK_SIZE), scanner);
+                                
QueryServicesOptions.DEFAULT_SCAN_RESULT_CHUNK_SIZE), scanner, plan);
         }
     }
 
     private ChunkedResultIterator(ParallelIteratorFactory 
delegateIteratorFactory, MutationState mutationState,
-               StatementContext context, TableRef tableRef, Scan scan, long 
chunkSize, ResultIterator scanner) throws SQLException {
+               StatementContext context, TableRef tableRef, Scan scan, long 
chunkSize, ResultIterator scanner, QueryPlan plan) throws SQLException {
         this.delegateIteratorFactory = delegateIteratorFactory;
         this.context = context;
         this.tableRef = tableRef;
         this.scan = scan;
         this.chunkSize = chunkSize;
         this.mutationState = mutationState;
+        this.plan = plan;
         // Instantiate single chunk iterator and the delegate iterator in 
constructor
         // to get parallel scans kicked off in separate threads. If we delay 
this,
         // we'll get serialized behavior (see PHOENIX-
         if (logger.isDebugEnabled()) 
logger.debug(LogUtil.addCustomAnnotations("Get first chunked result iterator 
over " + tableRef.getTable().getPhysicalName().getString() + " with " + scan, 
ScanUtil.getCustomAnnotations(scan)));
         ResultIterator singleChunkResultIterator = new 
SingleChunkResultIterator(scanner, chunkSize);
         String tableName = tableRef.getTable().getPhysicalName().getString();
-        resultIterator = delegateIteratorFactory.newIterator(context, 
singleChunkResultIterator, scan, tableName);
+        resultIterator = delegateIteratorFactory.newIterator(context, 
singleChunkResultIterator, scan, tableName, plan);
     }
 
     @Override
@@ -123,13 +126,20 @@ public class ChunkedResultIterator implements 
PeekingResultIterator {
         if (resultIterator.peek() == null && lastKey != null) {
             resultIterator.close();
             scan = ScanUtil.newScan(scan);
-            scan.setStartRow(ByteUtil.copyKeyBytesIfNecessary(lastKey));
+            if(ScanUtil.isLocalIndex(scan)) {
+                scan.setAttribute(SCAN_START_ROW_SUFFIX, 
ByteUtil.copyKeyBytesIfNecessary(lastKey));
+            } else {
+                scan.setStartRow(ByteUtil.copyKeyBytesIfNecessary(lastKey));
+            }
             if (logger.isDebugEnabled()) 
logger.debug(LogUtil.addCustomAnnotations("Get next chunked result iterator 
over " + tableRef.getTable().getPhysicalName().getString() + " with " + scan, 
ScanUtil.getCustomAnnotations(scan)));
             String tableName = 
tableRef.getTable().getPhysicalName().getString();
             long renewLeaseThreshold = 
context.getConnection().getQueryServices().getRenewLeaseThresholdMilliSeconds();
-            ResultIterator singleChunkResultIterator = new 
SingleChunkResultIterator(
-                    new TableResultIterator(mutationState, tableRef, scan, 
context.getReadMetricsQueue().allotMetric(SCAN_BYTES, tableName), 
renewLeaseThreshold), chunkSize);
-            resultIterator = delegateIteratorFactory.newIterator(context, 
singleChunkResultIterator, scan, tableName);
+            ResultIterator singleChunkResultIterator =
+                    new SingleChunkResultIterator(new 
TableResultIterator(mutationState, scan,
+                            
context.getReadMetricsQueue().allotMetric(SCAN_BYTES, tableName),
+                            renewLeaseThreshold, plan, 
DefaultParallelScanGrouper.getInstance()),
+                            chunkSize);
+            resultIterator = delegateIteratorFactory.newIterator(context, 
singleChunkResultIterator, scan, tableName, plan);
         }
         return resultIterator;
     }
@@ -163,9 +173,6 @@ public class ChunkedResultIterator implements 
PeekingResultIterator {
                 // be able to start the next chunk on the next row key
                 if (rowCount == chunkSize) {
                     next.getKey(lastKey);
-                    if (scan.getAttribute(STARTKEY_OFFSET) != null) {
-                        addRegionStartKeyToLaskKey();
-                    }
                 } else if (rowCount > chunkSize && rowKeyChanged(next)) {
                     chunkComplete = true;
                     return null;
@@ -192,29 +199,10 @@ public class ChunkedResultIterator implements 
PeekingResultIterator {
             int offset = lastKey.getOffset();
             int length = lastKey.getLength();
             newTuple.getKey(lastKey);
-            if (scan.getAttribute(STARTKEY_OFFSET) != null) {
-                addRegionStartKeyToLaskKey();
-            }
 
             return Bytes.compareTo(currentKey, offset, length, lastKey.get(), 
lastKey.getOffset(), lastKey.getLength()) != 0;
         }
 
-        /**
-         * Prefix region start key to last key to form actual row key in case 
of local index scan.
-         */
-        private void addRegionStartKeyToLaskKey() {
-            byte[] offsetBytes = scan.getAttribute(STARTKEY_OFFSET);
-            if (offsetBytes != null) {
-                int startKeyOffset = Bytes.toInt(offsetBytes);
-                byte[] actualLastkey =
-                        new byte[startKeyOffset + lastKey.getLength() - 
lastKey.getOffset()];
-                System.arraycopy(scan.getStartRow(), 0, actualLastkey, 0, 
startKeyOffset);
-                System.arraycopy(lastKey.get(), lastKey.getOffset(), 
actualLastkey,
-                    startKeyOffset, lastKey.getLength());
-                lastKey.set(actualLastkey);
-            }
-        }
-
                @Override
                public String toString() {
                        return "SingleChunkResultIterator [rowCount=" + rowCount

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a31b7017/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultTableResultIteratorFactory.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultTableResultIteratorFactory.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultTableResultIteratorFactory.java
index 2258caf..b720b56 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultTableResultIteratorFactory.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/DefaultTableResultIteratorFactory.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.iterate;
 import java.sql.SQLException;
 
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.monitoring.CombinableMetric;
 import org.apache.phoenix.schema.TableRef;
@@ -28,8 +29,8 @@ public class DefaultTableResultIteratorFactory implements 
TableResultIteratorFac
 
     @Override
     public TableResultIterator newIterator(MutationState mutationState, 
TableRef tableRef, Scan scan,
-            CombinableMetric scanMetrics, long renewLeaseThreshold) throws 
SQLException {
-        return new TableResultIterator(mutationState, tableRef, scan, 
scanMetrics, renewLeaseThreshold);
+            CombinableMetric scanMetrics, long renewLeaseThreshold, QueryPlan 
plan, ParallelScanGrouper scanGrouper) throws SQLException {
+        return new TableResultIterator(mutationState, scan, scanMetrics, 
renewLeaseThreshold, plan, scanGrouper);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a31b7017/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorFactory.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorFactory.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorFactory.java
index f25e373..dbe9910 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorFactory.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIteratorFactory.java
@@ -20,15 +20,16 @@ package org.apache.phoenix.iterate;
 import java.sql.SQLException;
 
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.compile.StatementContext;
 
 public interface ParallelIteratorFactory {
     public static ParallelIteratorFactory NOOP_FACTORY = new 
ParallelIteratorFactory() {
         @Override
-        public PeekingResultIterator newIterator(StatementContext context, 
ResultIterator scanner, Scan scan, String physicalTableName)
+        public PeekingResultIterator newIterator(StatementContext context, 
ResultIterator scanner, Scan scan, String physicalTableName, QueryPlan plan)
                 throws SQLException {
             return LookAheadResultIterator.wrap(scanner);
         }
     };
-    PeekingResultIterator newIterator(StatementContext context, ResultIterator 
scanner, Scan scan, String physicalTableName) throws SQLException;
+    PeekingResultIterator newIterator(StatementContext context, ResultIterator 
scanner, Scan scan, String physicalTableName, QueryPlan plan) throws 
SQLException;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a31b7017/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
index ca0eba0..e40953e 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
@@ -54,20 +54,20 @@ public class ParallelIterators extends BaseResultIterators {
        private static final String NAME = "PARALLEL";
     private final ParallelIteratorFactory iteratorFactory;
     
-    public ParallelIterators(QueryPlan plan, Integer perScanLimit, 
ParallelIteratorFactory iteratorFactory, ParallelScanGrouper scanGrouper)
+    public ParallelIterators(QueryPlan plan, Integer perScanLimit, 
ParallelIteratorFactory iteratorFactory, ParallelScanGrouper scanGrouper, Scan 
scan)
             throws SQLException {
-        super(plan, perScanLimit, null, scanGrouper);
+        super(plan, perScanLimit, null, scanGrouper, scan);
         this.iteratorFactory = iteratorFactory;
     }   
     
-    public ParallelIterators(QueryPlan plan, Integer perScanLimit, 
ParallelIteratorFactory iteratorFactory)
+    public ParallelIterators(QueryPlan plan, Integer perScanLimit, 
ParallelIteratorFactory iteratorFactory, Scan scan)
             throws SQLException {
-        this(plan, perScanLimit, iteratorFactory, 
DefaultParallelScanGrouper.getInstance());
+        this(plan, perScanLimit, iteratorFactory, 
DefaultParallelScanGrouper.getInstance(), scan);
     }  
 
     @Override
     protected void submitWork(List<List<Scan>> nestedScans, 
List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures,
-            final Queue<PeekingResultIterator> allIterators, int 
estFlattenedSize) throws SQLException {
+            final Queue<PeekingResultIterator> allIterators, int 
estFlattenedSize, ParallelScanGrouper scanGrouper) throws SQLException {
         // Pre-populate nestedFutures lists so that we can shuffle the scans
         // and add the future to the right nested list. By shuffling the scans
         // we get better utilization of the cluster since our thread executor
@@ -98,7 +98,7 @@ public class ParallelIterators extends BaseResultIterators {
             final Scan scan = scanLocation.getScan();
             final CombinableMetric scanMetrics = 
readMetrics.allotMetric(MetricType.SCAN_BYTES, physicalTableName);
             final TaskExecutionMetricsHolder taskMetrics = new 
TaskExecutionMetricsHolder(readMetrics, physicalTableName);
-            final TableResultIterator tableResultItr = 
context.getConnection().getTableResultIteratorFactory().newIterator(mutationState,
 tableRef, scan, scanMetrics, renewLeaseThreshold);
+            final TableResultIterator tableResultItr = 
context.getConnection().getTableResultIteratorFactory().newIterator(mutationState,
 tableRef, scan, scanMetrics, renewLeaseThreshold, this.plan, scanGrouper);
             context.getConnection().addIterator(tableResultItr);
             Future<PeekingResultIterator> future = 
executor.submit(Tracing.wrap(new JobCallable<PeekingResultIterator>() {
                 
@@ -109,7 +109,7 @@ public class ParallelIterators extends BaseResultIterators {
                     if (logger.isDebugEnabled()) {
                         logger.debug(LogUtil.addCustomAnnotations("Id: " + 
scanId + ", Time: " + (System.currentTimeMillis() - startTime) + "ms, Scan: " + 
scan, ScanUtil.getCustomAnnotations(scan)));
                     }
-                    PeekingResultIterator iterator = 
iteratorFactory.newIterator(context, tableResultItr, scan, physicalTableName);
+                    PeekingResultIterator iterator = 
iteratorFactory.newIterator(context, tableResultItr, scan, physicalTableName, 
ParallelIterators.this.plan);
                     // Fill the scanner's cache. This helps reduce latency 
since we are parallelizing the I/O needed.
                     iterator.peek();
                     allIterators.add(iterator);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a31b7017/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
index 17c2279..1f89ec1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
@@ -55,9 +55,9 @@ public class SerialIterators extends BaseResultIterators {
     private final ParallelIteratorFactory iteratorFactory;
     
     public SerialIterators(QueryPlan plan, Integer perScanLimit, Integer 
offset,
-            ParallelIteratorFactory iteratorFactory, ParallelScanGrouper 
scanGrouper)
+            ParallelIteratorFactory iteratorFactory, ParallelScanGrouper 
scanGrouper, Scan scan)
             throws SQLException {
-        super(plan, perScanLimit, offset, scanGrouper);
+        super(plan, perScanLimit, offset, scanGrouper, scan);
         // must be a offset or a limit specified or a SERIAL hint
         Preconditions.checkArgument(
                 offset != null || perScanLimit != null || 
plan.getStatement().getHint().hasHint(HintNode.Hint.SERIAL));
@@ -66,7 +66,7 @@ public class SerialIterators extends BaseResultIterators {
 
     @Override
     protected void submitWork(List<List<Scan>> nestedScans, 
List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures,
-            final Queue<PeekingResultIterator> allIterators, int 
estFlattenedSize) {
+            final Queue<PeekingResultIterator> allIterators, int 
estFlattenedSize, final ParallelScanGrouper scanGrouper) {
         // Pre-populate nestedFutures lists so that we can shuffle the scans
         // and add the future to the right nested list. By shuffling the scans
         // we get better utilization of the cluster since our thread executor
@@ -90,9 +90,9 @@ public class SerialIterators extends BaseResultIterators {
                     PeekingResultIterator previousIterator = null;
                        List<PeekingResultIterator> concatIterators = 
Lists.newArrayListWithExpectedSize(scans.size());
                        for (final Scan scan : scans) {
-                           TableResultIterator scanner = new 
TableResultIterator(mutationState, tableRef, scan, 
context.getReadMetricsQueue().allotMetric(SCAN_BYTES, tableName), 
renewLeaseThreshold, previousIterator);
+                           TableResultIterator scanner = new 
TableResultIterator(mutationState, scan, 
context.getReadMetricsQueue().allotMetric(SCAN_BYTES, tableName), 
renewLeaseThreshold, previousIterator, plan, scanGrouper);
                            conn.addIterator(scanner);
-                           PeekingResultIterator iterator = 
iteratorFactory.newIterator(context, scanner, scan, tableName);
+                           PeekingResultIterator iterator = 
iteratorFactory.newIterator(context, scanner, scan, tableName, plan);
                            concatIterators.add(iterator);
                            previousIterator = iterator;
                        }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a31b7017/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java
index 0a3c32b..64e33de 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java
@@ -36,6 +36,7 @@ import org.apache.commons.io.output.DeferredFileOutputStream;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.io.WritableUtils;
+import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.memory.MemoryManager;
 import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
@@ -71,7 +72,7 @@ public class SpoolingResultIterator implements 
PeekingResultIterator {
             this.services = services;
         }
         @Override
-        public PeekingResultIterator newIterator(StatementContext context, 
ResultIterator scanner, Scan scan, String physicalTableName) throws 
SQLException {
+        public PeekingResultIterator newIterator(StatementContext context, 
ResultIterator scanner, Scan scan, String physicalTableName, QueryPlan plan) 
throws SQLException {
             ReadMetricQueue readRequestMetric = context.getReadMetricsQueue();
             SpoolingMetricsHolder spoolMetrics = new 
SpoolingMetricsHolder(readRequestMetric, physicalTableName);
             MemoryMetricsHolder memoryMetrics = new 
MemoryMetricsHolder(readRequestMetric, physicalTableName);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a31b7017/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
index a86f899..a1ef7da 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
@@ -17,6 +17,9 @@
  */
 package org.apache.phoenix.iterate;
 
+import static 
org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_START_ROW_SUFFIX;
+import static 
org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_ACTUAL_START_ROW;
+
 import static 
org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.CLOSED;
 import static 
org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.NOT_RENEWED;
 import static 
org.apache.phoenix.iterate.TableResultIterator.RenewLeaseStatus.RENEWED;
@@ -32,14 +35,17 @@ import javax.annotation.concurrent.GuardedBy;
 import org.apache.hadoop.hbase.client.AbstractClientScanner;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.monitoring.CombinableMetric;
-import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.schema.StaleRegionBoundaryCacheException;
 import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.Closeables;
 import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.ServerUtil;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -59,6 +65,10 @@ public class TableResultIterator implements ResultIterator {
     private final CombinableMetric scanMetrics;
     private static final ResultIterator UNINITIALIZED_SCANNER = 
ResultIterator.EMPTY_ITERATOR;
     private final long renewLeaseThreshold;
+    private final QueryPlan plan;
+    private final ParallelScanGrouper scanGrouper;
+    private Tuple lastTuple = null;
+    private ImmutableBytesWritable ptr = new ImmutableBytesWritable();
 
     @GuardedBy("this")
     private ResultIterator scanIterator;
@@ -76,26 +86,29 @@ public class TableResultIterator implements ResultIterator {
         this.renewLeaseThreshold = 0;
         this.htable = null;
         this.scan = null;
+        this.plan = null;
+        this.scanGrouper = null;
     }
 
     public static enum RenewLeaseStatus {
         RENEWED, CLOSED, UNINITIALIZED, THRESHOLD_NOT_REACHED, NOT_RENEWED
     };
 
-    public TableResultIterator(MutationState mutationState, TableRef tableRef, 
Scan scan, CombinableMetric scanMetrics,
-                       long renewLeaseThreshold) throws SQLException {
-       this(mutationState,tableRef,scan,scanMetrics,renewLeaseThreshold,null);
+    public TableResultIterator(MutationState mutationState, Scan scan, 
CombinableMetric scanMetrics,
+                       long renewLeaseThreshold, QueryPlan plan, 
ParallelScanGrouper scanGrouper) throws SQLException {
+       this(mutationState,scan,scanMetrics,renewLeaseThreshold,null, plan, 
scanGrouper);
     }
     
-    public TableResultIterator(MutationState mutationState, TableRef tableRef, 
Scan scan, CombinableMetric scanMetrics,
-            long renewLeaseThreshold, PeekingResultIterator previousIterator) 
throws SQLException {
+    public TableResultIterator(MutationState mutationState, Scan scan, 
CombinableMetric scanMetrics,
+            long renewLeaseThreshold, PeekingResultIterator previousIterator, 
QueryPlan plan, ParallelScanGrouper scanGrouper) throws SQLException {
         this.scan = scan;
         this.scanMetrics = scanMetrics;
-        PTable table = tableRef.getTable();
-        htable = mutationState.getHTable(table);
+        this.plan = plan;
+        htable = mutationState.getHTable(plan.getTableRef().getTable());
         this.scanIterator = UNINITIALIZED_SCANNER;
         this.renewLeaseThreshold = renewLeaseThreshold;
         this.previousIterator = previousIterator;
+        this.scanGrouper = scanGrouper;
     }
 
     @Override
@@ -116,8 +129,42 @@ public class TableResultIterator implements ResultIterator 
{
     @Override
     public synchronized Tuple next() throws SQLException {
         initScanner();
-        Tuple t = scanIterator.next();
-        return t;
+        try {
+            lastTuple = scanIterator.next();
+            if (lastTuple != null) {
+                ImmutableBytesWritable ptr = new ImmutableBytesWritable();
+                lastTuple.getKey(ptr);
+            }
+        } catch (SQLException e) {
+            try {
+                throw ServerUtil.parseServerException(e);
+            } catch(StaleRegionBoundaryCacheException e1) {
+                if(ScanUtil.isNonAggregateScan(scan)) {
+                    // For non aggregate queries if we get stale region 
boundary exception we can
+                    // continue scanning from the next value of the last 
fetched result.
+                    Scan newScan = ScanUtil.newScan(scan);
+                    
newScan.setStartRow(newScan.getAttribute(SCAN_ACTUAL_START_ROW));
+                    if(lastTuple != null) {
+                        lastTuple.getKey(ptr);
+                        byte[] startRowSuffix = 
ByteUtil.copyKeyBytesIfNecessary(ptr);
+                        if(ScanUtil.isLocalIndex(newScan)) {
+                            // If we only set the scan start row suffix, then 
the server side prepares
+                            // the actual scan boundaries by prefixing the region 
start key.
+                            newScan.setAttribute(SCAN_START_ROW_SUFFIX, 
ByteUtil.nextKey(startRowSuffix));
+                        } else {
+                            
newScan.setStartRow(ByteUtil.nextKey(startRowSuffix));
+                        }
+                    }
+                    
plan.getContext().getConnection().getQueryServices().clearTableRegionCache(htable.getTableName());
+                    this.scanIterator =
+                            plan.iterator(scanGrouper, newScan);
+                    lastTuple = scanIterator.next();
+                } else {
+                    throw e;
+                }
+            }
+        }
+        return lastTuple;
     }
 
     public synchronized void initScanner() throws SQLException {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a31b7017/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIteratorFactory.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIteratorFactory.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIteratorFactory.java
index 0f5ac9b..8d7b54d 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIteratorFactory.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIteratorFactory.java
@@ -20,10 +20,11 @@ package org.apache.phoenix.iterate;
 import java.sql.SQLException;
 
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.monitoring.CombinableMetric;
 import org.apache.phoenix.schema.TableRef;
 
 public interface TableResultIteratorFactory {
-    public TableResultIterator newIterator(MutationState mutationState, 
TableRef tableRef, Scan scan, CombinableMetric scanMetrics, long 
renewLeaseThreshold) throws SQLException;        
+    public TableResultIterator newIterator(MutationState mutationState, 
TableRef tableRef, Scan scan, CombinableMetric scanMetrics, long 
renewLeaseThreshold, QueryPlan plan, ParallelScanGrouper scanGrouper) throws 
SQLException;        
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a31b7017/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java 
b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index 5b799a0..0c154e2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -491,6 +491,11 @@ public class PhoenixStatement implements Statement, 
SQLCloseable {
                 }
 
                 @Override
+                public ResultIterator iterator(ParallelScanGrouper 
scanGrouper, Scan scan) throws SQLException {
+                    return iterator;
+                }
+
+                @Override
                 public long getEstimatedSize() {
                     return 0;
                 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a31b7017/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
index 51ac795..17d9b6a 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java
@@ -39,6 +39,7 @@ import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.iterate.ConcatResultIterator;
 import org.apache.phoenix.iterate.LookAheadResultIterator;
+import org.apache.phoenix.iterate.MapReduceParallelScanGrouper;
 import org.apache.phoenix.iterate.PeekingResultIterator;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.iterate.RoundRobinResultIterator;
@@ -50,6 +51,7 @@ import org.apache.phoenix.monitoring.ReadMetricQueue;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
+
 import org.apache.phoenix.query.ConnectionQueryServices;
 
 /**
@@ -120,8 +122,7 @@ public class PhoenixRecordReader<T extends DBWritable> 
extends RecordReader<Null
             for (Scan scan : scans) {
                 // For MR, skip the region boundary check exception if we 
encounter a split. ref: PHOENIX-2599
                 
scan.setAttribute(BaseScannerRegionObserver.SKIP_REGION_BOUNDARY_CHECK, 
Bytes.toBytes(true));
-                final TableResultIterator tableResultIterator = new 
TableResultIterator(queryPlan.getContext().getConnection().getMutationState(),
-                        queryPlan.getTableRef(), scan, 
readMetrics.allotMetric(SCAN_BYTES, tableName), renewScannerLeaseThreshold);
+                final TableResultIterator tableResultIterator = new 
TableResultIterator(queryPlan.getContext().getConnection().getMutationState(), 
scan, readMetrics.allotMetric(SCAN_BYTES, tableName), 
renewScannerLeaseThreshold, queryPlan, 
MapReduceParallelScanGrouper.getInstance());
                 PeekingResultIterator peekingResultIterator = 
LookAheadResultIterator.wrap(tableResultIterator);
                 iterators.add(peekingResultIterator);
             }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a31b7017/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java 
b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
index 46589b9..3b37c69 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
@@ -20,6 +20,9 @@ package org.apache.phoenix.util;
 import static 
org.apache.phoenix.compile.OrderByCompiler.OrderBy.FWD_ROW_KEY_ORDER_BY;
 import static 
org.apache.phoenix.compile.OrderByCompiler.OrderBy.REV_ROW_KEY_ORDER_BY;
 import static 
org.apache.phoenix.coprocessor.BaseScannerRegionObserver.CUSTOM_ANNOTATIONS;
+import static 
org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_ACTUAL_START_ROW;
+import static 
org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_START_ROW_SUFFIX;
+import static 
org.apache.phoenix.coprocessor.BaseScannerRegionObserver.SCAN_STOP_ROW_SUFFIX;
 
 import java.io.IOException;
 import java.sql.SQLException;
@@ -32,6 +35,7 @@ import java.util.NavigableSet;
 import java.util.TreeMap;
 
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.Filter;
@@ -101,6 +105,10 @@ public class ScanUtil {
         return scan.getAttribute(BaseScannerRegionObserver.LOCAL_INDEX) != 
null;
     }
 
+    public static boolean isNonAggregateScan(Scan scan) {
+        return 
scan.getAttribute(BaseScannerRegionObserver.NON_AGGREGATE_QUERY) != null;
+    }
+
     // Use getTenantId and pass in column name to match against
     // in as PSchema attribute. If column name matches in 
     // KeyExpressions, set on scan as attribute
@@ -616,6 +624,62 @@ public class ScanUtil {
         }
     }
 
+    /**
+     * Prefixes the region start key to the start row/stop row suffixes and sets 
them as the scan boundaries.
+     * @param scan
+     * @param lowerInclusiveRegionKey
+     * @param upperExclusiveRegionKey
+     */
+    public static void setupLocalIndexScan(Scan scan, byte[] 
lowerInclusiveRegionKey,
+            byte[] upperExclusiveRegionKey) {
+        byte[] prefix = lowerInclusiveRegionKey.length == 0 ? new 
byte[upperExclusiveRegionKey.length]: lowerInclusiveRegionKey;
+        int prefixLength = lowerInclusiveRegionKey.length == 0? 
upperExclusiveRegionKey.length: lowerInclusiveRegionKey.length;
+        if(scan.getAttribute(SCAN_START_ROW_SUFFIX)!=null) {
+            
scan.setStartRow(ScanRanges.prefixKey(scan.getAttribute(SCAN_START_ROW_SUFFIX), 
0, prefix, prefixLength));
+        }
+        if(scan.getAttribute(SCAN_STOP_ROW_SUFFIX)!=null) {
+            
scan.setStopRow(ScanRanges.prefixKey(scan.getAttribute(SCAN_STOP_ROW_SUFFIX), 
0, prefix, prefixLength));
+        }
+    }
+
+    public static byte[] getActualStartRow(Scan localIndexScan, HRegionInfo 
regionInfo) {
+        return localIndexScan.getAttribute(SCAN_START_ROW_SUFFIX) == null ? 
localIndexScan
+                .getStartRow() : 
ScanRanges.prefixKey(localIndexScan.getAttribute(SCAN_START_ROW_SUFFIX), 0 ,
+            regionInfo.getStartKey().length == 0 ? new 
byte[regionInfo.getEndKey().length]
+                    : regionInfo.getStartKey(),
+            regionInfo.getStartKey().length == 0 ? 
regionInfo.getEndKey().length : regionInfo
+                    .getStartKey().length);
+    }
+
+    /**
+     * Set all attributes required and boundaries for local index scan.
+     * @param newScan
+     * @param keyOffset
+     * @param regionStartKey
+     * @param regionEndKey
+     * @param startRowSuffix
+     * @param stopRowSuffix
+     */
+    public static void setLocalIndexAttributes(Scan newScan, int keyOffset, 
byte[] regionStartKey, byte[] regionEndKey, byte[] startRowSuffix, byte[] 
stopRowSuffix) {
+        if(ScanUtil.isLocalIndex(newScan)) {
+             newScan.setAttribute(SCAN_ACTUAL_START_ROW, regionStartKey);
+             newScan.setStartRow(regionStartKey);
+             newScan.setStopRow(regionEndKey);
+             if (keyOffset > 0 ) {
+                 newScan.setAttribute(SCAN_START_ROW_SUFFIX, 
ScanRanges.stripPrefix(startRowSuffix, keyOffset));
+             } else {
+                 newScan.setAttribute(SCAN_START_ROW_SUFFIX, startRowSuffix);
+             }
+             if (keyOffset > 0) {
+                 newScan.setAttribute(SCAN_STOP_ROW_SUFFIX, 
ScanRanges.stripPrefix(stopRowSuffix, keyOffset));
+             } else {
+                 newScan.setAttribute(SCAN_STOP_ROW_SUFFIX, stopRowSuffix);
+             }
+         }
+    }
+
+    public static boolean isConextScan(Scan scan, StatementContext context) {
+        return Bytes.compareTo(context.getScan().getStartRow(), 
scan.getStartRow()) == 0 && Bytes
+                .compareTo(context.getScan().getStopRow(), scan.getStopRow()) 
== 0;
+    }
     public static int getRowKeyOffset(byte[] regionStartKey, byte[] 
regionEndKey) {
         return regionStartKey.length > 0 ? regionStartKey.length : 
regionEndKey.length;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a31b7017/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java 
b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
index 73aa4db..bbe8422 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
@@ -752,7 +752,7 @@ public abstract class BaseTest {
         conf.setClass(HConstants.HBASE_MASTER_LOADBALANCER_CLASS, 
IndexLoadBalancer.class,
             LoadBalancer.class);
         conf.setClass("hbase.coprocessor.regionserver.classes", 
LocalIndexMerger.class,
-            RegionServerObserver.class);
+            RegionServerObserver.class) ;
         conf.setInt("dfs.namenode.handler.count", 2);
         conf.setInt("dfs.namenode.service.handler.count", 2);
         conf.setInt("dfs.datanode.handler.count", 2);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a31b7017/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
index 47101b2..1da68ba 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/query/ParallelIteratorsSplitTest.java
@@ -386,7 +386,12 @@ public class ParallelIteratorsSplitTest extends 
BaseConnectionlessQueryTest {
             public ResultIterator iterator(ParallelScanGrouper scanGrouper) 
throws SQLException {
                 return ResultIterator.EMPTY_ITERATOR;
             }
-            
+
+            @Override
+            public ResultIterator iterator(ParallelScanGrouper scanGrouper, 
Scan scan) throws SQLException {
+                return ResultIterator.EMPTY_ITERATOR;
+            }
+
             @Override
             public ResultIterator iterator() throws SQLException {
                 return ResultIterator.EMPTY_ITERATOR;
@@ -467,7 +472,7 @@ public class ParallelIteratorsSplitTest extends 
BaseConnectionlessQueryTest {
                 return false;
             }
             
-        }, null, new 
SpoolingResultIterator.SpoolingResultIteratorFactory(context.getConnection().getQueryServices()));
+        }, null, new 
SpoolingResultIterator.SpoolingResultIteratorFactory(context.getConnection().getQueryServices()),
 context.getScan());
         List<KeyRange> keyRanges = parallelIterators.getSplits();
         return keyRanges;
     }

Reply via email to