This is an automated email from the ASF dual-hosted git repository.

tkhurana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new 3c777aa77d PHOENIX-7024 Fix issues in Server Paging (#1659)
3c777aa77d is described below

commit 3c777aa77d8d22a43c6aeb002b56bf72ee7d526e
Author: tkhurana <khurana.ta...@gmail.com>
AuthorDate: Tue Sep 12 10:03:33 2023 -0700

    PHOENIX-7024 Fix issues in Server Paging (#1659)
    
    * PHOENIX-7024 Fix issues in Server Paging
    
    ---------
    
    Co-authored-by: Tanuj Khurana <tkhur...@apache.org>
---
 .../org/apache/phoenix/end2end/BaseOrderByIT.java  |  20 +-
 .../org/apache/phoenix/end2end/ServerPagingIT.java | 336 +++++++++++++++++++++
 .../phoenix/iterate/PhoenixQueryTimeoutIT.java     |  12 +-
 .../phoenix/coprocessor/PagingRegionScanner.java   |  16 +-
 .../UncoveredGlobalIndexRegionScanner.java         |   3 +
 .../coprocessor/UncoveredIndexRegionScanner.java   |  30 +-
 .../UncoveredLocalIndexRegionScanner.java          |   3 +
 .../org/apache/phoenix/filter/PagingFilter.java    |  84 +++---
 .../phoenix/iterate/OffsetResultIterator.java      |  19 +-
 .../java/org/apache/phoenix/util/ScanUtil.java     |  12 +-
 10 files changed, 456 insertions(+), 79 deletions(-)

diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseOrderByIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseOrderByIT.java
index a0ef9bdb72..dcd7463887 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseOrderByIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseOrderByIT.java
@@ -44,16 +44,25 @@ import java.util.Properties;
 import org.apache.phoenix.compile.ExplainPlan;
 import org.apache.phoenix.compile.ExplainPlanAttributes;
 import org.apache.phoenix.jdbc.PhoenixPreparedStatement;
+import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.QueryBuilder;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.TestUtil;
+import org.junit.Before;
 import org.junit.Test;
 
 
 public abstract class BaseOrderByIT extends ParallelStatsDisabledIT {
 
+    Properties props;
+    @Before
+    public void setup() {
+        props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        props.setProperty(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, Long.toString(0));
+    }
+
     @Test
     public void testMultiOrderByExpr() throws Exception {
         String tenantId = getOrganizationId();
@@ -63,7 +72,7 @@ public abstract class BaseOrderByIT extends ParallelStatsDisabledIT {
                     Lists.newArrayList("ENTITY_ID", "B_STRING"))
             .setFullTableName(tableName)
             .setOrderByClause("B_STRING, ENTITY_ID");
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+
         try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
             ResultSet rs = executeQuery(conn, queryBuilder);
             assertTrue (rs.next());
@@ -98,7 +107,6 @@ public abstract class BaseOrderByIT extends ParallelStatsDisabledIT {
                     Lists.newArrayList("ENTITY_ID", "B_STRING"))
             .setFullTableName(tableName)
             .setOrderByClause("B_STRING || ENTITY_ID DESC");
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
             ResultSet rs = executeQuery(conn, queryBuilder);
             assertTrue (rs.next());
@@ -126,7 +134,6 @@ public abstract class BaseOrderByIT extends ParallelStatsDisabledIT {
 
     @Test
     public void testOrderByDifferentColumns() throws Exception {
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
             conn.setAutoCommit(false);
             String tableName = generateUniqueName();
@@ -201,7 +208,6 @@ public abstract class BaseOrderByIT extends ParallelStatsDisabledIT {
 
     @Test
     public void testAggregateOrderBy() throws Exception {
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         Connection conn = DriverManager.getConnection(getUrl(), props);
         String tableName = generateUniqueName();
         String ddl = "create table " + tableName + " (ID VARCHAR NOT NULL 
PRIMARY KEY, VAL1 VARCHAR, VAL2 INTEGER)";
@@ -262,7 +268,6 @@ public abstract class BaseOrderByIT extends ParallelStatsDisabledIT {
 
     @Test
     public void testAggregateOptimizedOutOrderBy() throws Exception {
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         Connection conn = DriverManager.getConnection(getUrl(), props);
         String tableName = generateUniqueName();
         String ddl = "create table " + tableName + " (K1 VARCHAR NOT NULL, K2 
VARCHAR NOT NULL, VAL1 VARCHAR, VAL2 INTEGER, CONSTRAINT pk PRIMARY 
KEY(K1,K2))";
@@ -342,7 +347,6 @@ public abstract class BaseOrderByIT extends ParallelStatsDisabledIT {
 
     @Test
     public void testNullsLastWithDesc() throws Exception {
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
             String tableName=generateUniqueName();
             String sql="CREATE TABLE "+tableName+" ( "+
@@ -606,7 +610,6 @@ public abstract class BaseOrderByIT extends ParallelStatsDisabledIT {
     }
 
     private void doTestOrderByReverseOptimization(boolean salted,boolean desc1,boolean desc2,boolean desc3) throws Exception {
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
             String tableName=generateUniqueName();
             String sql="CREATE TABLE "+tableName+" ( "+
@@ -712,7 +715,6 @@ public abstract class BaseOrderByIT extends ParallelStatsDisabledIT {
     }
 
     private void doTestOrderByReverseOptimizationWithNullsLast(boolean salted,boolean desc1,boolean desc2,boolean desc3) throws Exception {
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
             String tableName=generateUniqueName();
             String sql="CREATE TABLE "+tableName+" ( "+
@@ -958,7 +960,6 @@ public abstract class BaseOrderByIT extends ParallelStatsDisabledIT {
 
     @Test
     public void testOrderByNullable() throws SQLException {
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
             String sql = "CREATE TABLE IF NOT EXISTS us_population (state 
CHAR(2) NOT NULL," +
                     "city VARCHAR NOT NULL, population BIGINT CONSTRAINT my_pk 
PRIMARY KEY" +
@@ -1000,7 +1001,6 @@ public abstract class BaseOrderByIT extends ParallelStatsDisabledIT {
 
     @Test
     public void testPhoenix6999() throws Exception {
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         String tableName = "TBL_" + generateUniqueName();
         String descTableName = "TBL_" + generateUniqueName();
 
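Note on the BaseOrderByIT changes above: hoisting props into a @Before method pins PHOENIX_SERVER_PAGE_SIZE_MS to 0 for every test, so the server emits a page boundary as aggressively as possible and the paged-scan resume logic is exercised on each query. A minimal sketch of the same per-connection override, using the test-harness names from the diff:

    Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
    // A 0 ms page size makes the server stop after (almost) every row.
    props.setProperty(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, Long.toString(0));
    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
        // every query issued on this connection now exercises the paging/resume path
    }
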
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ServerPagingIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ServerPagingIT.java
new file mode 100644
index 0000000000..55e0dd7b82
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ServerPagingIT.java
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.types.PDate;
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.DateUtil;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.sql.Connection;
+import java.sql.Date;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.phoenix.end2end.index.GlobalIndexCheckerIT.assertExplainPlan;
+import static org.apache.phoenix.end2end.index.GlobalIndexCheckerIT.assertExplainPlanWithLimit;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class ServerPagingIT extends ParallelStatsDisabledIT {
+
+    @BeforeClass
+    public static synchronized void doSetup() throws Exception {
+        Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+        props.put(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, Long.toString(0));
+        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+    }
+    @Test
+    public void testOrderByNonAggregation() throws Exception {
+        final String tablename = generateUniqueName();
+        final String tenantId = getOrganizationId();
+
+        final Date D1 = DateUtil.parseDate("1970-01-01 00:58:00");
+        final Date D2 = DateUtil.parseDate("1970-01-01 01:02:00");
+        final Date D3 = DateUtil.parseDate("1970-01-01 01:30:00");
+        final Date D4 = DateUtil.parseDate("1970-01-01 01:45:00");
+        final Date D5 = DateUtil.parseDate("1970-01-01 02:00:00");
+        final Date D6 = DateUtil.parseDate("1970-01-01 04:00:00");
+        final String F1 = "A";
+        final String F2 = "B";
+        final String F3 = "C";
+        final String R1 = "R1";
+        final String R2 = "R2";
+        byte[][] splits = new byte[][] {
+                ByteUtil.concat(Bytes.toBytes(tenantId), PDate.INSTANCE.toBytes(D3)),
+                ByteUtil.concat(Bytes.toBytes(tenantId), PDate.INSTANCE.toBytes(D5)),
+        };
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            String ddl = "create table " + tablename +
+                    "   (organization_id char(15) not null," +
+                    "    date date not null," +
+                    "    feature char(1) not null," +
+                    "    unique_users integer not null,\n" +
+                    "    transactions bigint,\n" +
+                    "    region varchar,\n" +
+                    "    CONSTRAINT pk PRIMARY KEY (organization_id, \"DATE\", 
feature, unique_users))";
+            StringBuilder buf = new StringBuilder(ddl);
+            if (splits != null) {
+                buf.append(" SPLIT ON (");
+                for (int i = 0; i < splits.length; i++) {
+                    buf.append("'").append(Bytes.toString(splits[i])).append("'").append(",");
+                }
+                buf.setCharAt(buf.length()-1, ')');
+            }
+            ddl = buf.toString();
+            conn.createStatement().execute(ddl);
+
+            PreparedStatement stmt = conn.prepareStatement(
+                    "upsert into " + tablename +
+                            " (" +
+                            "    ORGANIZATION_ID, " +
+                            "    \"DATE\", " +
+                            "    FEATURE, " +
+                            "    UNIQUE_USERS, " +
+                            "    TRANSACTIONS, " +
+                            "    REGION) " +
+                            "VALUES (?, ?, ?, ?, ?, ?)");
+            stmt.setString(1, tenantId);
+            stmt.setDate(2, D1);
+            stmt.setString(3, F1);
+            stmt.setInt(4, 10);
+            stmt.setLong(5, 100L);
+            stmt.setString(6, R2);
+            stmt.execute();
+
+            stmt.setString(1, tenantId);
+            stmt.setDate(2, D2);
+            stmt.setString(3, F1);
+            stmt.setInt(4, 20);
+            stmt.setLong(5, 200);
+            stmt.setString(6, null);
+            stmt.execute();
+
+            stmt.setString(1, tenantId);
+            stmt.setDate(2, D3);
+            stmt.setString(3, F1);
+            stmt.setInt(4, 30);
+            stmt.setLong(5, 300);
+            stmt.setString(6, R1);
+            stmt.execute();
+
+            stmt.setString(1, tenantId);
+            stmt.setDate(2, D4);
+            stmt.setString(3, F2);
+            stmt.setInt(4, 40);
+            stmt.setLong(5, 400);
+            stmt.setString(6, R1);
+            stmt.execute();
+
+            stmt.setString(1, tenantId);
+            stmt.setDate(2, D5);
+            stmt.setString(3, F3);
+            stmt.setInt(4, 50);
+            stmt.setLong(5, 500);
+            stmt.setString(6, R2);
+            stmt.execute();
+
+            stmt.setString(1, tenantId);
+            stmt.setDate(2, D6);
+            stmt.setString(3, F1);
+            stmt.setInt(4, 60);
+            stmt.setLong(5, 600);
+            stmt.setString(6, null);
+            stmt.execute();
+            conn.commit();
+        }
+
+        String query = "SELECT \"DATE\", transactions t FROM "+tablename+
+                " WHERE organization_id=? AND unique_users <= 30 ORDER BY t 
DESC LIMIT 2";
+        try (Connection conn = DriverManager.getConnection(getUrl(), props);
+            PreparedStatement statement = conn.prepareStatement(query)) {
+            statement.setString(1, tenantId);
+            try (ResultSet rs = statement.executeQuery()) {
+                assertTrue(rs.next());
+                assertEquals(D3.getTime(), rs.getDate(1).getTime());
+                assertTrue(rs.next());
+                assertEquals(D2.getTime(), rs.getDate(1).getTime());
+                assertFalse(rs.next());
+            }
+        }
+    }
+
+    @Test
+    public void testLimitOffset() throws SQLException {
+        final String tablename = generateUniqueName();
+        final String[] STRINGS = { "a", "b", "c", "d", "e", "f", "g", "h", 
"i", "j", "k", "l", "m", "n",
+                "o", "p", "q", "r", "s", "t", "u", "v", "w", "x", "y", "z" };
+        String ddl = "CREATE TABLE " + tablename + " (t_id VARCHAR NOT 
NULL,\n" + "k1 INTEGER NOT NULL,\n"
+                + "k2 INTEGER NOT NULL,\n" + "C3.k3 INTEGER,\n" + "C2.v1 
VARCHAR,\n"
+                + "CONSTRAINT pk PRIMARY KEY (t_id, k1, k2)) " + "SPLIT ON 
('e','i','o')";
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            createTestTable(getUrl(), ddl);
+            for (int i = 0; i < 26; i++) {
+                conn.createStatement().execute("UPSERT INTO " + tablename + " 
values('" + STRINGS[i] + "'," + i + ","
+                        + (i + 1) + "," + (i + 2) + ",'" + STRINGS[25 - i] + 
"')");
+            }
+            conn.commit();
+            int limit = 10;
+            // Testing 0 as remaining offset after 4 rows in first region, 4 rows in second region
+            int offset = 8;
+            ResultSet rs;
+            rs = conn.createStatement()
+                    .executeQuery("SELECT t_id from " + tablename + " order by 
t_id limit " + limit + " offset " + offset);
+            int i = 0;
+            while (i < limit) {
+                assertTrue(rs.next());
+                assertEquals("Expected string didn't match for i = " + i, 
STRINGS[offset + i], rs.getString(1));
+                i++;
+            }
+
+            // Testing query with offset + filter
+            int filterCond = 10;
+            rs = conn.createStatement().executeQuery(
+                    "SELECT t_id from " + tablename + " where k2 > " + 
filterCond +
+                            " order by t_id limit " + limit + " offset " + 
offset);
+            i = 0;
+            limit = 5;
+            while (i < limit) {
+                assertTrue(rs.next());
+                assertEquals("Expected string didn't match for i = " + i,
+                        STRINGS[offset + filterCond + i], rs.getString(1));
+                i++;
+            }
+
+            limit = 35;
+            rs = conn.createStatement().executeQuery("SELECT t_id from " + 
tablename + " union all SELECT t_id from "
+                    + tablename + " offset " + offset + " FETCH FIRST " + 
limit + " rows only");
+            i = 0;
+            while (i++ < STRINGS.length - offset) {
+                assertTrue(rs.next());
+                assertEquals(STRINGS[offset + i - 1], rs.getString(1));
+            }
+            i = 0;
+            while (i++ < limit - STRINGS.length - offset) {
+                assertTrue(rs.next());
+                assertEquals(STRINGS[i - 1], rs.getString(1));
+            }
+            limit = 1;
+            offset = 1;
+            rs = conn.createStatement()
+                    .executeQuery("SELECT k2 from " + tablename + " order by 
k2 desc limit " + limit + " offset " + offset);
+            assertTrue(rs.next());
+            assertEquals(25, rs.getInt(1));
+            assertFalse(rs.next());
+        }
+    }
+
+    @Test
+    public void testGroupBy() throws SQLException {
+        final String tablename = generateUniqueName();
+        final String indexName = generateUniqueName();
+
+        String ddl = "CREATE TABLE " + tablename + " (t_id VARCHAR NOT 
NULL,\n" + "k1 INTEGER NOT NULL,\n"
+                + "k2 INTEGER CONSTRAINT pk PRIMARY KEY (t_id, k1)) ";
+        String indexDDl = "CREATE INDEX IF NOT EXISTS " +  indexName + " ON " 
+ tablename + "(k2)";
+
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            createTestTable(getUrl(), ddl);
+            createTestTable(getUrl(), indexDDl);
+            for (int i = 0; i < 8; i++) {
+                conn.createStatement().execute("UPSERT INTO " + tablename + " 
values('tenant1'," + i + ","
+                        + (i + 1) + ")");
+            }
+            conn.commit();
+            ResultSet rs = conn.createStatement().executeQuery(
+                    "SELECT count(*) FROM " + tablename + " where t_id = 
'tenant1' AND (k2 IN (5,6) or k2 is null) group by k2=6");
+            while (rs.next()) {
+                Assert.assertEquals(1, rs.getInt(1));
+            }
+            Assert.assertFalse(rs.next());
+        }
+    }
+
+    @Test
+    public void testUncoveredQuery() throws Exception {
+        String dataTableName = generateUniqueName();
+        populateTable(dataTableName); // with two rows ('a', 'ab', 'abc', 'abcd') and ('b', 'bc', 'bcd', 'bcde')
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String indexTableName = generateUniqueName();
+            conn.createStatement().execute("CREATE UNCOVERED INDEX "
+                    + indexTableName + " on " + dataTableName + " (val1) ");
+            String selectSql;
+            int limit = 10;
+            // Verify that an index hint is not necessary for an uncovered index
+            selectSql = "SELECT  val2, val3 from " + dataTableName +
+                    " WHERE val1 = 'bc' AND (val2 = 'bcd' OR val3 ='bcde') LIMIT " + limit;
+            assertExplainPlanWithLimit(conn, selectSql, dataTableName, indexTableName, limit);
+
+            ResultSet rs = conn.createStatement().executeQuery(selectSql);
+            assertTrue(rs.next());
+            assertEquals("bcd", rs.getString(1));
+            assertEquals("bcde", rs.getString(2));
+            assertFalse(rs.next());
+
+            // Add another row and run a group by query where the uncovered index should be used
+            conn.createStatement().execute("upsert into " + dataTableName + " (id, val1, val2, val3) values ('c', 'ab','cde', 'cdef')");
+            conn.commit();
+
+            selectSql = "SELECT count(val3) from " + dataTableName + " where 
val1 > '0' GROUP BY val1";
+            // Verify that we will read from the index table
+            assertExplainPlan(conn, selectSql, dataTableName, indexTableName);
+            rs = conn.createStatement().executeQuery(selectSql);
+            assertTrue(rs.next());
+            assertEquals(2, rs.getInt(1));
+            assertTrue(rs.next());
+            assertEquals(1, rs.getInt(1));
+            assertFalse(rs.next());
+
+            selectSql = "SELECT count(val3) from " + dataTableName + " where 
val1 > '0'";
+            // Verify that we will read from the index table
+            assertExplainPlan(conn, selectSql, dataTableName, indexTableName);
+            rs = conn.createStatement().executeQuery(selectSql);
+            assertTrue(rs.next());
+            assertEquals(3, rs.getInt(1));
+
+            // Run an order by query where the uncovered index should be used
+            selectSql = "SELECT val3 from " + dataTableName + " where val1 > 
'0' ORDER BY val1";
+            // Verify that we will read from the index table
+            assertExplainPlan(conn, selectSql, dataTableName, indexTableName);
+            rs = conn.createStatement().executeQuery(selectSql);
+            assertTrue(rs.next());
+            assertEquals("abcd", rs.getString(1));
+            assertTrue(rs.next());
+            assertEquals("cdef", rs.getString(1));
+            assertTrue(rs.next());
+            assertEquals("bcde", rs.getString(1));
+            assertFalse(rs.next());
+        }
+    }
+
+    private void populateTable(String tableName) throws Exception {
+        Connection conn = DriverManager.getConnection(getUrl());
+        conn.createStatement().execute("create table " + tableName +
+                " (id varchar(10) not null primary key, val1 varchar(10), val2 
varchar(10)," +
+                " val3 varchar(10))");
+        conn.createStatement().execute("upsert into " + tableName + " values 
('a', 'ab', 'abc', 'abcd')");
+        conn.commit();
+        conn.createStatement().execute("upsert into " + tableName + " values 
('b', 'bc', 'bcd', 'bcde')");
+        conn.commit();
+        conn.close();
+    }
+}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/iterate/PhoenixQueryTimeoutIT.java b/phoenix-core/src/it/java/org/apache/phoenix/iterate/PhoenixQueryTimeoutIT.java
index 46a62d9c53..8d0a14fc56 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/iterate/PhoenixQueryTimeoutIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/iterate/PhoenixQueryTimeoutIT.java
@@ -26,6 +26,7 @@ import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.SQLTimeoutException;
 import java.util.Properties;
 
 import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
@@ -119,7 +120,16 @@ public class PhoenixQueryTimeoutIT extends ParallelStatsDisabledIT {
             fail("Expected query to timeout with a 1 ms timeout");
         } catch (SQLException e) {
             //OPERATION_TIMED_OUT Exception expected
-            assertEquals(OPERATION_TIMED_OUT.getErrorCode(), e.getErrorCode());
+            Throwable t = e;
+            // SQLTimeoutException can be wrapped inside outer exceptions like PhoenixIOException
+            while (t != null && !(t instanceof SQLTimeoutException)) {
+                t = t.getCause();
+            }
+            if (t == null) {
+                fail("Expected query to fail with SQLTimeoutException");
+            }
+            assertEquals(OPERATION_TIMED_OUT.getErrorCode(),
+                    ((SQLTimeoutException)t).getErrorCode());
         } finally {
             BaseResultIterators.setForTestingSetTimeoutToMaxToLetQueryPassHere(false);
         }
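Note: the assertion is loosened above because the SQLTimeoutException may arrive nested inside an outer exception such as PhoenixIOException. The unwrapping idiom generalizes into a small helper; a sketch (the helper itself is hypothetical, not part of the Phoenix API):

    private static SQLTimeoutException unwrapTimeout(SQLException e) {
        Throwable t = e;
        // Walk the cause chain until a SQLTimeoutException is found or the chain ends.
        while (t != null && !(t instanceof SQLTimeoutException)) {
            t = t.getCause();
        }
        return (SQLTimeoutException) t; // null when no timeout is present
    }
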
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PagingRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PagingRegionScanner.java
index 090dee282c..5787630072 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PagingRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PagingRegionScanner.java
@@ -24,8 +24,11 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.filter.PagingFilter;
 import org.apache.phoenix.util.ScanUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
 *  PagingRegionScanner works with PagingFilter to make sure that the time between two rows
@@ -43,6 +46,9 @@ public class PagingRegionScanner extends BaseRegionScanner {
     private Region region;
     private Scan scan;
     private PagingFilter pagingFilter;
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(PagingRegionScanner.class);
+
       public PagingRegionScanner(Region region, RegionScanner scanner, Scan scan) {
            super(scanner);
            this.region = region;
@@ -55,6 +61,9 @@ public class PagingRegionScanner extends BaseRegionScanner {
 
     private boolean next(List<Cell> results, boolean raw) throws IOException {
         try {
+            if (pagingFilter != null) {
+                pagingFilter.init();
+            }
             boolean hasMore = raw ? delegate.nextRaw(results) : delegate.next(results);
             if (pagingFilter == null) {
                 return hasMore;
@@ -66,19 +75,20 @@ public class PagingRegionScanner extends BaseRegionScanner {
                     // Close the current region scanner, start a new one and return a dummy result
                     delegate.close();
                     byte[] rowKey = pagingFilter.getRowKeyAtStop();
-                    scan.withStartRow(rowKey, true);
+                    boolean isInclusive = pagingFilter.isNextRowInclusive();
+                    scan.withStartRow(rowKey, isInclusive);
                     delegate = region.getScanner(scan);
                     if (results.isEmpty()) {
+                        LOGGER.info("Page filter stopped, generating dummy key 
{} inclusive={}",
+                                Bytes.toStringBinary(rowKey), isInclusive);
                         ScanUtil.getDummyResult(rowKey, results);
                     }
-                    pagingFilter.init();
                     return true;
                 }
                 return false;
             } else {
                 // We got a row from the HBase scanner within the configured time (i.e., the page size). We need to
                 // start a new page on the next next() call.
-                pagingFilter.resetStartTime();
                 return true;
             }
         } catch (Exception e) {
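Note: with this change, when PagingFilter exhausts the page budget the region scanner is closed and reopened at getRowKeyAtStop(), inclusive whenever the stop point is only a seek-hint prefix, and a dummy result is handed back so control returns to the caller between pages. A hedged sketch of how a consumer of such a RegionScanner loops over dummy rows (java.util imports assumed; process() is a hypothetical stand-in):

    List<Cell> cells = new ArrayList<>();
    boolean hasMore = true;
    while (hasMore) {
        cells.clear();
        hasMore = scanner.next(cells);
        if (ScanUtil.isDummy(cells)) {
            continue; // page boundary: nothing to consume, just call next() again
        }
        process(cells); // hypothetical handler for a real row
    }
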
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UncoveredGlobalIndexRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UncoveredGlobalIndexRegionScanner.java
index 13907af6ef..640febc45f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UncoveredGlobalIndexRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UncoveredGlobalIndexRegionScanner.java
@@ -131,6 +131,9 @@ public class UncoveredGlobalIndexRegionScanner extends UncoveredIndexRegionScann
 
     protected void scanDataRows(Collection<byte[]> dataRowKeys, long startTime) throws IOException {
         Scan dataScan = prepareDataTableScan(dataRowKeys);
+        if (dataScan == null) {
+            return;
+        }
         try (ResultScanner resultScanner = dataHTable.getScanner(dataScan)) {
             for (Result result = resultScanner.next(); (result != null);
                  result = resultScanner.next()) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java
index 8b1436ee10..46c61bb1b1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java
@@ -172,17 +172,27 @@ public abstract class UncoveredIndexRegionScanner extends BaseRegionScanner {
     protected Scan prepareDataTableScan(Collection<byte[]> dataRowKeys) throws IOException {
         List<KeyRange> keys = new ArrayList<>(dataRowKeys.size());
         for (byte[] dataRowKey : dataRowKeys) {
-            keys.add(PVarbinary.INSTANCE.getKeyRange(dataRowKey, SortOrder.ASC));
+            // If the data table scan was interrupted because of paging, we retry the scan,
+            // but on retry we should only fetch data table rows that we haven't already
+            // fetched.
+            if (!dataRows.containsKey(new ImmutableBytesPtr(dataRowKey))) {
+                keys.add(PVarbinary.INSTANCE.getKeyRange(dataRowKey, SortOrder.ASC));
+            }
+        }
+        }
+        if (!keys.isEmpty()) {
+            ScanRanges scanRanges = ScanRanges.createPointLookup(keys);
+            Scan dataScan = new Scan(dataTableScan);
+            dataScan.setTimeRange(scan.getTimeRange().getMin(), scan.getTimeRange().getMax());
+            scanRanges.initializeScan(dataScan);
+            SkipScanFilter skipScanFilter = scanRanges.getSkipScanFilter();
+            dataScan.setFilter(new SkipScanFilter(skipScanFilter, false));
+            dataScan.setAttribute(BaseScannerRegionObserver.SERVER_PAGE_SIZE_MS,
+                    Bytes.toBytes(Long.valueOf(pageSizeMs)));
+            return dataScan;
+        } else {
+            LOGGER.info("All data rows have already been fetched");
+            return null;
+        }
-        ScanRanges scanRanges = ScanRanges.createPointLookup(keys);
-        Scan dataScan = new Scan(dataTableScan);
-        dataScan.setTimeRange(scan.getTimeRange().getMin(), scan.getTimeRange().getMax());
-        scanRanges.initializeScan(dataScan);
-        SkipScanFilter skipScanFilter = scanRanges.getSkipScanFilter();
-        dataScan.setFilter(new SkipScanFilter(skipScanFilter, false));
-        dataScan.setAttribute(BaseScannerRegionObserver.SERVER_PAGE_SIZE_MS,
-                Bytes.toBytes(Long.valueOf(pageSizeMs)));
-        return dataScan;
     }
 
     protected boolean scanIndexTableRows(List<Cell> result,
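Note: the containsKey guard above matters on page-boundary retries. scanDataRows() can run more than once for the same batch of index rows, so only data row keys absent from the dataRows cache become point lookups, and when nothing is missing the data table scan is skipped entirely (the null return handled by the callers in the two files below). The essence as a standalone sketch (simplified; `fetched` stands in for the dataRows key cache):

    // Keep only the keys that were not fetched before the previous page ended.
    static List<byte[]> keysStillToFetch(Collection<byte[]> dataRowKeys,
                                         Set<ImmutableBytesPtr> fetched) {
        List<byte[]> toFetch = new ArrayList<>();
        for (byte[] dataRowKey : dataRowKeys) {
            if (!fetched.contains(new ImmutableBytesPtr(dataRowKey))) {
                toFetch.add(dataRowKey);
            }
        }
        return toFetch; // empty => every row was fetched before the page timed out
    }
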
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UncoveredLocalIndexRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UncoveredLocalIndexRegionScanner.java
index 0d6ddc39d0..d7273aa8a2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UncoveredLocalIndexRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UncoveredLocalIndexRegionScanner.java
@@ -67,6 +67,9 @@ public class UncoveredLocalIndexRegionScanner extends UncoveredIndexRegionScanne
 
     protected void scanDataRows(Collection<byte[]> dataRowKeys, long startTime) throws IOException {
         Scan dataScan = prepareDataTableScan(dataRowKeys);
+        if (dataScan == null) {
+            return;
+        }
         try (RegionScanner regionScanner = region.getScanner(dataScan)) {
             boolean hasMore;
             do {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/filter/PagingFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/filter/PagingFilter.java
index e630e207c1..a472086d36 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/filter/PagingFilter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/filter/PagingFilter.java
@@ -22,7 +22,6 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
-import java.util.Arrays;
 import java.util.List;
 
 import org.apache.hadoop.hbase.Cell;
@@ -31,7 +30,6 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.FilterBase;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.io.Writable;
 import org.apache.phoenix.util.EnvironmentEdgeManager;
@@ -48,7 +46,11 @@ public class PagingFilter extends FilterBase implements Writable {
     State state;
     private long pageSizeMs;
     private long startTime;
-    private byte[] rowKeyAtStop;
+    // tracks the row that we will visit next. It is not always a full row key and may be
+    // just a row key prefix.
+    private Cell nextHintCell;
+    // tracks the row we last visited
+    private Cell currentCell;
     private Filter delegate = null;
 
     public PagingFilter() {
@@ -70,10 +72,19 @@ public class PagingFilter extends FilterBase implements Writable {
     }
 
     public byte[] getRowKeyAtStop() {
-        if (rowKeyAtStop != null) {
-            return Arrays.copyOf(rowKeyAtStop, rowKeyAtStop.length);
+        byte[] rowKeyAtStop = null;
+        if (nextHintCell != null) {
+            // if we have already seeked to the next cell, use that when we resume the scan
+            rowKeyAtStop = CellUtil.cloneRow(nextHintCell);
+        } else if (currentCell != null) {
+            rowKeyAtStop = CellUtil.cloneRow(currentCell);
         }
-        return null;
+        return rowKeyAtStop;
+    }
+
+    public boolean isNextRowInclusive() {
+        // since this can be a key prefix, we have to set inclusive to true when resuming the scan
+        return nextHintCell != null;
     }
 
     public boolean isStopped() {
@@ -82,22 +93,19 @@ public class PagingFilter extends FilterBase implements Writable {
 
     public void init() {
         state = State.INITIAL;
-        rowKeyAtStop = null;
-    }
-
-    public void resetStartTime() {
-        if (state == State.STARTED) {
-            init();
-        }
+        currentCell = null;
+        nextHintCell = null;
     }
 
     @Override
     public void reset() throws IOException {
         long currentTime = EnvironmentEdgeManager.currentTimeMillis();
-        if (state == State.INITIAL) {
-            startTime = currentTime;
-            state = State.STARTED;
-        } else if (state == State.STARTED && currentTime - startTime >= pageSizeMs) {
+        // reset() can be called multiple times for the same row, sometimes even before we
+        // have scanned a single row, and the order in which it is called is not very
+        // predictable. So we need to ensure that we have seen at least one row before we
+        // page; the currentCell != null check ensures that.
+        if (state == State.STARTED && currentCell != null
+                && currentTime - startTime >= pageSizeMs) {
             state = State.TIME_TO_STOP;
         }
         if (delegate != null) {
@@ -110,33 +118,19 @@ public class PagingFilter extends FilterBase implements Writable {
     @Override
     public Cell getNextCellHint(Cell currentKV) throws IOException {
         if (delegate != null) {
-            return delegate.getNextCellHint(currentKV);
+            Cell ret = delegate.getNextCellHint(currentKV);
+            // save the hint so that if the filter stops we know where to resume the scan
+            nextHintCell = ret;
+            return ret;
         }
         return super.getNextCellHint(currentKV);
     }
 
-    public boolean filterRowKey(byte[] buffer, int offset, int length) throws IOException {
-        if (state == State.TIME_TO_STOP) {
-            if (rowKeyAtStop == null) {
-                rowKeyAtStop = new byte[length];
-                Bytes.putBytes(rowKeyAtStop, 0, buffer, offset, length);
-            }
-            return true;
-        }
-        if (delegate != null) {
-            return delegate.filterRowKey(buffer, offset, length);
-        }
-        return super.filterRowKey(buffer, offset, length);
-    }
-
     @Override
     public boolean filterRowKey(Cell cell) throws IOException {
-        if (state == State.TIME_TO_STOP) {
-            if (rowKeyAtStop == null) {
-                rowKeyAtStop = CellUtil.cloneRow(cell);
-            }
-            return true;
-        }
+        currentCell = cell;
+        // now that we have visited the row we need to reset the hint
+        nextHintCell = null;
         if (delegate != null) {
             return delegate.filterRowKey(cell);
         }
@@ -145,10 +139,13 @@ public class PagingFilter extends FilterBase implements Writable {
 
     @Override
     public boolean filterAllRemaining() throws IOException {
-        if (state == State.TIME_TO_STOP && rowKeyAtStop != null) {
+        if (state == State.TIME_TO_STOP) {
             state = State.STOPPED;
             return true;
         }
+        if (state == State.STOPPED) {
+            return true;
+        }
         if (delegate != null) {
             return delegate.filterAllRemaining();
         }
@@ -156,7 +153,14 @@ public class PagingFilter extends FilterBase implements Writable {
     }
 
     @Override
+    /**
+     * This is called once at the beginning of every row.
+     */
     public boolean hasFilterRow() {
+        if (state == State.INITIAL) {
+            startTime = EnvironmentEdgeManager.currentTimeMillis();
+            state = State.STARTED;
+        }
         return true;
     }
 
@@ -223,6 +227,8 @@ public class PagingFilter extends FilterBase implements Writable {
 
     @Override
     public Filter.ReturnCode filterCell(Cell c) throws IOException {
+        currentCell = c;
+        nextHintCell = null;
         if (delegate != null) {
             return delegate.filterCell(c);
         }
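Note: taken together, the reworked PagingFilter is a small state machine. init() clears per-page state, hasFilterRow() starts the page timer when the first row is processed, reset() moves to TIME_TO_STOP only once at least one row has been seen (currentCell != null) and the page budget is spent, and filterAllRemaining() then terminates the scan while getRowKeyAtStop()/isNextRowInclusive() tell PagingRegionScanner where to resume. A condensed model of that lifecycle (a sketch, not the real class):

    enum State { INITIAL, STARTED, TIME_TO_STOP, STOPPED }

    State state = State.INITIAL;
    long startTime;

    void onRowStart(long now) {                   // ~ hasFilterRow()
        if (state == State.INITIAL) {
            startTime = now;                      // first row of the page starts the timer
            state = State.STARTED;
        }
    }

    void onReset(long now, boolean sawRow, long pageSizeMs) { // ~ reset()
        if (state == State.STARTED && sawRow && now - startTime >= pageSizeMs) {
            state = State.TIME_TO_STOP;           // page budget spent after >= 1 row
        }
    }

    boolean shouldStop() {                        // ~ filterAllRemaining()
        if (state == State.TIME_TO_STOP) {
            state = State.STOPPED;
            return true;
        }
        return state == State.STOPPED;
    }
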
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OffsetResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OffsetResultIterator.java
index 3eecfc82c3..e00a9fbeb4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OffsetResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OffsetResultIterator.java
@@ -20,13 +20,10 @@ package org.apache.phoenix.iterate;
 import java.sql.SQLException;
 import java.util.List;
 
-import org.apache.phoenix.compile.ExplainPlanAttributes
-    .ExplainPlanAttributesBuilder;
-import org.apache.phoenix.schema.tuple.Tuple;
-import org.apache.phoenix.util.EnvironmentEdgeManager;
+import static org.apache.phoenix.util.ScanUtil.isDummy;
 
-import static org.apache.phoenix.util.ScanUtil.getDummyResult;
-import static org.apache.phoenix.util.ScanUtil.getDummyTuple;
+import org.apache.phoenix.compile.ExplainPlanAttributes.ExplainPlanAttributesBuilder;
+import org.apache.phoenix.schema.tuple.Tuple;
 
 /**
  * Iterates through tuples up to a limit
@@ -49,14 +46,16 @@ public class OffsetResultIterator extends DelegateResultIterator {
     }
     @Override
     public Tuple next() throws SQLException {
-        long startTime = EnvironmentEdgeManager.currentTimeMillis();
         while (rowCount < offset) {
             Tuple tuple = super.next();
             if (tuple == null) { return null; }
-            rowCount++;
-            if (EnvironmentEdgeManager.currentTimeMillis() - startTime >= pageSizeMs) {
-                return getDummyTuple(tuple);
+            if (isDummy(tuple)) {
+                // while rowCount < offset absorb the dummy and call next on the underlying scanner
+                continue;
             }
+            rowCount++;
+            // no page timeout check at this level because we cannot correctly resume
+            // scans for OFFSET queries until the offset is reached
         }
         return super.next();
     }
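Note: the behavioral change here is that a dummy tuple seen while rows are still being skipped is absorbed instead of propagated, because a scan resumed mid-offset could not tell how much of the OFFSET was already consumed; dummies only flow through again once rowCount has reached the offset (via the plain super.next()). Worked against the testLimitOffset data above, with splits on 'e','i','o' and OFFSET 8:

    // a b c d | e f g h | i j ...   (regions split at 'e' and 'i')
    // offset = 8 consumes four rows from each of the first two regions,
    // swallowing any dummies in between; the first real row returned is
    // 'i', i.e. STRINGS[8].
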
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
index 6290f75bcd..d10fc7246c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
@@ -492,7 +492,7 @@ public class ScanUtil {
                 // But if last field of rowKey is variable length and also DESC, the trailing 0xFF
                 // is not removed when stored in HBASE, so for such case, we should not set
                 // lastInclusiveUpperSingleKey back to false.
-                if(sepByte != QueryConstants.DESC_SEPARATOR_BYTE) {
+                if (sepByte != QueryConstants.DESC_SEPARATOR_BYTE) {
                     lastInclusiveUpperSingleKey &= (fieldIndex + slotSpan[i]) < schema.getMaxFields()-1;
                 }
             }
@@ -694,10 +694,10 @@ public class ScanUtil {
     public static void setupLocalIndexScan(Scan scan) {
         byte[] prefix = scan.getStartRow().length == 0 ? new byte[scan.getStopRow().length]: scan.getStartRow();
         int prefixLength = scan.getStartRow().length == 0? scan.getStopRow().length: scan.getStartRow().length;
-        if(scan.getAttribute(SCAN_START_ROW_SUFFIX)!=null) {
+        if (scan.getAttribute(SCAN_START_ROW_SUFFIX) != null) {
             scan.withStartRow(ScanRanges.prefixKey(scan.getAttribute(SCAN_START_ROW_SUFFIX), 0, prefix, prefixLength));
         }
-        if(scan.getAttribute(SCAN_STOP_ROW_SUFFIX)!=null) {
+        if (scan.getAttribute(SCAN_STOP_ROW_SUFFIX) != null) {
             scan.withStopRow(ScanRanges.prefixKey(scan.getAttribute(SCAN_STOP_ROW_SUFFIX), 0, prefix, prefixLength));
         }
     }
@@ -719,7 +719,7 @@ public class ScanUtil {
      * @param newScan
      */
     public static void setLocalIndexAttributes(Scan newScan, int keyOffset, byte[] regionStartKey, byte[] regionEndKey, byte[] startRowSuffix, byte[] stopRowSuffix) {
-        if(ScanUtil.isLocalIndex(newScan)) {
+        if (ScanUtil.isLocalIndex(newScan)) {
              newScan.setAttribute(SCAN_ACTUAL_START_ROW, regionStartKey);
              newScan.withStartRow(regionStartKey);
              newScan.withStopRow(regionEndKey);
@@ -795,7 +795,7 @@ public class ScanUtil {
     public static int getRowKeyPosition(int[] slotSpan, int slotPosition) {
         int offset = 0;
 
-        for(int i = 0; i < slotPosition; i++) {
+        for (int i = 0; i < slotPosition; i++) {
             offset += slotSpan[i];
         }
 
@@ -1357,7 +1357,7 @@ public class ScanUtil {
 
     public static boolean isDummy(Tuple tuple) {
         if (tuple instanceof ResultTuple) {
-            isDummy(((ResultTuple) tuple).getResult());
+            return isDummy(((ResultTuple) tuple).getResult());
         }
         return false;
     }

