This is an automated email from the ASF dual-hosted git repository.

pboado pushed a commit to branch 5.x-cdh6
in repository https://gitbox.apache.org/repos/asf/phoenix.git

commit b6ce5c0c045afac00ce4af7e953516e74a8fe12f
Author: Abhishek Singh Chouhan <abhishekchouhan...@gmail.com>
AuthorDate: Thu Mar 14 00:34:37 2019 +0000

    PHOENIX-5131 Make spilling to disk for order/group by configurable
    
    Signed-off-by: Chinmay Kulkarni <chinmayskulka...@apache.org>
---
 .../java/org/apache/phoenix/end2end/OrderByIT.java |  45 +++++++
 ...OrderByWithServerClientSpoolingDisabledIT.java} |  17 ++-
 .../end2end/OrderByWithServerMemoryLimitIT.java    |  81 ++++++++++++
 .../phoenix/end2end/OrderByWithSpillingIT.java     |   3 +-
 .../phoenix/end2end/SpooledTmpFileDeleteIT.java    |   2 +-
 .../end2end/join/SortMergeJoinNoSpoolingIT.java    |  83 +++++++++++++
 .../phoenix/coprocessor/MetaDataProtocol.java      |   7 ++
 .../phoenix/coprocessor/ScanRegionObserver.java    |   4 +-
 .../org/apache/phoenix/execute/AggregatePlan.java  |  28 ++++-
 .../phoenix/execute/ClientAggregatePlan.java       |  30 ++++-
 .../org/apache/phoenix/execute/ClientScanPlan.java |  16 ++-
 .../java/org/apache/phoenix/execute/ScanPlan.java  |  10 +-
 .../apache/phoenix/execute/SortMergeJoinPlan.java  | 138 +++++----------------
 .../phoenix/hbase/index/util/VersionUtil.java      |  12 ++
 .../org/apache/phoenix/iterate/BufferedQueue.java  |  20 +--
 .../phoenix/iterate/BufferedSortedQueue.java       |  33 +----
 .../apache/phoenix/iterate/BufferedTupleQueue.java | 134 ++++++++++++++++++++
 .../iterate/NonAggregateRegionScannerFactory.java  |  45 +++++--
 .../iterate/OrderedAggregatingResultIterator.java  |   5 +-
 .../phoenix/iterate/OrderedResultIterator.java     |  72 +++++++++--
 .../org/apache/phoenix/iterate/PhoenixQueues.java  |  96 ++++++++++++++
 .../apache/phoenix/iterate/SizeAwareQueue.java}    |  19 +--
 .../org/apache/phoenix/iterate/SizeBoundQueue.java |  96 ++++++++++++++
 .../phoenix/iterate/SpoolingResultIterator.java    |   5 +-
 .../org/apache/phoenix/query/QueryServices.java    |  11 +-
 .../apache/phoenix/query/QueryServicesOptions.java |  19 ++-
 .../phoenix/iterate/OrderedResultIteratorTest.java |  55 +++++++-
 .../phoenix/query/QueryServicesTestImpl.java       |   3 +-
 .../org/apache/phoenix/util/MetaDataUtilTest.java  |  10 +-
 29 files changed, 880 insertions(+), 219 deletions(-)
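
For context, here is a minimal client-side sketch of the new knobs this change introduces. The QueryServices constant names come from the diff below; the JDBC URL, threshold value and the fact that this fragment sits inside some method are illustrative assumptions only (imports needed: java.sql.Connection, java.sql.DriverManager, java.util.Properties, org.apache.phoenix.query.QueryServices):

    // Hedged sketch: cap the client-side sort buffer and disable spooling for
    // ORDER BY / GROUP BY and sort-merge joins, so queries fail fast instead of
    // spilling to disk.
    Properties props = new Properties();
    props.setProperty(QueryServices.CLIENT_SPOOL_THRESHOLD_BYTES_ATTRIB, Long.toString(10L * 1024 * 1024));
    props.setProperty(QueryServices.CLIENT_ORDERBY_SPOOLING_ENABLED_ATTRIB, Boolean.toString(false));
    props.setProperty(QueryServices.CLIENT_JOIN_SPOOLING_ENABLED_ATTRIB, Boolean.toString(false));
    try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost:2181", props)) {
        // Once the in-memory threshold is exceeded, such queries now throw
        // "Queue full. Consider increasing memory threshold or spooling to disk"
        // instead of writing spool files (see the ITs below).
    }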

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/OrderByIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OrderByIT.java
index 792d08f..172ed89 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/OrderByIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OrderByIT.java
@@ -20,7 +20,10 @@ package org.apache.phoenix.end2end;
 import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.hamcrest.CoreMatchers.containsString;
 
 import java.sql.Connection;
 import java.sql.Date;
@@ -30,6 +33,8 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.Properties;
 
+import org.apache.phoenix.exception.PhoenixIOException;
+import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.junit.Test;
 
@@ -461,4 +466,44 @@ public class OrderByIT extends BaseOrderByIT {
             conn.close();
         }
     }
+
+    @Test
+    public void testOrderByWithClientMemoryLimit() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        props.put(QueryServices.CLIENT_SPOOL_THRESHOLD_BYTES_ATTRIB, Integer.toString(1));
+        props.put(QueryServices.CLIENT_ORDERBY_SPOOLING_ENABLED_ATTRIB,
+            Boolean.toString(Boolean.FALSE));
+
+        try(Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.setAutoCommit(false);
+            String tableName = generateUniqueName();
+            String ddl =
+                    "CREATE TABLE " + tableName + "  (a_string varchar not null, col1 integer"
+                            + "  CONSTRAINT pk PRIMARY KEY (a_string))\n";
+            createTestTable(getUrl(), ddl);
+
+            String dml = "UPSERT INTO " + tableName + " VALUES(?, ?)";
+            PreparedStatement stmt = conn.prepareStatement(dml);
+            stmt.setString(1, "a");
+            stmt.setInt(2, 40);
+            stmt.execute();
+            stmt.setString(1, "b");
+            stmt.setInt(2, 20);
+            stmt.execute();
+            stmt.setString(1, "c");
+            stmt.setInt(2, 30);
+            stmt.execute();
+            conn.commit();
+
+            String query = "select count(*), col1 from " + tableName + " group by col1 order by 2";
+            ResultSet rs = conn.createStatement().executeQuery(query);
+            try {
+                rs.next();
+                fail("Expected PhoenixIOException due to IllegalStateException");
+            } catch (PhoenixIOException e) {
+                assertThat(e.getMessage(), containsString("java.lang.IllegalStateException: "
+                        + "Queue full. Consider increasing memory threshold or spooling to disk"));
+            }
+        }
+    }
 }
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/OrderByWithSpillingIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OrderByWithServerClientSpoolingDisabledIT.java
similarity index 66%
copy from phoenix-core/src/it/java/org/apache/phoenix/end2end/OrderByWithSpillingIT.java
copy to phoenix-core/src/it/java/org/apache/phoenix/end2end/OrderByWithServerClientSpoolingDisabledIT.java
index c5eeaff..caf515f 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/OrderByWithSpillingIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OrderByWithServerClientSpoolingDisabledIT.java
@@ -19,18 +19,29 @@ package org.apache.phoenix.end2end;
 
 import java.util.Map;
 
+import org.apache.phoenix.iterate.SizeBoundQueue;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.junit.BeforeClass;
 
 import com.google.common.collect.Maps;
 
-public class OrderByWithSpillingIT extends OrderByIT {
+/**
+ * Same as the order by test but with spooling disabled both on the server and client. This will use
+ * {@link SizeBoundQueue} for all its operations
+ */
+public class OrderByWithServerClientSpoolingDisabledIT extends OrderByIT {
+
     @BeforeClass
     public static void doSetup() throws Exception {
         Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
-        // do lot's of spooling!
-        props.put(QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, Integer.toString(1));
+        // make sure that disabling server-side spooling has no effect on correctness
+        // (existing order by IT)
+        props.put(QueryServices.SERVER_ORDERBY_SPOOLING_ENABLED_ATTRIB,
+            Boolean.toString(Boolean.FALSE));
+        props.put(QueryServices.CLIENT_ORDERBY_SPOOLING_ENABLED_ATTRIB,
+            Boolean.toString(Boolean.FALSE));
         setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
     }
+
 }
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/OrderByWithServerMemoryLimitIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OrderByWithServerMemoryLimitIT.java
new file mode 100644
index 0000000..2c66614
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OrderByWithServerMemoryLimitIT.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.phoenix.exception.PhoenixIOException;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.Maps;
+
+public class OrderByWithServerMemoryLimitIT extends BaseTest {
+
+    @BeforeClass
+    public static void doSetup() throws Exception {
+        Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+        props.put(QueryServices.SERVER_SPOOL_THRESHOLD_BYTES_ATTRIB, Integer.toString(1));
+        props.put(QueryServices.SERVER_ORDERBY_SPOOLING_ENABLED_ATTRIB,
+            Boolean.toString(Boolean.FALSE));
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+    }
+
+    @Test
+    public void testOrderByWithServerMemoryLimit() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            conn.setAutoCommit(false);
+            String tableName = generateUniqueName();
+            String ddl =
+                    "CREATE TABLE " + tableName + "  (a_string varchar not null, col1 integer"
+                            + "  CONSTRAINT pk PRIMARY KEY (a_string))\n";
+            createTestTable(getUrl(), ddl);
+
+            String dml = "UPSERT INTO " + tableName + " VALUES(?, ?)";
+            PreparedStatement stmt = conn.prepareStatement(dml);
+            stmt.setString(1, "a");
+            stmt.setInt(2, 40);
+            stmt.execute();
+            stmt.setString(1, "b");
+            stmt.setInt(2, 20);
+            stmt.execute();
+            stmt.setString(1, "c");
+            stmt.setInt(2, 30);
+            stmt.execute();
+            conn.commit();
+
+            String query = "select * from " + tableName + " order by 2";
+            ResultSet rs = conn.createStatement().executeQuery(query);
+            try {
+                rs.next();
+                fail("Expected PhoenixIOException due to IllegalStateException");
+            } catch (PhoenixIOException e) {
+                assertThat(e.getMessage(), containsString("java.lang.IllegalStateException: "
+                        + "Queue full. Consider increasing memory threshold or spooling to disk"));
+            }
+        }
+    }
+}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/OrderByWithSpillingIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OrderByWithSpillingIT.java
index c5eeaff..80a5123 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/OrderByWithSpillingIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OrderByWithSpillingIT.java
@@ -30,7 +30,8 @@ public class OrderByWithSpillingIT extends OrderByIT {
     public static void doSetup() throws Exception {
         Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
         // do lot's of spooling!
-        props.put(QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, Integer.toString(1));
+        props.put(QueryServices.SERVER_SPOOL_THRESHOLD_BYTES_ATTRIB, Integer.toString(1));
+        props.put(QueryServices.CLIENT_SPOOL_THRESHOLD_BYTES_ATTRIB, Integer.toString(1));
         setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
     }
 }
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SpooledTmpFileDeleteIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SpooledTmpFileDeleteIT.java
index 9dc82bf..e63c3f6 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SpooledTmpFileDeleteIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SpooledTmpFileDeleteIT.java
@@ -44,7 +44,7 @@ public class SpooledTmpFileDeleteIT extends ParallelStatsDisabledIT {
     private Connection getConnection() throws Exception {
         Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
         props.setProperty(QueryServices.SPOOL_DIRECTORY, spoolDir.getPath());
-        props.setProperty(QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, Integer.toString(1));
+        props.setProperty(QueryServices.CLIENT_SPOOL_THRESHOLD_BYTES_ATTRIB, Integer.toString(1));
         props.setProperty(QueryServices.RENEW_LEASE_ENABLED, Boolean.toString(false));
         // Ensures round robin off so that spooling is used.
         // TODO: review with Samarth - should a Noop iterator be used if pacing is not possible?
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/SortMergeJoinNoSpoolingIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/SortMergeJoinNoSpoolingIT.java
new file mode 100644
index 0000000..40b1fe6
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/SortMergeJoinNoSpoolingIT.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+package org.apache.phoenix.end2end.join;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.phoenix.exception.PhoenixIOException;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runners.Parameterized.Parameters;
+
+import com.google.common.collect.Maps;
+
+public class SortMergeJoinNoSpoolingIT extends SortMergeJoinNoIndexIT {
+
+    public SortMergeJoinNoSpoolingIT(String[] indexDDL, String[] plans) {
+        super(indexDDL, plans);
+    }
+
+    @Parameters(name = "SortMergeJoinNoSpoolingIT_{index}") // name is used by failsafe as file name
+                                                            // in reports
+    public static Collection<Object> data() {
+        return SortMergeJoinNoIndexIT.data();
+    }
+
+    @BeforeClass
+    public static void doSetup() throws Exception {
+        Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+        props.put(QueryServices.CLIENT_JOIN_SPOOLING_ENABLED_ATTRIB,
+            Boolean.toString(Boolean.FALSE));
+        props.put(QueryServices.CLIENT_SPOOL_THRESHOLD_BYTES_ATTRIB,
+            Integer.toString(10 * 1000 * 1000));
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+    }
+
+    @Test
+    public void testJoinWithMemoryLimit() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        props.put(QueryServices.CLIENT_SPOOL_THRESHOLD_BYTES_ATTRIB, Integer.toString(1));
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            String tableName1 = getTableName(conn, JOIN_ITEM_TABLE_FULL_NAME);
+            String tableName2 = getTableName(conn, JOIN_SUPPLIER_TABLE_FULL_NAME);
+            String query =
+                    "SELECT /*+ USE_SORT_MERGE_JOIN*/ item.\"item_id\", item.name, supp.\"supplier_id\", supp.name FROM "
+                            + tableName1 + " item JOIN " + tableName2
+                            + " supp ON item.\"supplier_id\" = supp.\"supplier_id\" ORDER BY \"item_id\"";
+
+            PreparedStatement statement = conn.prepareStatement(query);
+            ResultSet rs = statement.executeQuery();
+            try {
+                rs.next();
+                fail("Expected PhoenixIOException due to IllegalStateException");
+            } catch (PhoenixIOException e) {
+                assertThat(e.getMessage(), containsString(
+                    "Queue full. Consider increasing memory threshold or spooling to disk"));
+            }
+
+        }
+    }
+
+}
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
index 7a1d542..458ebe8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
@@ -118,6 +118,13 @@ public abstract class MetaDataProtocol extends MetaDataService {
     // Version at which we allow SYSTEM.CATALOG to split
     public static final int MIN_SPLITTABLE_SYSTEM_CATALOG = VersionUtil.encodeVersion("5", "1", "0");
 
+    // Version at and after which we will no longer expect client to serialize thresholdBytes for
+    // spooling into the scan
+    public static final int MIN_5_x_DISABLE_SERVER_SPOOL_THRESHOLD =
+            VersionUtil.encodeVersion("5", "1", "0");
+    public static final int MIN_4_x_DISABLE_SERVER_SPOOL_THRESHOLD =
+            VersionUtil.encodeVersion("4", "15", "0");
+
    // ALWAYS update this map whenever rolling out a new release (major, minor or patch release). 
    // Key is the SYSTEM.CATALOG timestamp for the version and value is the version string.
    private static final NavigableMap<Long, String> TIMESTAMP_VERSION_MAP = new TreeMap<>();
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
index ae2a6fd..08fa321 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java
@@ -74,11 +74,11 @@ public class ScanRegionObserver extends BaseScannerRegionObserver implements Reg
       return Optional.of(this);
     }
 
-    public static void serializeIntoScan(Scan scan, int thresholdBytes, int limit, List<OrderByExpression> orderByExpressions, int estimatedRowSize) {
+    public static void serializeIntoScan(Scan scan, int limit,
+            List<OrderByExpression> orderByExpressions, int estimatedRowSize) {
         ByteArrayOutputStream stream = new ByteArrayOutputStream(); // TODO: size?
         try {
             DataOutputStream output = new DataOutputStream(stream);
-            WritableUtils.writeVInt(output, thresholdBytes);
             WritableUtils.writeVInt(output, limit);
             WritableUtils.writeVInt(output, estimatedRowSize);
             WritableUtils.writeVInt(output, orderByExpressions.size());
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
index 0c8e8dc..d7c3048 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/AggregatePlan.java
@@ -193,8 +193,16 @@ public class AggregatePlan extends BaseQueryPlan {
                 isAscending=false;
             }
             OrderByExpression orderByExpression = new OrderByExpression(expression, isNullsLast, isAscending);
-            int threshold = services.getProps().getInt(QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
-            return new OrderedResultIterator(scanner, Collections.<OrderByExpression>singletonList(orderByExpression), threshold);
+            long threshold =
+                    services.getProps().getLong(QueryServices.CLIENT_SPOOL_THRESHOLD_BYTES_ATTRIB,
+                        QueryServicesOptions.DEFAULT_CLIENT_SPOOL_THRESHOLD_BYTES);
+            boolean spoolingEnabled =
+                    services.getProps().getBoolean(
+                        QueryServices.CLIENT_ORDERBY_SPOOLING_ENABLED_ATTRIB,
+                        QueryServicesOptions.DEFAULT_CLIENT_ORDERBY_SPOOLING_ENABLED);
+            return new OrderedResultIterator(scanner,
+                    Collections.<OrderByExpression> singletonList(orderByExpression),
+                    spoolingEnabled, threshold);
         }
     }
 
@@ -306,10 +314,18 @@ public class AggregatePlan extends BaseQueryPlan {
                 resultScanner = new LimitingResultIterator(resultScanner, limit);
             }
         } else {
-            int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt(
-                    QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
-            resultScanner = new OrderedAggregatingResultIterator(aggResultIterator, orderBy.getOrderByExpressions(),
-                    thresholdBytes, limit, offset);
+            long thresholdBytes =
+                    context.getConnection().getQueryServices().getProps().getLong(
+                        QueryServices.CLIENT_SPOOL_THRESHOLD_BYTES_ATTRIB,
+                        QueryServicesOptions.DEFAULT_CLIENT_SPOOL_THRESHOLD_BYTES);
+            boolean spoolingEnabled =
+                    context.getConnection().getQueryServices().getProps().getBoolean(
+                        QueryServices.CLIENT_ORDERBY_SPOOLING_ENABLED_ATTRIB,
+                        QueryServicesOptions.DEFAULT_CLIENT_ORDERBY_SPOOLING_ENABLED);
+            resultScanner =
+                    new OrderedAggregatingResultIterator(aggResultIterator,
+                            orderBy.getOrderByExpressions(), spoolingEnabled, thresholdBytes, limit,
+                            offset);
         }
         if (context.getSequenceManager().getSequenceCount() > 0) {
             resultScanner = new SequenceResultIterator(resultScanner, context.getSequenceManager());
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
index 60451a5..92bebf1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
@@ -145,8 +145,14 @@ public class ClientAggregatePlan extends ClientProcessingPlan {
             if (groupBy.isOrderPreserving()) {
                 aggResultIterator = new ClientGroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), serverAggregators, keyExpressions);
             } else {
-                int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt
-                    (QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
+                long thresholdBytes =
+                        context.getConnection().getQueryServices().getProps().getLong(
+                            QueryServices.CLIENT_SPOOL_THRESHOLD_BYTES_ATTRIB,
+                            QueryServicesOptions.DEFAULT_CLIENT_SPOOL_THRESHOLD_BYTES);
+                boolean spoolingEnabled =
+                        context.getConnection().getQueryServices().getProps().getBoolean(
+                            QueryServices.CLIENT_ORDERBY_SPOOLING_ENABLED_ATTRIB,
+                            QueryServicesOptions.DEFAULT_CLIENT_ORDERBY_SPOOLING_ENABLED);
                 List<OrderByExpression> keyExpressionOrderBy = Lists.newArrayListWithExpectedSize(keyExpressions.size());
                 for (Expression keyExpression : keyExpressions) {
                     keyExpressionOrderBy.add(new OrderByExpression(keyExpression, false, true));
@@ -156,7 +162,10 @@ public class ClientAggregatePlan extends ClientProcessingPlan {
                     // Pass in orderBy to apply any sort that has been optimized away
                     aggResultIterator = new ClientHashAggregatingResultIterator(context, iterator, serverAggregators, keyExpressions, orderBy);
                 } else {
-                    iterator = new OrderedResultIterator(iterator, keyExpressionOrderBy, thresholdBytes, null, null, projector.getEstimatedRowByteSize());
+                    iterator =
+                            new OrderedResultIterator(iterator, keyExpressionOrderBy,
+                                    spoolingEnabled, thresholdBytes, null, null,
+                                    projector.getEstimatedRowByteSize());
                     aggResultIterator = new ClientGroupedAggregatingResultIterator(LookAheadResultIterator.wrap(iterator), serverAggregators, keyExpressions);
                 }
             }
@@ -180,9 +189,18 @@ public class ClientAggregatePlan extends ClientProcessingPlan {
                 resultScanner = new LimitingResultIterator(resultScanner, limit);
             }
         } else {
-            int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt(
-                    QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
-            resultScanner = new OrderedAggregatingResultIterator(aggResultIterator, orderBy.getOrderByExpressions(), thresholdBytes, limit, offset);
+            long thresholdBytes =
+                    context.getConnection().getQueryServices().getProps().getLong(
+                        QueryServices.CLIENT_SPOOL_THRESHOLD_BYTES_ATTRIB,
+                        QueryServicesOptions.DEFAULT_CLIENT_SPOOL_THRESHOLD_BYTES);
+            boolean spoolingEnabled =
+                    context.getConnection().getQueryServices().getProps().getBoolean(
+                        QueryServices.CLIENT_ORDERBY_SPOOLING_ENABLED_ATTRIB,
+                        QueryServicesOptions.DEFAULT_CLIENT_ORDERBY_SPOOLING_ENABLED);
+            resultScanner =
+                    new OrderedAggregatingResultIterator(aggResultIterator,
+                            orderBy.getOrderByExpressions(), spoolingEnabled, thresholdBytes, limit,
+                            offset);
         }
         if (context.getSequenceManager().getSequenceCount() > 0) {
             resultScanner = new SequenceResultIterator(resultScanner, context.getSequenceManager());
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java
index 3427f5f..4a54a41 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientScanPlan.java
@@ -86,10 +86,18 @@ public class ClientScanPlan extends ClientProcessingPlan {
         }
         
         if (!orderBy.getOrderByExpressions().isEmpty()) { // TopN
-            int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt(
-                    QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
-            iterator = new OrderedResultIterator(iterator, orderBy.getOrderByExpressions(), thresholdBytes, limit,
-                    offset, projector.getEstimatedRowByteSize());
+            long thresholdBytes =
+                    context.getConnection().getQueryServices().getProps().getLong(
+                        QueryServices.CLIENT_SPOOL_THRESHOLD_BYTES_ATTRIB,
+                        QueryServicesOptions.DEFAULT_CLIENT_SPOOL_THRESHOLD_BYTES);
+            boolean spoolingEnabled =
+                    context.getConnection().getQueryServices().getProps().getBoolean(
+                        QueryServices.CLIENT_ORDERBY_SPOOLING_ENABLED_ATTRIB,
+                        QueryServicesOptions.DEFAULT_CLIENT_ORDERBY_SPOOLING_ENABLED);
+            iterator =
+                    new OrderedResultIterator(iterator, orderBy.getOrderByExpressions(),
+                            spoolingEnabled, thresholdBytes, limit, offset,
+                            projector.getEstimatedRowByteSize());
         } else {
             if (offset != null) {
                 iterator = new OffsetResultIterator(iterator, offset);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
index cdb2da5..0ad0d1b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ScanPlan.java
@@ -36,6 +36,7 @@ import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.compile.RowProjector;
 import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.coprocessor.MetaDataProtocol;
 import org.apache.phoenix.coprocessor.ScanRegionObserver;
 import org.apache.phoenix.execute.visitor.ByteCountVisitor;
 import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
@@ -110,11 +111,10 @@ public class ScanPlan extends BaseQueryPlan {
         this.allowPageFilter = allowPageFilter;
         boolean isOrdered = !orderBy.getOrderByExpressions().isEmpty();
         if (isOrdered) { // TopN
-            int thresholdBytes = context.getConnection().getQueryServices().getProps().getInt(
-                    QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
-            ScanRegionObserver.serializeIntoScan(context.getScan(), thresholdBytes,
-                    limit == null ? -1 : QueryUtil.getOffsetLimit(limit, offset), orderBy.getOrderByExpressions(),
-                    projector.getEstimatedRowByteSize());
+            ScanRegionObserver.serializeIntoScan(context.getScan(),
+                limit == null ? -1 : QueryUtil.getOffsetLimit(limit, offset),
+                orderBy.getOrderByExpressions(), projector.getEstimatedRowByteSize());
+            ScanUtil.setClientVersion(context.getScan(), MetaDataProtocol.PHOENIX_VERSION);
         }
         Integer perScanLimit = !allowPageFilter || isOrdered ? null : limit;
         perScanLimit = QueryUtil.getOffsetLimit(perScanLimit, offset);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
index e7966d9..c2686ac 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java
@@ -44,6 +44,7 @@ import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
 import org.apache.phoenix.compile.QueryPlan;
 import org.apache.phoenix.compile.RowProjector;
 import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.exception.PhoenixIOException;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.execute.TupleProjector.ProjectedValueTuple;
@@ -53,7 +54,9 @@ import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.iterate.DefaultParallelScanGrouper;
 import org.apache.phoenix.iterate.BufferedQueue;
 import org.apache.phoenix.iterate.ParallelScanGrouper;
+import org.apache.phoenix.iterate.PhoenixQueues;
 import org.apache.phoenix.iterate.ResultIterator;
+import org.apache.phoenix.iterate.SizeAwareQueue;
 import org.apache.phoenix.jdbc.PhoenixParameterMetaData;
 import org.apache.phoenix.jdbc.PhoenixStatement.Operation;
 import org.apache.phoenix.optimize.Cost;
@@ -94,7 +97,8 @@ public class SortMergeJoinPlan implements QueryPlan {
     private final int rhsFieldPosition;
     private final boolean isSingleValueOnly;
     private final Set<TableRef> tableRefs;
-    private final int thresholdBytes;
+    private final long thresholdBytes;
+    private final boolean spoolingEnabled;
     private Long estimatedBytes;
     private Long estimatedRows;
     private Long estimateInfoTs;
@@ -120,8 +124,14 @@ public class SortMergeJoinPlan implements QueryPlan {
         this.tableRefs = Sets.newHashSetWithExpectedSize(lhsPlan.getSourceRefs().size() + rhsPlan.getSourceRefs().size());
         this.tableRefs.addAll(lhsPlan.getSourceRefs());
         this.tableRefs.addAll(rhsPlan.getSourceRefs());
-        this.thresholdBytes = context.getConnection().getQueryServices().getProps().getInt(
-                QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES);
+        this.thresholdBytes =
+                context.getConnection().getQueryServices().getProps().getLong(
+                    QueryServices.CLIENT_SPOOL_THRESHOLD_BYTES_ATTRIB,
+                    QueryServicesOptions.DEFAULT_CLIENT_SPOOL_THRESHOLD_BYTES);
+        this.spoolingEnabled =
+                context.getConnection().getQueryServices().getProps().getBoolean(
+                    QueryServices.CLIENT_JOIN_SPOOLING_ENABLED_ATTRIB,
+                    QueryServicesOptions.DEFAULT_CLIENT_JOIN_SPOOLING_ENABLED);
     }
 
     @Override
@@ -294,7 +304,7 @@ public class SortMergeJoinPlan implements QueryPlan {
         private ValueBitSet lhsBitSet;
         private ValueBitSet rhsBitSet;
         private byte[] emptyProjectedValue;
-        private BufferedTupleQueue queue;
+        private SizeAwareQueue<Tuple> queue;
         private Iterator<Tuple> queueIterator;
         
         public BasicJoinIterator(ResultIterator lhsIterator, ResultIterator rhsIterator) {
@@ -316,14 +326,23 @@ public class SortMergeJoinPlan implements QueryPlan {
             int len = lhsBitSet.getEstimatedLength();
             this.emptyProjectedValue = new byte[len];
             lhsBitSet.toBytes(emptyProjectedValue, 0);
-            this.queue = new BufferedTupleQueue(thresholdBytes);
+            this.queue = PhoenixQueues.newTupleQueue(spoolingEnabled, thresholdBytes);
             this.queueIterator = null;
         }
         
         @Override
         public void close() throws SQLException {
             SQLException e = closeIterators(lhsIterator, rhsIterator);
-            queue.close();
+            try {
+              queue.close();
+            } catch (IOException t) {
+              if (e != null) {
+                    e.setNextException(
+                        new SQLException("Also encountered exception while closing queue", t));
+              } else {
+                e = new SQLException("Error while closing queue",t);
+              }
+            }
             if (e != null) {
                 throw e;
             }
@@ -355,7 +374,11 @@ public class SortMergeJoinPlan implements QueryPlan {
                         if (lhsKey.equals(rhsKey)) {
                             next = join(lhsTuple, rhsTuple);
                              if (nextLhsTuple != null && lhsKey.equals(nextLhsKey)) {
-                                queue.offer(rhsTuple);
+                                try {
+                                    queue.add(rhsTuple);
+                                } catch (IllegalStateException e) {
+                                    throw new PhoenixIOException(e);
+                                }
                                 if (nextRhsTuple == null || !rhsKey.equals(nextRhsKey)) {
                                     queueIterator = queue.iterator();
                                     advance(true);
@@ -609,107 +632,6 @@ public class SortMergeJoinPlan implements QueryPlan {
         }
     }
     
-    private static class BufferedTupleQueue extends BufferedQueue<Tuple> {
-
-        public BufferedTupleQueue(int thresholdBytes) {
-            super(thresholdBytes);
-        }
-
-        @Override
-        protected BufferedSegmentQueue<Tuple> createSegmentQueue(
-                int index, int thresholdBytes) {
-            return new BufferedTupleSegmentQueue(index, thresholdBytes, false);
-        }
-
-        @Override
-        protected Comparator<BufferedSegmentQueue<Tuple>> getSegmentQueueComparator() {
-            return new Comparator<BufferedSegmentQueue<Tuple>>() {
-                @Override
-                public int compare(BufferedSegmentQueue<Tuple> q1,
-                        BufferedSegmentQueue<Tuple> q2) {
-                    return q1.index() - q2.index();
-                }                
-            };
-        }
-
-        @Override
-        public Iterator<Tuple> iterator() {
-            return new Iterator<Tuple>() {
-                private Iterator<BufferedSegmentQueue<Tuple>> queueIter;
-                private Iterator<Tuple> currentIter;
-                {
-                    this.queueIter = getSegmentQueues().iterator();
-                    this.currentIter = queueIter.hasNext() ? 
queueIter.next().iterator() : null;
-                }
-                
-                @Override
-                public boolean hasNext() {
-                    return currentIter != null && currentIter.hasNext();
-                }
-
-                @Override
-                public Tuple next() {
-                    if (!hasNext())
-                        return null;
-                    
-                    Tuple ret = currentIter.next();                    
-                    if (!currentIter.hasNext()) {
-                        this.currentIter = queueIter.hasNext() ? queueIter.next().iterator() : null;
-                    }
-                    
-                    return ret;
-                }
-
-                @Override
-                public void remove() {
-                    throw new UnsupportedOperationException();
-                }
-                
-            };
-        }
-        
-        private static class BufferedTupleSegmentQueue extends BufferedSegmentQueue<Tuple> {
-            private LinkedList<Tuple> results;
-            
-            public BufferedTupleSegmentQueue(int index,
-                    int thresholdBytes, boolean hasMaxQueueSize) {
-                super(index, thresholdBytes, hasMaxQueueSize);
-                this.results = Lists.newLinkedList();
-            }
-
-            @Override
-            protected Queue<Tuple> getInMemoryQueue() {
-                return results;
-            }
-
-            @Override
-            protected int sizeOf(Tuple e) {
-                KeyValue kv = PhoenixKeyValueUtil.maybeCopyCell(e.getValue(0));
-                return Bytes.SIZEOF_INT * 2 + kv.getLength();
-            }
-
-            @Override
-            protected void writeToStream(DataOutputStream out, Tuple e) throws IOException {
-                KeyValue kv = PhoenixKeyValueUtil.maybeCopyCell(e.getValue(0));
-                out.writeInt(kv.getLength() + Bytes.SIZEOF_INT);
-                out.writeInt(kv.getLength());
-                out.write(kv.getBuffer(), kv.getOffset(), kv.getLength());
-            }
-
-            @Override
-            protected Tuple readFromStream(DataInputStream in) throws IOException {
-                int length = in.readInt();
-                if (length < 0)
-                    return null;
-                
-                byte[] b = new byte[length];
-                in.readFully(b);
-                Result result = ResultUtil.toResult(new ImmutableBytesWritable(b));
-                return new ResultTuple(result);
-            }
-            
-        }
-    }
     
     @Override
     public boolean useRoundRobinIterator() {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/VersionUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/VersionUtil.java
index 42d07f5..fd02ab5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/VersionUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/VersionUtil.java
@@ -73,4 +73,16 @@ public class VersionUtil {
         version |= (major << Byte.SIZE * 2);
         return version;
     }
+
+    public static int decodeMajorVersion(int encodedVersion) {
+        return (encodedVersion >> Byte.SIZE * 2);
+    }
+
+    public static int decodeMinorVersion(int encodedVersion) {
+        return (encodedVersion >> Byte.SIZE) & 0xFF;
+    }
+
+    public static int decodePatchVersion(int encodedVersion) {
+        return encodedVersion & 0xFF;
+    }
 }
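
The new decode helpers simply reverse the bit layout used by encodeVersion() above (major in bits 16+, minor in bits 8-15, patch in bits 0-7). A small hedged sketch of the round trip; the assertions are illustrative and not part of the patch:

    // Assumes the VersionUtil methods shown in the hunk above.
    int encoded = VersionUtil.encodeVersion("4", "15", "0");
    assert VersionUtil.decodeMajorVersion(encoded) == 4;
    assert VersionUtil.decodeMinorVersion(encoded) == 15;
    assert VersionUtil.decodePatchVersion(encoded) == 0;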
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BufferedQueue.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BufferedQueue.java
index 6f6c523..1a646e6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BufferedQueue.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BufferedQueue.java
@@ -36,14 +36,14 @@ import java.util.UUID;
 import com.google.common.collect.Lists;
 import com.google.common.collect.MinMaxPriorityQueue;
 
-public abstract class BufferedQueue<T> extends AbstractQueue<T> {
-    private final int thresholdBytes;
+public abstract class BufferedQueue<T> extends AbstractQueue<T> implements SizeAwareQueue<T> {
+    private final long thresholdBytes;
     private List<BufferedSegmentQueue<T>> queues;
     private int currentIndex;
     private BufferedSegmentQueue<T> currentQueue;
     private MinMaxPriorityQueue<BufferedSegmentQueue<T>> mergedQueue;
 
-    public BufferedQueue(int thresholdBytes) {
+    public BufferedQueue(long thresholdBytes) {
         this.thresholdBytes = thresholdBytes;
         this.queues = Lists.<BufferedSegmentQueue<T>> newArrayList();
         this.currentIndex = -1;
@@ -51,7 +51,7 @@ public abstract class BufferedQueue<T> extends AbstractQueue<T> {
         this.mergedQueue = null;
     }
     
-    abstract protected BufferedSegmentQueue<T> createSegmentQueue(int index, int thresholdBytes);
+    abstract protected BufferedSegmentQueue<T> createSegmentQueue(int index, long thresholdBytes);
     
     abstract protected Comparator<BufferedSegmentQueue<T>> getSegmentQueueComparator();
     
@@ -122,10 +122,12 @@ public abstract class BufferedQueue<T> extends AbstractQueue<T> {
         return size;
     }
     
+    @Override
     public long getByteSize() {
         return currentQueue == null ? 0 : currentQueue.getInMemByteSize();
     }
 
+    @Override
     public void close() {
         for (BufferedSegmentQueue<T> queue : queues) {
             queue.close();
@@ -150,10 +152,10 @@ public abstract class BufferedQueue<T> extends AbstractQueue<T> {
         protected static final int EOF = -1;
         
         private final int index;
-        private final int thresholdBytes;
+        private final long thresholdBytes;
         private final boolean hasMaxQueueSize;
         private long totalResultSize = 0;
-        private int maxResultSize = 0;
+        private long maxResultSize = 0;
         private File file;
         private boolean isClosed = false;
         private boolean flushBuffer = false;
@@ -163,7 +165,7 @@ public abstract class BufferedQueue<T> extends AbstractQueue<T> {
         // iterators to close on close()
         private List<SegmentQueueFileIterator> iterators;
 
-        public BufferedSegmentQueue(int index, int thresholdBytes, boolean hasMaxQueueSize) {
+        public BufferedSegmentQueue(int index, long thresholdBytes, boolean hasMaxQueueSize) {
             this.index = index;
             this.thresholdBytes = thresholdBytes;
             this.hasMaxQueueSize = hasMaxQueueSize;
@@ -171,7 +173,7 @@ public abstract class BufferedQueue<T> extends AbstractQueue<T> {
         }
         
         abstract protected Queue<T> getInMemoryQueue();
-        abstract protected int sizeOf(T e);
+        abstract protected long sizeOf(T e);
         abstract protected void writeToStream(DataOutputStream out, T e) throws IOException;
         abstract protected T readFromStream(DataInputStream in) throws IOException;
         
@@ -296,7 +298,7 @@ public abstract class BufferedQueue<T> extends AbstractQueue<T> {
 
         private void flush(T entry) throws IOException {
             Queue<T> inMemQueue = getInMemoryQueue();
-            int resultSize = sizeOf(entry);
+            long resultSize = sizeOf(entry);
             maxResultSize = Math.max(maxResultSize, resultSize);
             totalResultSize = hasMaxQueueSize ? maxResultSize * inMemQueue.size() : (totalResultSize + resultSize);
             if (totalResultSize >= thresholdBytes) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BufferedSortedQueue.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BufferedSortedQueue.java
index 700991f..80e20d9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BufferedSortedQueue.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BufferedSortedQueue.java
@@ -42,7 +42,7 @@ public class BufferedSortedQueue extends BufferedQueue<ResultEntry> {
     private final int limit;
 
     public BufferedSortedQueue(Comparator<ResultEntry> comparator,
-            Integer limit, int thresholdBytes) throws IOException {
+            Integer limit, long thresholdBytes) throws IOException {
         super(thresholdBytes);
         this.comparator = comparator;
         this.limit = limit == null ? -1 : limit;
@@ -50,7 +50,7 @@ public class BufferedSortedQueue extends BufferedQueue<ResultEntry> {
 
     @Override
     protected BufferedSegmentQueue<ResultEntry> createSegmentQueue(
-            int index, int thresholdBytes) {
+            int index, long thresholdBytes) {
         return new BufferedResultEntryPriorityQueue(index, thresholdBytes, limit, comparator);
     }
 
@@ -68,7 +68,7 @@ public class BufferedSortedQueue extends BufferedQueue<ResultEntry> {
         private MinMaxPriorityQueue<ResultEntry> results = null;
         
         public BufferedResultEntryPriorityQueue(int index,
-                int thresholdBytes, int limit, Comparator<ResultEntry> comparator) {
+                long thresholdBytes, int limit, Comparator<ResultEntry> comparator) {
             super(index, thresholdBytes, limit >= 0);
             this.results = limit < 0 ? 
                     MinMaxPriorityQueue.<ResultEntry> orderedBy(comparator).create()
@@ -81,8 +81,8 @@ public class BufferedSortedQueue extends BufferedQueue<ResultEntry> {
         }
 
         @Override
-        protected int sizeOf(ResultEntry e) {
-            return sizeof(e.sortKeys) + sizeof(toKeyValues(e));
+        protected long sizeOf(ResultEntry e) {
+            return ResultEntry.sizeOf(e);
         }
 
         @Override
@@ -147,28 +147,5 @@ public class BufferedSortedQueue extends BufferedQueue<ResultEntry> {
             return kvs;
         }
 
-        private int sizeof(List<KeyValue> kvs) {
-            int size = Bytes.SIZEOF_INT; // totalLen
-
-            for (KeyValue kv : kvs) {
-                size += kv.getLength();
-                size += Bytes.SIZEOF_INT; // kv.getLength
-            }
-
-            return size;
-        }
-
-        private int sizeof(ImmutableBytesWritable[] sortKeys) {
-            int size = Bytes.SIZEOF_INT;
-            if (sortKeys != null) {
-                for (ImmutableBytesWritable sortKey : sortKeys) {
-                    if (sortKey != null) {
-                        size += sortKey.getLength();
-                    }
-                    size += Bytes.SIZEOF_INT;
-                }
-            }
-            return size;
-        }
     }
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/BufferedTupleQueue.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BufferedTupleQueue.java
new file mode 100644
index 0000000..7297a78
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BufferedTupleQueue.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Queue;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.schema.tuple.ResultTuple;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.PhoenixKeyValueUtil;
+import org.apache.phoenix.util.ResultUtil;
+
+import com.google.common.collect.Lists;
+
+public class BufferedTupleQueue extends BufferedQueue<Tuple> {
+
+    public BufferedTupleQueue(long thresholdBytes) {
+        super(thresholdBytes);
+    }
+
+    @Override
+    protected BufferedSegmentQueue<Tuple> createSegmentQueue(int index, long thresholdBytes) {
+        return new BufferedTupleSegmentQueue(index, thresholdBytes, false);
+    }
+
+    @Override
+    protected Comparator<BufferedSegmentQueue<Tuple>> getSegmentQueueComparator() {
+        return new Comparator<BufferedSegmentQueue<Tuple>>() {
+            @Override
+            public int compare(BufferedSegmentQueue<Tuple> q1, BufferedSegmentQueue<Tuple> q2) {
+                return q1.index() - q2.index();
+            }
+        };
+    }
+
+    @Override
+    public Iterator<Tuple> iterator() {
+        return new Iterator<Tuple>() {
+            private Iterator<BufferedSegmentQueue<Tuple>> queueIter;
+            private Iterator<Tuple> currentIter;
+            {
+                this.queueIter = getSegmentQueues().iterator();
+                this.currentIter = queueIter.hasNext() ? queueIter.next().iterator() : null;
+            }
+
+            @Override
+            public boolean hasNext() {
+                return currentIter != null && currentIter.hasNext();
+            }
+
+            @Override
+            public Tuple next() {
+                if (!hasNext()) return null;
+
+                Tuple ret = currentIter.next();
+                if (!currentIter.hasNext()) {
+                    this.currentIter = queueIter.hasNext() ? queueIter.next().iterator() : null;
+                }
+
+                return ret;
+            }
+
+            @Override
+            public void remove() {
+                throw new UnsupportedOperationException();
+            }
+
+        };
+    }
+
+    private static class BufferedTupleSegmentQueue extends BufferedSegmentQueue<Tuple> {
+        private LinkedList<Tuple> results;
+
+        public BufferedTupleSegmentQueue(int index, long thresholdBytes, boolean hasMaxQueueSize) {
+            super(index, thresholdBytes, hasMaxQueueSize);
+            this.results = Lists.newLinkedList();
+        }
+
+        @Override
+        protected Queue<Tuple> getInMemoryQueue() {
+            return results;
+        }
+
+        @Override
+        protected long sizeOf(Tuple e) {
+            KeyValue kv = PhoenixKeyValueUtil.maybeCopyCell(e.getValue(0));
+            return Bytes.SIZEOF_INT * 2 + kv.getLength();
+        }
+
+        @Override
+        protected void writeToStream(DataOutputStream out, Tuple e) throws IOException {
+            KeyValue kv = PhoenixKeyValueUtil.maybeCopyCell(e.getValue(0));
+            out.writeInt(kv.getLength() + Bytes.SIZEOF_INT);
+            out.writeInt(kv.getLength());
+            out.write(kv.getBuffer(), kv.getOffset(), kv.getLength());
+        }
+
+        @Override
+        protected Tuple readFromStream(DataInputStream in) throws IOException {
+            int length = in.readInt();
+            if (length < 0) return null;
+
+            byte[] b = new byte[length];
+            in.readFully(b);
+            Result result = ResultUtil.toResult(new ImmutableBytesWritable(b));
+            return new ResultTuple(result);
+        }
+
+    }
+}
\ No newline at end of file
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java
index 39bd4ca..4bbb8d6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java
@@ -42,6 +42,7 @@ import org.apache.phoenix.cache.TenantCache;
 import org.apache.phoenix.coprocessor.BaseRegionScanner;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.coprocessor.HashJoinRegionScanner;
+import org.apache.phoenix.coprocessor.MetaDataProtocol;
 import org.apache.phoenix.execute.TupleProjector;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.KeyValueColumnExpression;
@@ -50,11 +51,14 @@ import org.apache.phoenix.expression.SingleCellColumnExpression;
 import org.apache.phoenix.expression.function.ArrayIndexFunction;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.hbase.index.util.VersionUtil;
 import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.join.HashJoinInfo;
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
 import org.apache.phoenix.memory.MemoryManager;
 import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.KeyValueSchema;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.ValueBitSet;
@@ -68,6 +72,7 @@ import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.ServerUtil;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
@@ -161,7 +166,15 @@ public class NonAggregateRegionScannerFactory extends RegionScannerFactory {
               new RegionScannerResultIterator(innerScanner, getMinMaxQualifiersFromScan(scan), encodingScheme), scanOffset),
           scan.getAttribute(QueryConstants.LAST_SCAN) != null);
     }
-    final OrderedResultIterator iterator = deserializeFromScan(scan, innerScanner);
+    boolean spoolingEnabled =
+            env.getConfiguration().getBoolean(
+                QueryServices.SERVER_ORDERBY_SPOOLING_ENABLED_ATTRIB,
+                QueryServicesOptions.DEFAULT_SERVER_ORDERBY_SPOOLING_ENABLED);
+    long thresholdBytes =
+            env.getConfiguration().getLong(QueryServices.SERVER_SPOOL_THRESHOLD_BYTES_ATTRIB,
+                QueryServicesOptions.DEFAULT_SERVER_SPOOL_THRESHOLD_BYTES);
+    final OrderedResultIterator iterator =
+            deserializeFromScan(scan, innerScanner, spoolingEnabled, 
thresholdBytes);
     if (iterator == null) {
       return innerScanner;
     }
@@ -169,15 +182,31 @@ public class NonAggregateRegionScannerFactory extends RegionScannerFactory {
     return getTopNScanner(env, innerScanner, iterator, tenantId);
   }
 
-  private static OrderedResultIterator deserializeFromScan(Scan scan, RegionScanner s) {
+    @VisibleForTesting
+    static OrderedResultIterator deserializeFromScan(Scan scan, RegionScanner s,
+            boolean spoolingEnabled, long thresholdBytes) {
     byte[] topN = scan.getAttribute(BaseScannerRegionObserver.TOPN);
     if (topN == null) {
       return null;
-    }
-    ByteArrayInputStream stream = new ByteArrayInputStream(topN); // TODO: size?
-    try {
+        }
+        int clientVersion = ScanUtil.getClientVersion(scan);
+        // Client including and after 4.15 and 5.1 are not going to serialize thresholdBytes
+        // so we need to decode this only for older clients to not break wire compat
+        boolean shouldDecodeSpoolThreshold =
+                (scan.getAttribute(BaseScannerRegionObserver.CLIENT_VERSION) == null)
+                        || (VersionUtil.decodeMajorVersion(clientVersion) > 5)
+                        || (VersionUtil.decodeMajorVersion(clientVersion) == 5
+                                && clientVersion < MetaDataProtocol.MIN_5_x_DISABLE_SERVER_SPOOL_THRESHOLD)
+                        || (VersionUtil.decodeMajorVersion(clientVersion) == 4
+                                && clientVersion < MetaDataProtocol.MIN_4_x_DISABLE_SERVER_SPOOL_THRESHOLD);
+        ByteArrayInputStream stream = new ByteArrayInputStream(topN); // TODO: size?
+        try {
      DataInputStream input = new DataInputStream(stream);
-      int thresholdBytes = WritableUtils.readVInt(input);
+      if (shouldDecodeSpoolThreshold) {
+        // Read off the scan but ignore, we won't honor client sent thresholdbytes, but the
+        // one set on server
+        WritableUtils.readVInt(input);
+      }
       int limit = WritableUtils.readVInt(input);
       int estimatedRowSize = WritableUtils.readVInt(input);
       int size = WritableUtils.readVInt(input);
@@ -189,8 +218,8 @@ public class NonAggregateRegionScannerFactory extends RegionScannerFactory {
      }
      PTable.QualifierEncodingScheme encodingScheme = EncodedColumnsUtil.getQualifierEncodingScheme(scan);
      ResultIterator inner = new RegionScannerResultIterator(s, EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan), encodingScheme);
-      return new OrderedResultIterator(inner, orderByExpressions, thresholdBytes, limit >= 0 ? limit : null, null,
-          estimatedRowSize);
+      return new OrderedResultIterator(inner, orderByExpressions, spoolingEnabled,
+              thresholdBytes, limit >= 0 ? limit : null, null, estimatedRowSize);
     } catch (IOException e) {
       throw new RuntimeException(e);
     } finally {
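For context on the hunk above: the server now takes the ORDER BY spool threshold from its own configuration, but clients older than 4.15 / 5.1 still serialize their spool threshold as the first VInt of the TOPN attribute, so the server has to read and discard that value for old clients or every later field would be misaligned. A hedged stand-alone sketch of the same gate, not part of the patch (the wrapper class is hypothetical; the MIN_* constants are the new MetaDataProtocol values added elsewhere in this commit):

    import org.apache.phoenix.coprocessor.MetaDataProtocol;
    import org.apache.phoenix.hbase.index.util.VersionUtil;

    // Hypothetical helper mirroring shouldDecodeSpoolThreshold above.
    public class SpoolThresholdWireCompat {
        static boolean clientStillSerializesThreshold(byte[] clientVersionAttr, int clientVersion) {
            if (clientVersionAttr == null) {
                return true; // client predates the CLIENT_VERSION scan attribute
            }
            int major = VersionUtil.decodeMajorVersion(clientVersion);
            return major > 5
                    || (major == 5 && clientVersion < MetaDataProtocol.MIN_5_x_DISABLE_SERVER_SPOOL_THRESHOLD)
                    || (major == 4 && clientVersion < MetaDataProtocol.MIN_4_x_DISABLE_SERVER_SPOOL_THRESHOLD);
        }
    }

In practice a 4.14.0 or 5.0.0 client still has its serialized VInt read and dropped, while 4.15.x and 5.1+ clients never write it; the new OrderedResultIteratorTest at the end of this diff exercises exactly those versions.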
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedAggregatingResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedAggregatingResultIterator.java
index 51a7dd8..ef4b607 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedAggregatingResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedAggregatingResultIterator.java
@@ -36,9 +36,10 @@ import org.apache.phoenix.schema.tuple.Tuple;
 public class OrderedAggregatingResultIterator extends OrderedResultIterator implements AggregatingResultIterator {
 
     public OrderedAggregatingResultIterator(AggregatingResultIterator delegate,
-            List<OrderByExpression> orderByExpressions, int thresholdBytes, Integer limit, Integer offset)
+            List<OrderByExpression> orderByExpressions, boolean spoolingEnabled, long thresholdBytes,
+            Integer limit, Integer offset)
                     throws SQLException {
-        super(delegate, orderByExpressions, thresholdBytes, limit, offset);
+        super(delegate, orderByExpressions, spoolingEnabled, thresholdBytes, limit, offset);
     }
 
     @Override
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java
index 22712ff..ec9929e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java
@@ -22,16 +22,22 @@ import static com.google.common.base.Preconditions.checkPositionIndex;
 
 import java.io.IOException;
 import java.sql.SQLException;
+import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Queue;
 
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.execute.DescVarLengthFastByteComparisons;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.OrderByExpression;
+import org.apache.phoenix.iterate.OrderedResultIterator.ResultEntry;
 import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.PhoenixKeyValueUtil;
 import org.apache.phoenix.util.ServerUtil;
 import org.apache.phoenix.util.SizedUtil;
 
@@ -68,6 +74,44 @@ public class OrderedResultIterator implements PeekingResultIterator {
         Tuple getResult() {
             return result;
         }
+
+        static long sizeOf(ResultEntry e) {
+          return sizeof(e.sortKeys) + sizeof(toKeyValues(e));
+        }
+
+        private static long sizeof(List<KeyValue> kvs) {
+          long size = Bytes.SIZEOF_INT; // totalLen
+
+          for (KeyValue kv : kvs) {
+              size += kv.getLength();
+              size += Bytes.SIZEOF_INT; // kv.getLength
+          }
+
+          return size;
+        }
+
+        private static long sizeof(ImmutableBytesWritable[] sortKeys) {
+            long size = Bytes.SIZEOF_INT;
+            if (sortKeys != null) {
+                for (ImmutableBytesWritable sortKey : sortKeys) {
+                    if (sortKey != null) {
+                        size += sortKey.getLength();
+                    }
+                    size += Bytes.SIZEOF_INT;
+                }
+            }
+            return size;
+        }
+
+        private static List<KeyValue> toKeyValues(ResultEntry entry) {
+          Tuple result = entry.getResult();
+          int size = result.size();
+          List<KeyValue> kvs = new ArrayList<KeyValue>(size);
+          for (int i = 0; i < size; i++) {
+              kvs.add(PhoenixKeyValueUtil.maybeCopyCell(result.getValue(i)));
+          }
+          return kvs;
+        }
     }
     
     /** A function that returns Nth key for a given {@link ResultEntry}. */
@@ -91,7 +135,8 @@ public class OrderedResultIterator implements PeekingResultIterator {
         }
     };
 
-    private final int thresholdBytes;
+    private final boolean spoolingEnabled;
+    private final long thresholdBytes;
     private final Integer limit;
     private final Integer offset;
     private final ResultIterator delegate;
@@ -106,20 +151,22 @@ public class OrderedResultIterator implements PeekingResultIterator {
     }
     
     public OrderedResultIterator(ResultIterator delegate, List<OrderByExpression> orderByExpressions,
-            int thresholdBytes, Integer limit, Integer offset) {
-        this(delegate, orderByExpressions, thresholdBytes, limit, offset, 0);
+            boolean spoolingEnabled, long thresholdBytes, Integer limit, Integer offset) {
+        this(delegate, orderByExpressions, spoolingEnabled, thresholdBytes, limit, offset, 0);
     }
 
     public OrderedResultIterator(ResultIterator delegate, List<OrderByExpression> orderByExpressions,
-            int thresholdBytes) throws SQLException {
-        this(delegate, orderByExpressions, thresholdBytes, null, null);
+            boolean spoolingEnabled, long thresholdBytes) throws SQLException {
+        this(delegate, orderByExpressions, spoolingEnabled, thresholdBytes, null, null);
     }
 
-    public OrderedResultIterator(ResultIterator delegate, List<OrderByExpression> orderByExpressions,
-            int thresholdBytes, Integer limit, Integer offset,int estimatedRowSize) {
+    public OrderedResultIterator(ResultIterator delegate,
+            List<OrderByExpression> orderByExpressions, boolean spoolingEnabled,
+            long thresholdBytes, Integer limit, Integer offset, int estimatedRowSize) {
         checkArgument(!orderByExpressions.isEmpty());
         this.delegate = delegate;
         this.orderByExpressions = orderByExpressions;
+        this.spoolingEnabled = spoolingEnabled;
         this.thresholdBytes = thresholdBytes;
         this.offset = offset == null ? 0 : offset;
         if (limit != null) {
@@ -208,8 +255,9 @@ public class OrderedResultIterator implements PeekingResultIterator {
         List<Expression> expressions = Lists.newArrayList(Collections2.transform(orderByExpressions, TO_EXPRESSION));
         final Comparator<ResultEntry> comparator = buildComparator(orderByExpressions);
         try{
-            final BufferedSortedQueue queueEntries = new BufferedSortedQueue(comparator, limit,
-                    thresholdBytes);
+            final SizeAwareQueue<ResultEntry> queueEntries =
+                    PhoenixQueues.newResultEntrySortedQueue(comparator, limit, spoolingEnabled,
+                        thresholdBytes);
             resultIterator = new PeekingResultIterator() {
                 int count = 0;
 
@@ -249,7 +297,11 @@ public class OrderedResultIterator implements PeekingResultIterator {
                 
                 @Override
                 public void close() throws SQLException {
-                    queueEntries.close();
+                    try {
+                      queueEntries.close();
+                    } catch (IOException e) {
+                      throw new SQLException(e);
+                    }
                 }
             };
             for (Tuple result = delegate.next(); result != null; result = delegate.next()) {
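To make the new in-memory accounting concrete: ResultEntry.sizeOf charges 4 bytes for every length prefix plus the raw bytes, so a row with a single 100-byte KeyValue and one 8-byte sort key is estimated at (4 + 100 + 4) + (4 + 8 + 4) = 124 bytes, and it is this estimate that the size-bound (non-spooling) queue compares against the configured threshold before accepting the entry.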
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/PhoenixQueues.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/PhoenixQueues.java
new file mode 100644
index 0000000..1685f5e
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/PhoenixQueues.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.LinkedList;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.iterate.OrderedResultIterator.ResultEntry;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.util.PhoenixKeyValueUtil;
+
+import com.google.common.collect.MinMaxPriorityQueue;
+
+public class PhoenixQueues {
+
+    private PhoenixQueues() {
+    }
+
+    public static SizeAwareQueue<ResultEntry> newBufferedResultEntrySortedQueue(
+            Comparator<ResultEntry> comparator, Integer limit, long thresholdBytes)
+            throws IOException {
+        return new BufferedSortedQueue(comparator, limit, thresholdBytes);
+    }
+
+    public static SizeAwareQueue<Tuple> newBufferedTupleQueue(long thresholdBytes) {
+        return new BufferedTupleQueue(thresholdBytes);
+    }
+
+    public static SizeAwareQueue<ResultEntry> newSizeBoundResultEntrySortedQueue(
+            Comparator<ResultEntry> comparator, Integer limit, long maxSizeBytes) {
+        limit = limit == null ? -1 : limit;
+        MinMaxPriorityQueue<ResultEntry> queue =
+                limit < 0 ? MinMaxPriorityQueue.<ResultEntry> orderedBy(comparator).create()
+                        : MinMaxPriorityQueue.<ResultEntry> orderedBy(comparator).maximumSize(limit)
+                                .create();
+        return new SizeBoundQueue<ResultEntry>(maxSizeBytes, queue) {
+            @Override
+            public long sizeOf(org.apache.phoenix.iterate.OrderedResultIterator.ResultEntry e) {
+                return ResultEntry.sizeOf(e);
+            }
+
+        };
+    }
+
+    public static SizeAwareQueue<Tuple> newSizeBoundTupleQueue(long maxSizeBytes) {
+        LinkedList<Tuple> results = Lists.newLinkedList();
+        return new SizeBoundQueue<Tuple>(maxSizeBytes, results) {
+
+            @Override
+            public long sizeOf(Tuple e) {
+                KeyValue kv = PhoenixKeyValueUtil.maybeCopyCell(e.getValue(0));
+                return Bytes.SIZEOF_INT * 2 + kv.getLength();
+            }
+
+        };
+    }
+
+    public static SizeAwareQueue<ResultEntry> newResultEntrySortedQueue(
+            Comparator<ResultEntry> comparator, Integer limit, boolean spoolingEnabled,
+            long thresholdBytes) throws IOException {
+        if (spoolingEnabled) {
+            return newBufferedResultEntrySortedQueue(comparator, limit, thresholdBytes);
+        } else {
+            return newSizeBoundResultEntrySortedQueue(comparator, limit, thresholdBytes);
+        }
+    }
+
+    public static SizeAwareQueue<Tuple> newTupleQueue(boolean spoolingEnabled,
+            long thresholdBytes) {
+        if (spoolingEnabled) {
+            return newBufferedTupleQueue(thresholdBytes);
+        } else {
+            return newSizeBoundTupleQueue(thresholdBytes);
+        }
+    }
+
+}
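As a usage sketch for the new factory (the caller class and tuple source below are hypothetical; only the factory calls and types come from this file): with spooling enabled the caller keeps the existing disk-backed BufferedTupleQueue / BufferedSortedQueue behaviour, while with spooling disabled it gets a purely in-memory SizeBoundQueue that fails fast once the byte threshold is reached.

    import java.util.List;

    import org.apache.phoenix.iterate.PhoenixQueues;
    import org.apache.phoenix.iterate.SizeAwareQueue;
    import org.apache.phoenix.schema.tuple.Tuple;

    // Hypothetical caller, not part of the patch.
    public class TupleBufferingExample {
        static void buffer(List<Tuple> tuples, boolean spoolingEnabled) throws Exception {
            // 1 MB threshold: spill point when spooling is on,
            // hard in-memory cap (IllegalStateException) when it is off.
            SizeAwareQueue<Tuple> queue = PhoenixQueues.newTupleQueue(spoolingEnabled, 1024 * 1024);
            try {
                for (Tuple t : tuples) {
                    queue.add(t);
                }
            } finally {
                queue.close(); // releases in-memory buffers and any spill files
            }
        }
    }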
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/OrderByWithSpillingIT.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SizeAwareQueue.java
similarity index 56%
copy from phoenix-core/src/it/java/org/apache/phoenix/end2end/OrderByWithSpillingIT.java
copy to phoenix-core/src/main/java/org/apache/phoenix/iterate/SizeAwareQueue.java
index c5eeaff..73b3554 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/OrderByWithSpillingIT.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SizeAwareQueue.java
@@ -15,22 +15,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.phoenix.end2end;
+package org.apache.phoenix.iterate;
 
-import java.util.Map;
+import java.io.Closeable;
+import java.util.Queue;
 
-import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.util.ReadOnlyProps;
-import org.junit.BeforeClass;
+public interface SizeAwareQueue<T> extends Queue<T>, Closeable {
 
-import com.google.common.collect.Maps;
+    public long getByteSize();
 
-public class OrderByWithSpillingIT extends OrderByIT {
-    @BeforeClass
-    public static void doSetup() throws Exception {
-        Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
-        // do lot's of spooling!
-        props.put(QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, Integer.toString(1));
-        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
-    }
 }
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SizeBoundQueue.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SizeBoundQueue.java
new file mode 100644
index 0000000..eb1e6be
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SizeBoundQueue.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.io.IOException;
+import java.util.AbstractQueue;
+import java.util.Iterator;
+import java.util.Queue;
+
+public abstract class SizeBoundQueue<T> extends AbstractQueue<T> implements SizeAwareQueue<T> {
+
+    private long maxSizeBytes;
+    private Queue<T> delegate;
+    private long currentSize;
+
+    public SizeBoundQueue(long maxSizeBytes, Queue<T> delegate) {
+        assert maxSizeBytes > 0;
+        this.maxSizeBytes = maxSizeBytes;
+        this.delegate = delegate;
+    }
+
+    abstract public long sizeOf(T e);
+
+    @Override
+    public boolean offer(T e) {
+        boolean success = false;
+        long elementSize = sizeOf(e);
+        if ((currentSize + elementSize) < maxSizeBytes) {
+            success = delegate.offer(e);
+            if (success) {
+                currentSize += elementSize;
+            }
+        }
+        return success;
+    }
+
+    @Override
+    public boolean add(T e) {
+        try {
+            return super.add(e);
+        } catch (IllegalStateException ex) {
+            throw new IllegalStateException(
+                    "Queue full. Consider increasing memory threshold or spooling to disk", ex);
+        }
+    }
+
+    @Override
+    public T poll() {
+        T e = delegate.poll();
+        if (e != null) {
+            currentSize -= sizeOf(e);
+        }
+        return e;
+    }
+
+    @Override
+    public T peek() {
+        return delegate.peek();
+    }
+
+    @Override
+    public void close() throws IOException {
+        delegate.clear();
+    }
+
+    @Override
+    public long getByteSize() {
+        return currentSize;
+    }
+
+    @Override
+    public Iterator<T> iterator() {
+        return delegate.iterator();
+    }
+
+    @Override
+    public int size() {
+        return delegate.size();
+    }
+
+}
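A minimal sketch of the SizeBoundQueue contract (hypothetical demo class, not part of the patch): offer() refuses an element whose estimated size would reach the cap, and add() surfaces that refusal as the "Queue full" IllegalStateException thrown above.

    import java.util.ArrayDeque;

    import org.apache.phoenix.iterate.SizeBoundQueue;

    public class SizeBoundQueueDemo {
        public static void main(String[] args) {
            SizeBoundQueue<byte[]> q = new SizeBoundQueue<byte[]>(16, new ArrayDeque<byte[]>()) {
                @Override
                public long sizeOf(byte[] e) {
                    return e.length; // charge only the payload bytes
                }
            };
            System.out.println(q.offer(new byte[10])); // true: 0 + 10 < 16
            System.out.println(q.offer(new byte[10])); // false: 10 + 10 is not < 16
            q.add(new byte[10]); // throws IllegalStateException: Queue full ...
        }
    }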
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java
index 3136ca8..fa90b1a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java
@@ -95,7 +95,8 @@ public class SpoolingResultIterator implements PeekingResultIterator {
 
     private SpoolingResultIterator(SpoolingMetricsHolder spoolMetrics, MemoryMetricsHolder memoryMetrics, ResultIterator scanner, QueryServices services) throws SQLException {
         this (spoolMetrics, memoryMetrics, scanner, services.getMemoryManager(),
-                services.getProps().getInt(QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES),
+                services.getProps().getLong(QueryServices.CLIENT_SPOOL_THRESHOLD_BYTES_ATTRIB,
+                    QueryServicesOptions.DEFAULT_CLIENT_SPOOL_THRESHOLD_BYTES),
                 services.getProps().getLong(QueryServices.MAX_SPOOL_TO_DISK_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_MAX_SPOOL_TO_DISK_BYTES),
                 services.getProps().get(QueryServices.SPOOL_DIRECTORY, QueryServicesOptions.DEFAULT_SPOOL_DIRECTORY));
     }
@@ -109,7 +110,7 @@ public class SpoolingResultIterator implements PeekingResultIterator {
     *  the memory manager) is exceeded.
     * @throws SQLException
     */
-    SpoolingResultIterator(SpoolingMetricsHolder sMetrics, MemoryMetricsHolder mMetrics, ResultIterator scanner, MemoryManager mm, final int thresholdBytes, final long maxSpoolToDisk, final String spoolDirectory) throws SQLException {
+    SpoolingResultIterator(SpoolingMetricsHolder sMetrics, MemoryMetricsHolder mMetrics, ResultIterator scanner, MemoryManager mm, final long thresholdBytes, final long maxSpoolToDisk, final String spoolDirectory) throws SQLException {
         this.spoolMetrics = sMetrics;
         this.memoryMetrics = mMetrics;
         boolean success = false;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index e38f372..6742cb3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -44,7 +44,16 @@ public interface QueryServices extends SQLCloseable {
     public static final String THREAD_POOL_SIZE_ATTRIB = "phoenix.query.threadPoolSize";
     public static final String QUEUE_SIZE_ATTRIB = "phoenix.query.queueSize";
     public static final String THREAD_TIMEOUT_MS_ATTRIB = "phoenix.query.timeoutMs";
-    public static final String SPOOL_THRESHOLD_BYTES_ATTRIB = "phoenix.query.spoolThresholdBytes";
+    public static final String SERVER_SPOOL_THRESHOLD_BYTES_ATTRIB =
+            "phoenix.query.server.spoolThresholdBytes";
+    public static final String CLIENT_SPOOL_THRESHOLD_BYTES_ATTRIB =
+            "phoenix.query.client.spoolThresholdBytes";
+    public static final String CLIENT_ORDERBY_SPOOLING_ENABLED_ATTRIB =
+            "phoenix.query.client.orderBy.spooling.enabled";
+    public static final String CLIENT_JOIN_SPOOLING_ENABLED_ATTRIB =
+            "phoenix.query.client.join.spooling.enabled";
+    public static final String SERVER_ORDERBY_SPOOLING_ENABLED_ATTRIB =
+            "phoenix.query.server.orderBy.spooling.enabled";
     public static final String HBASE_CLIENT_KEYTAB = "hbase.myclient.keytab";
     public static final String HBASE_CLIENT_PRINCIPAL = "hbase.myclient.principal";
     public static final String SPOOL_DIRECTORY = "phoenix.spool.directory";
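For reviewers mapping the renamed knobs to where they are read: the phoenix.query.client.* properties are consumed on the client from the query services properties, while the phoenix.query.server.* ones are read from the region server Configuration (see the NonAggregateRegionScannerFactory hunk earlier in this diff), so the server values belong in hbase-site.xml on the region servers. A hedged client-side sketch; the JDBC URL is a placeholder:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.util.Properties;

    // Hypothetical client snippet, not part of the patch.
    public class SpoolingClientConfig {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // Keep client-side ORDER BY sorting purely in memory, capped at 10 MB;
            // with spooling off, an oversized sort should fail instead of spilling.
            props.setProperty("phoenix.query.client.orderBy.spooling.enabled", "false");
            props.setProperty("phoenix.query.client.spoolThresholdBytes",
                    Long.toString(10L * 1024 * 1024));
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost", props)) {
                // run queries as usual
            }
        }
    }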
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index b6755d4..b7fc119 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -24,6 +24,7 @@ import static org.apache.phoenix.query.QueryServices.AUTO_UPGRADE_ENABLED;
 import static org.apache.phoenix.query.QueryServices.CALL_QUEUE_PRODUCER_ATTRIB_NAME;
 import static org.apache.phoenix.query.QueryServices.CALL_QUEUE_ROUND_ROBIN_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.CLIENT_METRICS_TAG;
+import static org.apache.phoenix.query.QueryServices.CLIENT_SPOOL_THRESHOLD_BYTES_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.COLLECT_REQUEST_LEVEL_METRICS;
 import static org.apache.phoenix.query.QueryServices.COMMIT_STATS_ASYNC;
 import static org.apache.phoenix.query.QueryServices.COST_BASED_OPTIMIZER_ENABLED;
@@ -79,10 +80,10 @@ import static org.apache.phoenix.query.QueryServices.RUN_RENEW_LEASE_FREQUENCY_I
 import static org.apache.phoenix.query.QueryServices.RUN_UPDATE_STATS_ASYNC;
 import static org.apache.phoenix.query.QueryServices.SCAN_CACHE_SIZE_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.SCAN_RESULT_CHUNK_SIZE;
+import static org.apache.phoenix.query.QueryServices.SERVER_SPOOL_THRESHOLD_BYTES_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.SEQUENCE_CACHE_SIZE_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.SEQUENCE_SALT_BUCKETS_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.SPOOL_DIRECTORY;
-import static org.apache.phoenix.query.QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.STATS_COLLECTION_ENABLED;
 import static org.apache.phoenix.query.QueryServices.STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB;
@@ -134,6 +135,11 @@ public class QueryServicesOptions {
        public static final int DEFAULT_QUEUE_SIZE = 5000;
        public static final int DEFAULT_THREAD_TIMEOUT_MS = 600000; // 10min
       public static final int DEFAULT_SPOOL_THRESHOLD_BYTES = 1024 * 1024 * 20; // 20m
+       public static final int DEFAULT_SERVER_SPOOL_THRESHOLD_BYTES = 1024 * 1024 * 20; // 20m
+       public static final int DEFAULT_CLIENT_SPOOL_THRESHOLD_BYTES = 1024 * 1024 * 20; // 20m
+       public static final boolean DEFAULT_CLIENT_ORDERBY_SPOOLING_ENABLED = true;
+       public static final boolean DEFAULT_CLIENT_JOIN_SPOOLING_ENABLED = true;
+       public static final boolean DEFAULT_SERVER_ORDERBY_SPOOLING_ENABLED = true;
     public static final String DEFAULT_SPOOL_DIRECTORY = System.getProperty("java.io.tmpdir");
        public static final int DEFAULT_MAX_MEMORY_PERC = 15; // 15% of heap
        public static final int DEFAULT_MAX_TENANT_MEMORY_PERC = 100;
@@ -408,7 +414,8 @@ public class QueryServicesOptions {
             .setIfUnset(THREAD_POOL_SIZE_ATTRIB, DEFAULT_THREAD_POOL_SIZE)
             .setIfUnset(QUEUE_SIZE_ATTRIB, DEFAULT_QUEUE_SIZE)
             .setIfUnset(THREAD_TIMEOUT_MS_ATTRIB, DEFAULT_THREAD_TIMEOUT_MS)
-            .setIfUnset(SPOOL_THRESHOLD_BYTES_ATTRIB, DEFAULT_SPOOL_THRESHOLD_BYTES)
+            .setIfUnset(CLIENT_SPOOL_THRESHOLD_BYTES_ATTRIB, DEFAULT_CLIENT_SPOOL_THRESHOLD_BYTES)
+            .setIfUnset(SERVER_SPOOL_THRESHOLD_BYTES_ATTRIB, DEFAULT_SERVER_SPOOL_THRESHOLD_BYTES)
             .setIfUnset(SPOOL_DIRECTORY, DEFAULT_SPOOL_DIRECTORY)
             .setIfUnset(MAX_MEMORY_PERC_ATTRIB, DEFAULT_MAX_MEMORY_PERC)
             .setIfUnset(MAX_TENANT_MEMORY_PERC_ATTRIB, DEFAULT_MAX_TENANT_MEMORY_PERC)
@@ -522,8 +529,12 @@ public class QueryServicesOptions {
         return set(THREAD_TIMEOUT_MS_ATTRIB, threadTimeoutMs);
     }
 
-    public QueryServicesOptions setSpoolThresholdBytes(int spoolThresholdBytes) {
-        return set(SPOOL_THRESHOLD_BYTES_ATTRIB, spoolThresholdBytes);
+    public QueryServicesOptions setClientSpoolThresholdBytes(long spoolThresholdBytes) {
+        return set(CLIENT_SPOOL_THRESHOLD_BYTES_ATTRIB, spoolThresholdBytes);
+    }
+
+    public QueryServicesOptions setServerSpoolThresholdBytes(long spoolThresholdBytes) {
+        return set(SERVER_SPOOL_THRESHOLD_BYTES_ATTRIB, spoolThresholdBytes);
+        return set(SERVER_SPOOL_THRESHOLD_BYTES_ATTRIB, spoolThresholdBytes);
     }
 
     public QueryServicesOptions setSpoolDirectory(String spoolDirectory) {
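The two new chainable setters replace setSpoolThresholdBytes; a small hedged sketch of how test or embedding code might use them (the withDefaults() entry point is assumed, mirroring the QueryServicesTestImpl change later in this diff):

    import org.apache.phoenix.query.QueryServicesOptions;

    // Hypothetical helper, not part of the patch.
    public class SpoolOptionsExample {
        static QueryServicesOptions build() {
            return QueryServicesOptions.withDefaults()
                    .setClientSpoolThresholdBytes(20L * 1024 * 1024)  // 20 MB client-side
                    .setServerSpoolThresholdBytes(20L * 1024 * 1024); // 20 MB server-side
        }
    }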
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/iterate/OrderedResultIteratorTest.java b/phoenix-core/src/test/java/org/apache/phoenix/iterate/OrderedResultIteratorTest.java
index 50ed8e9..88bef47 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/iterate/OrderedResultIteratorTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/iterate/OrderedResultIteratorTest.java
@@ -16,12 +16,23 @@
  */
 package org.apache.phoenix.iterate;
 
+import static org.junit.Assert.fail;
+
 import java.sql.SQLException;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.phoenix.coprocessor.ScanRegionObserver;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.LiteralExpression;
 import org.apache.phoenix.expression.OrderByExpression;
+import org.apache.phoenix.hbase.index.util.VersionUtil;
+import org.apache.phoenix.util.ScanUtil;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 /**
  * Test class for {@link OrderedResultIterator}.
@@ -33,9 +44,47 @@ public class OrderedResultIteratorTest {
       ResultIterator delegate =  ResultIterator.EMPTY_ITERATOR;
       List<OrderByExpression> orderByExpressions = Collections.singletonList(null);
       int thresholdBytes = Integer.MAX_VALUE;
-      OrderedResultIterator iterator = new OrderedResultIterator(delegate, orderByExpressions, thresholdBytes);
-      // Should not throw an exception
+      boolean spoolingEnabled = true;
+      OrderedResultIterator iterator =
+              new OrderedResultIterator(delegate, orderByExpressions, spoolingEnabled,
+                      thresholdBytes);
+        // Should not throw an exception
       iterator.close();
-  }
+    }
+
+    @Test
+    public void testSpoolingBackwardCompatibility() {
+        RegionScanner s = Mockito.mock(RegionScanner.class);
+        Scan scan = new Scan();
+        Expression exp = LiteralExpression.newConstant(Boolean.TRUE);
+        OrderByExpression ex = new OrderByExpression(exp, false, false);
+        ScanRegionObserver.serializeIntoScan(scan, 0, Arrays.asList(ex), 100);
+        // Check 5.1.0 & Check > 5.1.0
+        ScanUtil.setClientVersion(scan, VersionUtil.encodeVersion("5.1.0"));
+        NonAggregateRegionScannerFactory.deserializeFromScan(scan, s, false, 100);
+
+        ScanUtil.setClientVersion(scan, VersionUtil.encodeVersion("5.2.0"));
+        NonAggregateRegionScannerFactory.deserializeFromScan(scan, s, false, 100);
+        // Check 4.15.0 Check > 4.15.0
+        ScanUtil.setClientVersion(scan, VersionUtil.encodeVersion("4.15.0"));
+        NonAggregateRegionScannerFactory.deserializeFromScan(scan, s, false, 100);
+        ScanUtil.setClientVersion(scan, VersionUtil.encodeVersion("4.15.1"));
+        NonAggregateRegionScannerFactory.deserializeFromScan(scan, s, false, 100);
+
+        // Check < 5.1
+        ScanUtil.setClientVersion(scan, VersionUtil.encodeVersion("5.0.0"));
+        try {
+            NonAggregateRegionScannerFactory.deserializeFromScan(scan, s, false, 100);
+            fail("Deserialize should fail for 5.0.0 since we didn't serialize thresholdBytes");
+        } catch (IllegalArgumentException e) {
+        }
+        // Check < 4.15
+        ScanUtil.setClientVersion(scan, VersionUtil.encodeVersion("4.14.0"));
+        try {
+            NonAggregateRegionScannerFactory.deserializeFromScan(scan, s, false, 100);
+            fail("Deserialize should fail for 4.14.0 since we didn't serialize thresholdBytes");
+        } catch (IllegalArgumentException e) {
+        }
 
+    }
 }
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
index 59e7fd3..df9d843 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
@@ -99,7 +99,8 @@ public final class QueryServicesTestImpl extends BaseQueryServicesImpl {
                 .setThreadPoolSize(DEFAULT_THREAD_POOL_SIZE)
                 .setMaxMemoryPerc(DEFAULT_MAX_MEMORY_PERC)
                 .setThreadTimeoutMs(DEFAULT_THREAD_TIMEOUT_MS)
-                .setSpoolThresholdBytes(DEFAULT_SPOOL_THRESHOLD_BYTES)
+                .setClientSpoolThresholdBytes(DEFAULT_SPOOL_THRESHOLD_BYTES)
+                .setServerSpoolThresholdBytes(DEFAULT_SPOOL_THRESHOLD_BYTES)
                 .setSpoolDirectory(DEFAULT_SPOOL_DIRECTORY)
                 .setMaxTenantMemoryPerc(DEFAULT_MAX_TENANT_MEMORY_PERC)
                 .setMaxServerCacheSize(DEFAULT_MAX_HASH_CACHE_SIZE)
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/MetaDataUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/util/MetaDataUtilTest.java
index b16e401..7c8e021 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/MetaDataUtilTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/MetaDataUtilTest.java
@@ -83,7 +83,15 @@ public class MetaDataUtilTest {
         assertTrue(VersionUtil.encodeVersion("0.94.1-mapR")>VersionUtil.encodeVersion("0.94"));
         assertTrue(VersionUtil.encodeVersion("1", "1", "3")>VersionUtil.encodeVersion("1", "1", "1"));
     }
-    
+
+    @Test
+    public void testDecode() {
+        int encodedVersion = VersionUtil.encodeVersion("4.15.5");
+        assertEquals(VersionUtil.decodeMajorVersion(encodedVersion), 4);
+        assertEquals(VersionUtil.decodeMinorVersion(encodedVersion), 15);
+        assertEquals(VersionUtil.decodePatchVersion(encodedVersion), 5);
+    }
+
     @Test
     public void testCompatibility() {
         assertTrue(MetaDataUtil.areClientAndServerCompatible(VersionUtil.encodeVersion(1,2,1), 1, 2));
