Repository: phoenix
Updated Branches:
  refs/heads/master da345f84c -> 6ff638e18


PHOENIX-2724 Query with large number of guideposts is slower compared to no stats


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/6ff638e1
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/6ff638e1
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/6ff638e1

Branch: refs/heads/master
Commit: 6ff638e18de4b591aefd428cf888c5088d8ec175
Parents: da345f8
Author: Samarth <samarth.j...@salesforce.com>
Authored: Fri Apr 22 23:33:17 2016 -0700
Committer: Samarth <samarth.j...@salesforce.com>
Committed: Fri Apr 22 23:33:17 2016 -0700

----------------------------------------------------------------------
 .../phoenix/end2end/QueryWithOffsetIT.java      |  33 ++---
 .../phoenix/end2end/SerialIteratorsIT.java      |  90 ++++++++++++
 .../phoenix/coprocessor/ScanRegionObserver.java |   9 +-
 .../org/apache/phoenix/execute/ScanPlan.java    |  17 ++-
 .../phoenix/iterate/BaseResultIterators.java    |   4 +-
 .../phoenix/iterate/ChunkedResultIterator.java  |  13 +-
 .../phoenix/iterate/OffsetResultIterator.java   |   2 +-
 .../phoenix/iterate/ParallelIterators.java      |   4 +-
 .../apache/phoenix/iterate/SerialIterators.java | 142 ++++++++++++++-----
 .../phoenix/iterate/SpoolingResultIterator.java |  17 ++-
 .../phoenix/iterate/TableResultIterator.java    |  22 +--
 .../apache/phoenix/jdbc/PhoenixConnection.java  |   9 +-
 .../apache/phoenix/query/QueryConstants.java    |   2 +-
 .../org/apache/phoenix/schema/PTableImpl.java   |   1 +
 .../java/org/apache/phoenix/util/QueryUtil.java |  10 +-
 15 files changed, 282 insertions(+), 93 deletions(-)
----------------------------------------------------------------------
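
Before the per-file diffs, a note on the shape of the fix: with thousands of guideposts, the old SerialIterators eagerly built one TableResultIterator per guidepost-derived scan before returning a single row. The patch replaces that with a SerialIterator that opens each scan lazily, so a serial LIMIT/OFFSET query only pays for the scans it actually reaches. A minimal, self-contained sketch of that lazy-concatenation pattern (plain Java stand-ins; LazyConcatIterator and the Supplier-per-scan framing are illustrative, not Phoenix APIs):

import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Supplier;

// Illustrative stand-in for the patch's SerialIterator: each Supplier plays
// the role of one scan, and the iterator it yields is only created once the
// previous one is exhausted, so unused scans are never opened.
public class LazyConcatIterator<T> implements Iterator<T> {
    private final Iterator<Supplier<Iterator<T>>> sources;
    private Iterator<T> current;

    public LazyConcatIterator(List<Supplier<Iterator<T>>> sources) {
        this.sources = sources.iterator();
    }

    @Override
    public boolean hasNext() {
        // Advance to the next non-empty source only when needed.
        while ((current == null || !current.hasNext()) && sources.hasNext()) {
            current = sources.next().get();
        }
        return current != null && current.hasNext();
    }

    @Override
    public T next() {
        if (!hasNext()) throw new NoSuchElementException();
        return current.next();
    }
}
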


http://git-wip-us.apache.org/repos/asf/phoenix/blob/6ff638e1/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryWithOffsetIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryWithOffsetIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryWithOffsetIT.java
index 83d759b..dd21f31 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryWithOffsetIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryWithOffsetIT.java
@@ -89,9 +89,10 @@ public class QueryWithOffsetIT extends BaseOwnClusterHBaseManagedTimeIT {
         rs = conn.createStatement()
                 .executeQuery("SELECT t_id from " + tableName + " order by t_id limit " + limit + " offset " + offset);
         int i = 0;
-        while (i++ < limit) {
+        while (i < limit) {
             assertTrue(rs.next());
-            assertEquals(strings[offset + i - 1], rs.getString(1));
+            assertEquals("Expected string didn't match for i = " + i, 
strings[offset + i], rs.getString(1));
+            i++;
         }
 
         limit = 35;
@@ -176,20 +177,6 @@ public class QueryWithOffsetIT extends BaseOwnClusterHBaseManagedTimeIT {
         conn.close();
     }
 
-    private void initTableValues(Connection conn) throws SQLException {
-        for (int i = 0; i < 26; i++) {
-            conn.createStatement().execute("UPSERT INTO " + tableName + " values('" + strings[i] + "'," + i + ","
-                    + (i + 1) + "," + (i + 2) + ",'" + strings[25 - i] + "')");
-        }
-        conn.commit();
-    }
-
-    private void updateStatistics(Connection conn) throws SQLException {
-        String query = "UPDATE STATISTICS " + tableName + " SET \"" + QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB
-                + "\"=" + Long.toString(500);
-        conn.createStatement().execute(query);
-    }
-
     @Test
     public void testMetaDataWithOffset() throws SQLException {
         Connection conn;
@@ -207,5 +194,19 @@ public class QueryWithOffsetIT extends BaseOwnClusterHBaseManagedTimeIT {
         ResultSetMetaData md = rs.getMetaData();
         assertEquals(5, md.getColumnCount());
     }
+    
+    private void initTableValues(Connection conn) throws SQLException {
+        for (int i = 0; i < 26; i++) {
+            conn.createStatement().execute("UPSERT INTO " + tableName + " values('" + strings[i] + "'," + i + ","
+                    + (i + 1) + "," + (i + 2) + ",'" + strings[25 - i] + "')");
+        }
+        conn.commit();
+    }
+
+    private void updateStatistics(Connection conn) throws SQLException {
+        String query = "UPDATE STATISTICS " + tableName + " SET \"" + QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB
+                + "\"=" + Long.toString(500);
+        conn.createStatement().execute(query);
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6ff638e1/phoenix-core/src/it/java/org/apache/phoenix/end2end/SerialIteratorsIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SerialIteratorsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SerialIteratorsIT.java
new file mode 100644
index 0000000..d4c71af
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SerialIteratorsIT.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.Maps;
+
+public class SerialIteratorsIT extends BaseHBaseManagedTimeTableReuseIT {
+    private String tableName = generateRandomString();
+    private final String[] strings = { "a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o", "p",
+            "q", "r", "s", "t", "u", "v", "w", "x", "y", "z" };
+    private final String ddl = "CREATE TABLE " + tableName + " (t_id VARCHAR NOT NULL,\n" + "k1 INTEGER NOT NULL,\n"
+            + "k2 INTEGER NOT NULL,\n" + "C3.k3 INTEGER,\n" + "C2.v1 VARCHAR,\n"
+            + "CONSTRAINT pk PRIMARY KEY (t_id, k1, k2)) SPLIT ON ('e','i','o')";
+
+    @BeforeClass
+    public static void doSetup() throws Exception {
+        Map<String,String> props = Maps.newHashMapWithExpectedSize(3);
+        // Don't force row key order
+        props.put(QueryServices.FORCE_ROW_KEY_ORDER_ATTRIB, Boolean.toString(false));
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+    }
+    
+    @Test
+    public void testConcatenatingSerialIterators() throws Exception {
+        Connection conn;
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        conn = DriverManager.getConnection(getUrl(), props);
+        createTestTable(getUrl(), ddl);
+        initTableValues(conn);
+        String query = "SELECT t_id from " + tableName + " order by t_id desc limit " + 10;
+        PhoenixStatement stmt = conn.createStatement().unwrap(PhoenixStatement.class);
+        ResultSet rs = stmt.executeQuery(query);
+        int i = 25;
+        while (i >= 16) {
+            assertTrue(rs.next());
+            assertEquals(strings[i--], rs.getString(1));
+        }
+        query = "SELECT t_id from " + tableName + " order by t_id limit " + 10;
+        stmt = conn.createStatement().unwrap(PhoenixStatement.class);
+        rs = stmt.executeQuery(query);
+        i = 0;
+        while (i < 10) {
+            assertTrue(rs.next());
+            assertEquals(strings[i++], rs.getString(1));
+        }
+        conn.close();
+    }
+    
+    private void initTableValues(Connection conn) throws SQLException {
+        for (int i = 0; i < 26; i++) {
+            conn.createStatement().execute("UPSERT INTO " + tableName + " values('" + strings[i] + "'," + i + ","
+                    + (i + 1) + "," + (i + 2) + ",'" + strings[25 - i] + "')");
+        }
+        conn.commit();
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6ff638e1/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
index 905dfa0..5feb3e9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
@@ -60,6 +60,7 @@ import org.apache.phoenix.schema.KeyValueSchema.KeyValueSchemaBuilder;
 import org.apache.phoenix.schema.ValueBitSet;
 import org.apache.phoenix.schema.tuple.ResultTuple;
 import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.schema.types.PInteger;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.ServerUtil;
@@ -193,7 +194,7 @@ public class ScanRegionObserver extends BaseScannerRegionObserver {
         byte[] scanOffsetBytes = scan.getAttribute(BaseScannerRegionObserver.SCAN_OFFSET);
         Integer scanOffset = null;
         if (scanOffsetBytes != null) {
-            scanOffset = Bytes.toInt(scanOffsetBytes);
+            scanOffset = (Integer) PInteger.INSTANCE.toObject(scanOffsetBytes);
         }
         RegionScanner innerScanner = s;
 
@@ -247,15 +248,11 @@ public class ScanRegionObserver extends BaseScannerRegionObserver {
         final Region region = c.getEnvironment().getRegion();
         region.startRegionOperation();
         try {
-            // Once we return from the first call to next, we've run through and
-            // cached
-            // the topN rows, so we no longer need to start/stop a region
-            // operation.
             Tuple tuple = iterator.next();
             if (tuple == null && !isLastScan) {
                 List<KeyValue> kvList = new ArrayList<KeyValue>(1);
                 KeyValue kv = new KeyValue(QueryConstants.OFFSET_ROW_KEY_BYTES, QueryConstants.OFFSET_FAMILY,
-                        QueryConstants.OFFSET_COLUMN, Bytes.toBytes(iterator.getUnusedOffset()));
+                        QueryConstants.OFFSET_COLUMN, PInteger.INSTANCE.toBytes(iterator.getRemainingOffset()));
                 kvList.add(kv);
                 Result r = new Result(kvList);
                 firstTuple = new ResultTuple(r);
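
Note on the change above: when a region's rows are exhausted before the OFFSET is fully consumed, the coprocessor returns the remainder to the client as a synthetic row keyed by _OFFSET_, now encoded with PInteger instead of raw Bytes.toInt/Bytes.toBytes so both sides go through Phoenix's type system. A hedged sketch of that round trip (not the committed code; assumes phoenix-core on the classpath):

import org.apache.phoenix.schema.types.PInteger;

// Sketch only: round-tripping the remaining offset the way the patch does,
// using PInteger's encoding on both the write (coprocessor) and read (client) side.
public class OffsetEncodingExample {
    public static void main(String[] args) {
        int remainingOffset = 42;                                      // rows of OFFSET still unconsumed
        byte[] wire = PInteger.INSTANCE.toBytes(remainingOffset);      // what the server writes into the marker cell
        Integer decoded = (Integer) PInteger.INSTANCE.toObject(wire);  // what the client reads back
        System.out.println(decoded);                                   // prints 42
    }
}
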

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6ff638e1/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
index 93ae5d6..c5dabfe 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
@@ -18,6 +18,9 @@
 package org.apache.phoenix.execute;
 
 
+import static org.apache.phoenix.util.ScanUtil.isPacingScannersPossible;
+import static org.apache.phoenix.util.ScanUtil.isRoundRobinPossible;
+
 import java.sql.SQLException;
 import java.util.Collections;
 import java.util.List;
@@ -113,6 +116,15 @@ public class ScanPlan extends BaseQueryPlan {
             return false;
         }
         PTable table = tableRef.getTable();
+        /*
+         * For salted or local index tables, if rows are requested in a row key order, then we
+         * cannot execute a query serially. We need to be able to do a merge sort across all scans
+         * which isn't possible with SerialIterators. For other kinds of tables though we are ok
+         * since SerialIterators execute scans in the correct order.
+         */
+        if ((table.getBucketNum() != null || table.getIndexType() == IndexType.LOCAL) && ScanUtil.shouldRowsBeInRowKeyOrder(orderBy, context)) {
+            return false;
+        }
         GuidePostsInfo gpsInfo = table.getTableStats().getGuidePosts().get(SchemaUtil.getEmptyColumnFamily(table));
         long estRowSize = SchemaUtil.estimateRowSize(table);
         long estRegionSize;
@@ -147,8 +159,9 @@ public class ScanPlan extends BaseQueryPlan {
             TableRef table, OrderBy orderBy, Integer limit,Integer offset, boolean allowPageFilter) throws SQLException {
 
         if ((isSerial(context, statement, table, orderBy, limit, offset, allowPageFilter)
-                || ScanUtil.isRoundRobinPossible(orderBy, context) || ScanUtil.isPacingScannersPossible(context))
-                && offset == null) { return ParallelIteratorFactory.NOOP_FACTORY; }
+                || isRoundRobinPossible(orderBy, context) || isPacingScannersPossible(context))) {
+            return ParallelIteratorFactory.NOOP_FACTORY;
+        }
         ParallelIteratorFactory spoolingResultIteratorFactory =
                 new SpoolingResultIterator.SpoolingResultIteratorFactory(
                         context.getConnection().getQueryServices());
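
The comment added to isSerial above is the crux: salted and local-index tables split one logical key range across buckets that are each sorted only internally, so returning rows in row key order needs a merge sort across all scans, which a one-scan-at-a-time serial pass cannot provide. A hypothetical, self-contained illustration of that k-way merge (plain Java, not Phoenix code):

import java.util.Arrays;
import java.util.List;
import java.util.PriorityQueue;

// Each "bucket" is sorted internally, like a salt bucket's scan results.
// Producing globally ordered output requires a k-way merge over all buckets
// at once (here via a PriorityQueue of per-bucket cursors).
public class MergeSortedBucketsExample {
    public static void main(String[] args) {
        List<int[]> buckets = Arrays.asList(new int[]{1, 4, 7}, new int[]{2, 5}, new int[]{3, 6});
        // Queue entries are {bucketIndex, positionInBucket}, ordered by current value.
        PriorityQueue<int[]> pq = new PriorityQueue<>(
                (x, y) -> Integer.compare(buckets.get(x[0])[x[1]], buckets.get(y[0])[y[1]]));
        for (int b = 0; b < buckets.size(); b++) {
            pq.add(new int[]{b, 0});
        }
        StringBuilder merged = new StringBuilder();
        while (!pq.isEmpty()) {
            int[] cur = pq.poll();
            merged.append(buckets.get(cur[0])[cur[1]]).append(' ');
            if (cur[1] + 1 < buckets.get(cur[0]).length) {
                pq.add(new int[]{cur[0], cur[1] + 1});  // advance this bucket's cursor
            }
        }
        System.out.println(merged.toString().trim()); // 1 2 3 4 5 6 7
    }
}
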

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6ff638e1/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
index 0299f18..043bd30 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java
@@ -676,7 +676,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
         SQLException toThrow = null;
         int queryTimeOut = context.getStatement().getQueryTimeoutInMillis();
         try {
-            submitWork(scan, futures, allIterators, splitSize);
+            submitWork(scan, futures, allIterators, splitSize, isReverse);
             boolean clearedCache = false;
             for (List<Pair<Scan,Future<PeekingResultIterator>>> future : reverseIfNecessary(futures,isReverse)) {
                 List<PeekingResultIterator> concatIterators = Lists.newArrayListWithExpectedSize(future.size());
@@ -868,7 +868,7 @@ public abstract class BaseResultIterators extends ExplainTable implements Result
 
     abstract protected String getName();    
     abstract protected void submitWork(List<List<Scan>> nestedScans, List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures,
-            Queue<PeekingResultIterator> allIterators, int estFlattenedSize) throws SQLException;
+            Queue<PeekingResultIterator> allIterators, int estFlattenedSize, boolean isReverse) throws SQLException;
     
     @Override
     public int size() {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6ff638e1/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
index a78565d..7a830de 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ChunkedResultIterator.java
@@ -44,7 +44,13 @@ import com.google.common.base.Preconditions;
 /**
  * {@code PeekingResultIterator} implementation that loads data in chunks. This is intended for
  * basic scan plans, to avoid loading large quantities of data from HBase in one go.
+ * 
+ * <p>
+ * Chunking is deprecated and shouldn't be used while implementing new features. As of HBase 0.98.17, 
+ * we rely on pacing the server side scanners instead of pulling rows from the server in chunks.
+ * </p>
  */
+@Deprecated
 public class ChunkedResultIterator implements PeekingResultIterator {
     private static final Logger logger = LoggerFactory.getLogger(ChunkedResultIterator.class);
 
@@ -56,7 +62,12 @@ public class ChunkedResultIterator implements PeekingResultIterator {
     private final MutationState mutationState;
     private Scan scan;
     private PeekingResultIterator resultIterator;
-
+    
+    /**
+     * Chunking is deprecated and shouldn't be used while implementing new features. As of HBase 0.98.17, 
+     * we rely on pacing the server side scanners instead of pulling rows from the server in chunks.
+     */
+    @Deprecated
     public static class ChunkedResultIteratorFactory implements ParallelIteratorFactory {
 
         private final ParallelIteratorFactory delegateFactory;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6ff638e1/phoenix-core/src/main/java/org/apache/phoenix/iterate/OffsetResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OffsetResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OffsetResultIterator.java
index ef8eacf..db53806 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OffsetResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OffsetResultIterator.java
@@ -56,7 +56,7 @@ public class OffsetResultIterator extends DelegateResultIterator {
         return "OffsetResultIterator [rowCount=" + rowCount + ", offset=" + offset + "]";
     }
 
-    public Integer getUnusedOffset() {
+    public Integer getRemainingOffset() {
         return (offset - rowCount) > 0 ? (offset - rowCount) : 0;
     }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6ff638e1/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
index ca0eba0..a5664c7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ParallelIterators.java
@@ -67,7 +67,7 @@ public class ParallelIterators extends BaseResultIterators {
 
     @Override
     protected void submitWork(List<List<Scan>> nestedScans, List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures,
-            final Queue<PeekingResultIterator> allIterators, int estFlattenedSize) throws SQLException {
+            final Queue<PeekingResultIterator> allIterators, int estFlattenedSize, boolean isReverse) throws SQLException {
         // Pre-populate nestedFutures lists so that we can shuffle the scans
         // and add the future to the right nested list. By shuffling the scans
         // we get better utilization of the cluster since our thread executor
@@ -99,7 +99,7 @@ public class ParallelIterators extends BaseResultIterators {
             final CombinableMetric scanMetrics = readMetrics.allotMetric(MetricType.SCAN_BYTES, physicalTableName);
             final TaskExecutionMetricsHolder taskMetrics = new TaskExecutionMetricsHolder(readMetrics, physicalTableName);
             final TableResultIterator tableResultItr = context.getConnection().getTableResultIteratorFactory().newIterator(mutationState, tableRef, scan, scanMetrics, renewLeaseThreshold);
-            context.getConnection().addIterator(tableResultItr);
+            context.getConnection().addIteratorForLeaseRenewal(tableResultItr);
             Future<PeekingResultIterator> future = executor.submit(Tracing.wrap(new JobCallable<PeekingResultIterator>() {
                 
                 @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6ff638e1/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
index 17c2279..d2c89b9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
@@ -30,13 +30,16 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.job.JobManager.JobCallable;
 import org.apache.phoenix.monitoring.TaskExecutionMetricsHolder;
 import org.apache.phoenix.parse.HintNode;
 import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.schema.types.PInteger;
 import org.apache.phoenix.trace.util.Tracing;
-import org.apache.phoenix.util.ScanUtil;
+import org.apache.phoenix.util.QueryUtil;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -53,11 +56,13 @@ import com.google.common.collect.Lists;
 public class SerialIterators extends BaseResultIterators {
        private static final String NAME = "SERIAL";
     private final ParallelIteratorFactory iteratorFactory;
+    private final Integer offset;
     
     public SerialIterators(QueryPlan plan, Integer perScanLimit, Integer offset,
             ParallelIteratorFactory iteratorFactory, ParallelScanGrouper scanGrouper)
             throws SQLException {
         super(plan, perScanLimit, offset, scanGrouper);
+        this.offset = offset;
        // must be an offset or a limit specified or a SERIAL hint
         Preconditions.checkArgument(
                offset != null || perScanLimit != null || plan.getStatement().getHint().hasHint(HintNode.Hint.SERIAL));
@@ -65,40 +70,28 @@ public class SerialIterators extends BaseResultIterators {
     }
 
     @Override
-    protected void submitWork(List<List<Scan>> nestedScans, List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures,
-            final Queue<PeekingResultIterator> allIterators, int estFlattenedSize) {
-        // Pre-populate nestedFutures lists so that we can shuffle the scans
-        // and add the future to the right nested list. By shuffling the scans
-        // we get better utilization of the cluster since our thread executor
-        // will spray the scans across machines as opposed to targeting a
-        // single one since the scans are in row key order.
+    protected void submitWork(final List<List<Scan>> nestedScans, List<List<Pair<Scan,Future<PeekingResultIterator>>>> nestedFutures,
+            final Queue<PeekingResultIterator> allIterators, int estFlattenedSize, boolean isReverse) {
         ExecutorService executor = context.getConnection().getQueryServices().getExecutor();
-        
-        for (final List<Scan> scans : nestedScans) {
-            Scan firstScan = scans.get(0);
-            Scan lastScan = scans.get(scans.size()-1);
-            final Scan overallScan = ScanUtil.newScan(firstScan);
-            overallScan.setStopRow(lastScan.getStopRow());
-            final String tableName = tableRef.getTable().getPhysicalName().getString();
-            final TaskExecutionMetricsHolder taskMetrics = new TaskExecutionMetricsHolder(context.getReadMetricsQueue(), tableName);
-            final PhoenixConnection conn = context.getConnection();
-            final long renewLeaseThreshold = conn.getQueryServices().getRenewLeaseThresholdMilliSeconds();
-            lastScan.setAttribute(QueryConstants.LAST_SCAN, Bytes.toBytes(Boolean.TRUE));
+        final String tableName = tableRef.getTable().getPhysicalName().getString();
+        final TaskExecutionMetricsHolder taskMetrics = new TaskExecutionMetricsHolder(context.getReadMetricsQueue(), tableName);
+        final PhoenixConnection conn = context.getConnection();
+        final long renewLeaseThreshold = conn.getQueryServices().getRenewLeaseThresholdMilliSeconds();
+        int expectedListSize = nestedScans.size() * 10;
+        List<Scan> flattenedScans = Lists.newArrayListWithExpectedSize(expectedListSize);
+        for (List<Scan> list : nestedScans) {
+            flattenedScans.addAll(list);
+        }
+        if (!flattenedScans.isEmpty()) { 
+            if (isReverse) {
+                flattenedScans = Lists.reverse(flattenedScans);
+            }
+            final List<Scan> finalScans = flattenedScans;
             Future<PeekingResultIterator> future = executor.submit(Tracing.wrap(new JobCallable<PeekingResultIterator>() {
                 @Override
                 public PeekingResultIterator call() throws Exception {
-                    PeekingResultIterator previousIterator = null;
-                       List<PeekingResultIterator> concatIterators = Lists.newArrayListWithExpectedSize(scans.size());
-                       for (final Scan scan : scans) {
-                           TableResultIterator scanner = new TableResultIterator(mutationState, tableRef, scan, context.getReadMetricsQueue().allotMetric(SCAN_BYTES, tableName), renewLeaseThreshold, previousIterator);
-                           conn.addIterator(scanner);
-                           PeekingResultIterator iterator = iteratorFactory.newIterator(context, scanner, scan, tableName);
-                           concatIterators.add(iterator);
-                           previousIterator = iterator;
-                       }
-                       PeekingResultIterator concatIterator = ConcatResultIterator.newIterator(concatIterators);
-                    allIterators.add(concatIterator);
-                    return concatIterator;
+                    PeekingResultIterator itr = new SerialIterator(finalScans, tableName, renewLeaseThreshold, offset);
+                    return itr;
                 }
 
                 /**
@@ -117,7 +110,7 @@ public class SerialIterators extends BaseResultIterators {
                 }
             }, "Serial scanner for table: " + 
tableRef.getTable().getPhysicalName().getString()));
             // Add our singleton Future which will execute serially
-            nestedFutures.add(Collections.singletonList(new Pair<Scan,Future<PeekingResultIterator>>(overallScan,future)));
+            nestedFutures.add(Collections.singletonList(new Pair<Scan, Future<PeekingResultIterator>>(flattenedScans.get(0), future)));
         }
     }
 
@@ -125,4 +118,89 @@ public class SerialIterators extends BaseResultIterators {
     protected String getName() {
         return NAME;
     }
+    
+    /**
+     * 
+     * Iterator that creates iterators for scans only when needed.
+     * This helps reduce the cost of pre-constructing all the iterators
+     * which we may not even use.
+     */
+    private class SerialIterator implements PeekingResultIterator {
+        private final List<Scan> scans;
+        private final String tableName;
+        private final long renewLeaseThreshold;
+        private int index;
+        private PeekingResultIterator currentIterator;
+        private Integer remainingOffset;
+        
+        private SerialIterator(List<Scan> flattenedScans, String tableName, long renewLeaseThreshold, Integer offset) throws SQLException {
+            this.scans = Lists.newArrayListWithExpectedSize(flattenedScans.size());
+            this.tableName = tableName;
+            this.renewLeaseThreshold = renewLeaseThreshold;
+            this.scans.addAll(flattenedScans);
+            this.remainingOffset = offset;
+            if (this.remainingOffset != null) {
+                // mark the last scan for offset purposes
+                this.scans.get(this.scans.size() - 1).setAttribute(QueryConstants.LAST_SCAN, Bytes.toBytes(Boolean.TRUE));
+            }
+        }
+        
+        private PeekingResultIterator currentIterator() throws SQLException {
+            if (currentIterator == null) {
+                return currentIterator = nextIterator();
+            }
+            if (currentIterator.peek() == null) {
+                currentIterator.close();
+                currentIterator = nextIterator();
+            }
+            return currentIterator;
+        }
+        
+        private PeekingResultIterator nextIterator() throws SQLException {
+            if (index >= scans.size()) {
+                return EMPTY_ITERATOR;
+            }
+            while (index < scans.size()) {
+                Scan currentScan = scans.get(index++);
+                if (remainingOffset != null) {
+                    currentScan.setAttribute(BaseScannerRegionObserver.SCAN_OFFSET, PInteger.INSTANCE.toBytes(remainingOffset));
+                }
+                TableResultIterator itr = new TableResultIterator(mutationState, tableRef, currentScan, context.getReadMetricsQueue().allotMetric(SCAN_BYTES, tableName), renewLeaseThreshold);
+                PeekingResultIterator peekingItr = iteratorFactory.newIterator(context, itr, currentScan, tableName);
+                Tuple tuple;
+                if ((tuple = peekingItr.peek()) == null) {
+                    peekingItr.close();
+                    continue;
+                } else if ((remainingOffset = QueryUtil.getRemainingOffset(tuple)) != null) {
+                    peekingItr.next();
+                    peekingItr.close();
+                    continue;
+                }
+                context.getConnection().addIteratorForLeaseRenewal(itr);
+                return peekingItr;
+            }
+            return EMPTY_ITERATOR;
+        }
+        
+        @Override
+        public Tuple next() throws SQLException {
+            return currentIterator().next();
+        }
+
+        @Override
+        public void explain(List<String> planSteps) {}
+
+        @Override
+        public void close() throws SQLException {
+            if (currentIterator != null) {
+                currentIterator.close();
+            }
+        }
+
+        @Override
+        public Tuple peek() throws SQLException {
+            return currentIterator().peek();
+        }
+        
+    }
 }
\ No newline at end of file
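
The remainingOffset bookkeeping above threads the query's OFFSET through the serially executed scans: each scan either swallows part of the offset and reports the leftover via the marker row (read back by QueryUtil.getRemainingOffset), or starts returning real rows. A simplified, hypothetical model of that accounting, with plain lists standing in for region scans:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only: each "region" either absorbs the whole
// remaining offset (and the remainder carries into the next scan, as
// SCAN_OFFSET plus the _OFFSET_ marker row do in the patch) or starts
// contributing real rows.
public class OffsetAcrossScansExample {
    public static void main(String[] args) {
        List<List<String>> regions = Arrays.asList(
                Arrays.asList("a", "b", "c"),
                Arrays.asList("d", "e"),
                Arrays.asList("f", "g", "h"));
        int remainingOffset = 4;   // models OFFSET 4
        List<String> results = new ArrayList<>();
        for (List<String> region : regions) {
            if (remainingOffset >= region.size()) {
                remainingOffset -= region.size();  // whole region skipped; remainder carried forward
                continue;
            }
            results.addAll(region.subList(remainingOffset, region.size()));
            remainingOffset = 0;
        }
        System.out.println(results); // [e, f, g, h]
    }
}
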

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6ff638e1/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java
index 0a3c32b..4f85a5f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java
@@ -55,15 +55,28 @@ import org.apache.phoenix.util.TupleUtil;
  *
 * Result iterator that spools the results of a scan to disk once an in-memory threshold has been reached.
 * If the in-memory threshold is not reached, the results are held in memory with no disk writing performed.
- *
- *
+ * 
+ * <p>
+ * Spooling is deprecated and shouldn't be used while implementing new features. As of HBase 0.98.17, 
+ * we rely on pacing the server side scanners instead of pulling rows from the server and potentially spooling to a temporary file created on clients.
+ * </p>
+ *  
  * @since 0.1
  */
+@Deprecated
 public class SpoolingResultIterator implements PeekingResultIterator {
     
     private final PeekingResultIterator spoolFrom;
     private final SpoolingMetricsHolder spoolMetrics;
     private final MemoryMetricsHolder memoryMetrics;
+    
+    /**
+     * Spooling is deprecated and shouldn't be used while implementing new features. As of HBase
+     * 0.98.17, we rely on pacing the server side scanners instead of pulling rows from the server
+     * and potentially spooling to a temporary file created on clients.
+     */
+    @Deprecated
     public static class SpoolingResultIteratorFactory implements ParallelIteratorFactory {
         private final QueryServices services;
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6ff638e1/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
index a86f899..a7e3068 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
@@ -32,14 +32,12 @@ import javax.annotation.concurrent.GuardedBy;
 import org.apache.hadoop.hbase.client.AbstractClientScanner;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.monitoring.CombinableMetric;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.util.Closeables;
-import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ServerUtil;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -68,8 +66,7 @@ public class TableResultIterator implements ResultIterator {
 
     @GuardedBy("this")
     private long renewLeaseTime = 0;
-    private PeekingResultIterator previousIterator;
-
+    
     @VisibleForTesting // Exposed for testing. DON'T USE ANYWHERE ELSE!
     TableResultIterator() {
         this.scanMetrics = null;
@@ -83,19 +80,13 @@ public class TableResultIterator implements ResultIterator {
     };
 
     public TableResultIterator(MutationState mutationState, TableRef tableRef, Scan scan, CombinableMetric scanMetrics,
-                       long renewLeaseThreshold) throws SQLException {
-       this(mutationState,tableRef,scan,scanMetrics,renewLeaseThreshold,null);
-    }
-    
-    public TableResultIterator(MutationState mutationState, TableRef tableRef, Scan scan, CombinableMetric scanMetrics,
-            long renewLeaseThreshold, PeekingResultIterator previousIterator) throws SQLException {
+            long renewLeaseThreshold) throws SQLException {
         this.scan = scan;
         this.scanMetrics = scanMetrics;
         PTable table = tableRef.getTable();
         htable = mutationState.getHTable(table);
         this.scanIterator = UNINITIALIZED_SCANNER;
         this.renewLeaseThreshold = renewLeaseThreshold;
-        this.previousIterator = previousIterator;
     }
 
     @Override
@@ -127,15 +118,6 @@ public class TableResultIterator implements ResultIterator {
         ResultIterator delegate = this.scanIterator;
         if (delegate == UNINITIALIZED_SCANNER) {
             try {
-                if (previousIterator != null && scan.getAttribute(BaseScannerRegionObserver.SCAN_OFFSET) != null) {
-                    byte[] unusedOffset = QueryUtil.getUnusedOffset(previousIterator.peek());
-                    if (unusedOffset != null) {
-                        scan.setAttribute(BaseScannerRegionObserver.SCAN_OFFSET, unusedOffset);
-                        previousIterator.next();
-                    } else {
-                        scan.setAttribute(BaseScannerRegionObserver.SCAN_OFFSET, null);
-                    }
-                }
                 this.scanIterator =
                         new ScanningResultIterator(htable.getScanner(scan), scanMetrics);
             } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6ff638e1/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
index b205da4..0e69bb5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixConnection.java
@@ -58,6 +58,8 @@ import java.util.concurrent.LinkedBlockingQueue;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
+import co.cask.tephra.TransactionContext;
+
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Consistency;
 import org.apache.htrace.Sampler;
@@ -77,6 +79,7 @@ import org.apache.phoenix.jdbc.PhoenixStatement.PhoenixStatementParser;
 import org.apache.phoenix.parse.PFunction;
 import org.apache.phoenix.parse.PSchema;
 import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.ConnectionQueryServices.Feature;
 import org.apache.phoenix.query.DelegateConnectionQueryServices;
 import org.apache.phoenix.query.MetaDataMutated;
 import org.apache.phoenix.query.QueryConstants;
@@ -119,8 +122,6 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableMap.Builder;
 import com.google.common.collect.Lists;
 
-import co.cask.tephra.TransactionContext;
-
 
 /**
  * 
@@ -1043,8 +1044,8 @@ public class PhoenixConnection implements Connection, MetaDataMutated, SQLClosea
         this.parallelIteratorFactory = parallelIteratorFactory;
     }
     
-    public void addIterator(@Nonnull TableResultIterator itr) {
-        if (services.supportsFeature(ConnectionQueryServices.Feature.RENEW_LEASE)) {
+    public void addIteratorForLeaseRenewal(@Nonnull TableResultIterator itr) {
+        if (services.supportsFeature(Feature.RENEW_LEASE)) {
             checkNotNull(itr);
             scannerQueue.add(new WeakReference<TableResultIterator>(itr));
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6ff638e1/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
index 450ccfe..36aa8cd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -144,7 +144,7 @@ public interface QueryConstants {
     public final static String PHOENIX_METADATA = "table";
     public final static String OFFSET_ROW_KEY = "_OFFSET_";
     public final static byte[] OFFSET_ROW_KEY_BYTES = Bytes.toBytes(OFFSET_ROW_KEY);
-    public final static ImmutableBytesPtr offsetRowKeyPtr = new ImmutableBytesPtr(OFFSET_ROW_KEY_BYTES);
+    public final static ImmutableBytesPtr OFFSET_ROW_KEY_PTR = new ImmutableBytesPtr(OFFSET_ROW_KEY_BYTES);
 
     public final static PName SINGLE_COLUMN_NAME = PNameFactory.newNormalizedName("s");
     public final static PName SINGLE_COLUMN_FAMILY_NAME = PNameFactory.newNormalizedName("s");

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6ff638e1/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
index 665e77f..f4300fd 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
@@ -1226,6 +1226,7 @@ public class PTableImpl implements PTable {
         return isTransactional;
     }
 
+    @Override
     public int getBaseColumnCount() {
         return baseColumnCount;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6ff638e1/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
index ded20e9..daab198 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
@@ -48,7 +48,9 @@ import org.apache.phoenix.parse.WildcardParseNode;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.schema.types.PInteger;
 
 import com.google.common.base.Function;
 import com.google.common.base.Joiner;
@@ -369,13 +371,13 @@ public final class QueryUtil {
 
     }
 
-    public static byte[] getUnusedOffset(Tuple offsetTuple) {
+    public static Integer getRemainingOffset(Tuple offsetTuple) {
         if (offsetTuple != null) {
             ImmutableBytesPtr rowKeyPtr = new ImmutableBytesPtr();
             offsetTuple.getKey(rowKeyPtr);
-            if (QueryConstants.offsetRowKeyPtr.compareTo(rowKeyPtr) == 0) {
-                Cell value = offsetTuple.getValue(QueryConstants.OFFSET_FAMILY, QueryConstants.OFFSET_COLUMN);
-                return value.getValue();
+            if (QueryConstants.OFFSET_ROW_KEY_PTR.compareTo(rowKeyPtr) == 0) {
+                Cell cell = offsetTuple.getValue(QueryConstants.OFFSET_FAMILY, QueryConstants.OFFSET_COLUMN);
+                return (Integer) PInteger.INSTANCE.toObject(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(), PInteger.INSTANCE, SortOrder.ASC, null, null);
             }
         }
         return null;
