This is an automated email from the ASF dual-hosted git repository. tkhurana pushed a commit to branch 5.1 in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/5.1 by this push: new 0120f95489 Backport PHOENIX-7024 and PHOENIX-6909 to 5.1 (#1746) 0120f95489 is described below commit 0120f954891741a3473e8e6cd720ff7042e91ec7 Author: tkhurana <khurana.ta...@gmail.com> AuthorDate: Thu Nov 30 10:53:54 2023 -0800 Backport PHOENIX-7024 and PHOENIX-6909 to 5.1 (#1746) * PHOENIX-7024 Fix issues in Server Paging * PHOENIX-6909 Paged rows counter metric --- .../org/apache/phoenix/end2end/BaseOrderByIT.java | 20 +- .../org/apache/phoenix/end2end/ServerPagingIT.java | 369 +++++++++++++++++++++ .../phoenix/iterate/PhoenixQueryTimeoutIT.java | 14 +- .../phoenix/coprocessor/PagedRegionScanner.java | 19 +- .../UncoveredGlobalIndexRegionScanner.java | 3 + .../coprocessor/UncoveredIndexRegionScanner.java | 30 +- .../UncoveredLocalIndexRegionScanner.java | 3 + .../org/apache/phoenix/filter/PagedFilter.java | 84 ++--- .../phoenix/iterate/OffsetResultIterator.java | 19 +- .../phoenix/iterate/ScanningResultIterator.java | 14 +- .../phoenix/monitoring/GlobalClientMetrics.java | 4 +- .../org/apache/phoenix/monitoring/MetricType.java | 4 +- .../phoenix/monitoring/ScanMetricsHolder.java | 8 + .../java/org/apache/phoenix/util/ScanUtil.java | 12 +- 14 files changed, 515 insertions(+), 88 deletions(-) diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseOrderByIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseOrderByIT.java index a0ef9bdb72..dcd7463887 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseOrderByIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/BaseOrderByIT.java @@ -44,16 +44,25 @@ import java.util.Properties; import org.apache.phoenix.compile.ExplainPlan; import org.apache.phoenix.compile.ExplainPlanAttributes; import org.apache.phoenix.jdbc.PhoenixPreparedStatement; +import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.thirdparty.com.google.common.collect.Lists; import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.QueryBuilder; import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.TestUtil; +import org.junit.Before; import org.junit.Test; public abstract class BaseOrderByIT extends ParallelStatsDisabledIT { + Properties props; + @Before + public void setup() { + props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + props.setProperty(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, Long.toString(0)); + } + @Test public void testMultiOrderByExpr() throws Exception { String tenantId = getOrganizationId(); @@ -63,7 +72,7 @@ public abstract class BaseOrderByIT extends ParallelStatsDisabledIT { Lists.newArrayList("ENTITY_ID", "B_STRING")) .setFullTableName(tableName) .setOrderByClause("B_STRING, ENTITY_ID"); - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { ResultSet rs = executeQuery(conn, queryBuilder); assertTrue (rs.next()); @@ -98,7 +107,6 @@ public abstract class BaseOrderByIT extends ParallelStatsDisabledIT { Lists.newArrayList("ENTITY_ID", "B_STRING")) .setFullTableName(tableName) .setOrderByClause("B_STRING || ENTITY_ID DESC"); - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); try (Connection conn = DriverManager.getConnection(getUrl(), props)) { ResultSet rs = executeQuery(conn, queryBuilder); assertTrue (rs.next()); @@ -126,7 +134,6 @@ public abstract class BaseOrderByIT extends ParallelStatsDisabledIT { @Test public void testOrderByDifferentColumns() throws Exception { - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); try (Connection conn = DriverManager.getConnection(getUrl(), props)) { conn.setAutoCommit(false); String tableName = generateUniqueName(); @@ -201,7 +208,6 @@ public abstract class BaseOrderByIT extends ParallelStatsDisabledIT { @Test public void testAggregateOrderBy() throws Exception { - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); Connection conn = DriverManager.getConnection(getUrl(), props); String tableName = generateUniqueName(); String ddl = "create table " + tableName + " (ID VARCHAR NOT NULL PRIMARY KEY, VAL1 VARCHAR, VAL2 INTEGER)"; @@ -262,7 +268,6 @@ public abstract class BaseOrderByIT extends ParallelStatsDisabledIT { @Test public void testAggregateOptimizedOutOrderBy() throws Exception { - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); Connection conn = DriverManager.getConnection(getUrl(), props); String tableName = generateUniqueName(); String ddl = "create table " + tableName + " (K1 VARCHAR NOT NULL, K2 VARCHAR NOT NULL, VAL1 VARCHAR, VAL2 INTEGER, CONSTRAINT pk PRIMARY KEY(K1,K2))"; @@ -342,7 +347,6 @@ public abstract class BaseOrderByIT extends ParallelStatsDisabledIT { @Test public void testNullsLastWithDesc() throws Exception { - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); try (Connection conn = DriverManager.getConnection(getUrl(), props)) { String tableName=generateUniqueName(); String sql="CREATE TABLE "+tableName+" ( "+ @@ -606,7 +610,6 @@ public abstract class BaseOrderByIT extends ParallelStatsDisabledIT { } private void doTestOrderByReverseOptimization(boolean salted,boolean desc1,boolean desc2,boolean desc3) throws Exception { - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); try (Connection conn = DriverManager.getConnection(getUrl(), props)) { String tableName=generateUniqueName(); String sql="CREATE TABLE "+tableName+" ( "+ @@ -712,7 +715,6 @@ public abstract class BaseOrderByIT extends ParallelStatsDisabledIT { } private void doTestOrderByReverseOptimizationWithNullsLast(boolean salted,boolean desc1,boolean desc2,boolean desc3) throws Exception { - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); try (Connection conn = DriverManager.getConnection(getUrl(), props)) { String tableName=generateUniqueName(); String sql="CREATE TABLE "+tableName+" ( "+ @@ -958,7 +960,6 @@ public abstract class BaseOrderByIT extends ParallelStatsDisabledIT { @Test public void testOrderByNullable() throws SQLException { - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); try (Connection conn = DriverManager.getConnection(getUrl(), props)) { String sql = "CREATE TABLE IF NOT EXISTS us_population (state CHAR(2) NOT NULL," + "city VARCHAR NOT NULL, population BIGINT CONSTRAINT my_pk PRIMARY KEY" + @@ -1000,7 +1001,6 @@ public abstract class BaseOrderByIT extends ParallelStatsDisabledIT { @Test public void testPhoenix6999() throws Exception { - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); String tableName = "TBL_" + generateUniqueName(); String descTableName = "TBL_" + generateUniqueName(); diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ServerPagingIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ServerPagingIT.java new file mode 100644 index 0000000000..d4368ee1d7 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ServerPagingIT.java @@ -0,0 +1,369 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.end2end; + +import static org.apache.phoenix.end2end.index.GlobalIndexCheckerIT.assertExplainPlan; +import static org.apache.phoenix.end2end.index.GlobalIndexCheckerIT.assertExplainPlanWithLimit; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_PAGED_ROWS_COUNTER; +import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.sql.Connection; +import java.sql.Date; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Map; +import java.util.Properties; + +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.monitoring.MetricType; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.schema.types.PDate; +import org.apache.phoenix.thirdparty.com.google.common.collect.Maps; +import org.apache.phoenix.util.ByteUtil; +import org.apache.phoenix.util.DateUtil; +import org.apache.phoenix.util.PhoenixRuntime; +import org.apache.phoenix.util.PropertiesUtil; +import org.apache.phoenix.util.ReadOnlyProps; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(NeedsOwnMiniClusterTest.class) +public class ServerPagingIT extends ParallelStatsDisabledIT { + + @BeforeClass + public static synchronized void doSetup() throws Exception { + Map<String, String> props = Maps.newHashMapWithExpectedSize(2); + props.put(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, Long.toString(0)); + props.put(QueryServices.COLLECT_REQUEST_LEVEL_METRICS, String.valueOf(true)); + setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); + } + + private void assertServerPagingMetric(String tableName, ResultSet rs, boolean isPaged) throws SQLException { + Map<String, Map<MetricType, Long>> metrics = PhoenixRuntime.getRequestReadMetricInfo(rs); + for (Map.Entry<String, Map<MetricType, Long>> entry : metrics.entrySet()) { + assertEquals(String.format("Got %s", entry.getKey()), tableName, entry.getKey()); + Map<MetricType, Long> metricValues = entry.getValue(); + Long pagedRowsCntr = metricValues.get(MetricType.PAGED_ROWS_COUNTER); + assertNotNull(pagedRowsCntr); + if (isPaged) { + assertTrue(String.format("Got %d", pagedRowsCntr.longValue()), pagedRowsCntr > 0); + } else { + assertTrue(String.format("Got %d", pagedRowsCntr.longValue()), pagedRowsCntr == 0); + } + } + assertTrue(GLOBAL_PAGED_ROWS_COUNTER.getMetric().getValue() > 0); + } + + @Test + public void testOrderByNonAggregation() throws Exception { + final String tablename = generateUniqueName(); + final String tenantId = getOrganizationId(); + + final Date D1 = DateUtil.parseDate("1970-01-01 00:58:00"); + final Date D2 = DateUtil.parseDate("1970-01-01 01:02:00"); + final Date D3 = DateUtil.parseDate("1970-01-01 01:30:00"); + final Date D4 = DateUtil.parseDate("1970-01-01 01:45:00"); + final Date D5 = DateUtil.parseDate("1970-01-01 02:00:00"); + final Date D6 = DateUtil.parseDate("1970-01-01 04:00:00"); + final String F1 = "A"; + final String F2 = "B"; + final String F3 = "C"; + final String R1 = "R1"; + final String R2 = "R2"; + byte[][] splits = new byte[][] { + ByteUtil.concat(Bytes.toBytes(tenantId), PDate.INSTANCE.toBytes(D3)), + ByteUtil.concat(Bytes.toBytes(tenantId), PDate.INSTANCE.toBytes(D5)), + }; + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + String ddl = "create table " + tablename + + " (organization_id char(15) not null," + + " date date not null," + + " feature char(1) not null," + + " unique_users integer not null,\n" + + " transactions bigint,\n" + + " region varchar,\n" + + " CONSTRAINT pk PRIMARY KEY (organization_id, \"DATE\", feature, unique_users))"; + StringBuilder buf = new StringBuilder(ddl); + if (splits != null) { + buf.append(" SPLIT ON ("); + for (int i = 0; i < splits.length; i++) { + buf.append("'").append(Bytes.toString(splits[i])).append("'").append(","); + } + buf.setCharAt(buf.length()-1, ')'); + } + ddl = buf.toString(); + conn.createStatement().execute(ddl); + + PreparedStatement stmt = conn.prepareStatement( + "upsert into " + tablename + + " (" + + " ORGANIZATION_ID, " + + " \"DATE\", " + + " FEATURE, " + + " UNIQUE_USERS, " + + " TRANSACTIONS, " + + " REGION) " + + "VALUES (?, ?, ?, ?, ?, ?)"); + stmt.setString(1, tenantId); + stmt.setDate(2, D1); + stmt.setString(3, F1); + stmt.setInt(4, 10); + stmt.setLong(5, 100L); + stmt.setString(6, R2); + stmt.execute(); + + stmt.setString(1, tenantId); + stmt.setDate(2, D2); + stmt.setString(3, F1); + stmt.setInt(4, 20); + stmt.setLong(5, 200); + stmt.setString(6, null); + stmt.execute(); + + stmt.setString(1, tenantId); + stmt.setDate(2, D3); + stmt.setString(3, F1); + stmt.setInt(4, 30); + stmt.setLong(5, 300); + stmt.setString(6, R1); + stmt.execute(); + + stmt.setString(1, tenantId); + stmt.setDate(2, D4); + stmt.setString(3, F2); + stmt.setInt(4, 40); + stmt.setLong(5, 400); + stmt.setString(6, R1); + stmt.execute(); + + stmt.setString(1, tenantId); + stmt.setDate(2, D5); + stmt.setString(3, F3); + stmt.setInt(4, 50); + stmt.setLong(5, 500); + stmt.setString(6, R2); + stmt.execute(); + + stmt.setString(1, tenantId); + stmt.setDate(2, D6); + stmt.setString(3, F1); + stmt.setInt(4, 60); + stmt.setLong(5, 600); + stmt.setString(6, null); + stmt.execute(); + conn.commit(); + } + + String query = "SELECT \"DATE\", transactions t FROM "+tablename+ + " WHERE organization_id=? AND unique_users <= 30 ORDER BY t DESC LIMIT 2"; + try (Connection conn = DriverManager.getConnection(getUrl(), props); + PreparedStatement statement = conn.prepareStatement(query)) { + statement.setString(1, tenantId); + try (ResultSet rs = statement.executeQuery()) { + assertTrue(rs.next()); + assertEquals(D3.getTime(), rs.getDate(1).getTime()); + assertTrue(rs.next()); + assertEquals(D2.getTime(), rs.getDate(1).getTime()); + assertFalse(rs.next()); + assertServerPagingMetric(tablename, rs, true); + } + } + } + + @Test + public void testLimitOffset() throws SQLException { + final String tablename = generateUniqueName(); + final String[] STRINGS = { "a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", + "o", "p", "q", "r", "s", "t", "u", "v", "w", "x", "y", "z" }; + String ddl = "CREATE TABLE " + tablename + " (t_id VARCHAR NOT NULL,\n" + "k1 INTEGER NOT NULL,\n" + + "k2 INTEGER NOT NULL,\n" + "C3.k3 INTEGER,\n" + "C2.v1 VARCHAR,\n" + + "CONSTRAINT pk PRIMARY KEY (t_id, k1, k2)) " + "SPLIT ON ('e','i','o')"; + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + createTestTable(getUrl(), ddl); + for (int i = 0; i < 26; i++) { + conn.createStatement().execute("UPSERT INTO " + tablename + " values('" + STRINGS[i] + "'," + i + "," + + (i + 1) + "," + (i + 2) + ",'" + STRINGS[25 - i] + "')"); + } + conn.commit(); + int limit = 10; + // Testing 0 as remaining offset after 4 rows in first region, 4 rows in second region + int offset = 8; + ResultSet rs; + rs = conn.createStatement() + .executeQuery("SELECT t_id from " + tablename + " order by t_id limit " + limit + " offset " + offset); + int i = 0; + while (i < limit) { + assertTrue(rs.next()); + assertEquals("Expected string didn't match for i = " + i, STRINGS[offset + i], rs.getString(1)); + i++; + } + // no paging when serial offset + assertServerPagingMetric(tablename, rs, false); + + // Testing query with offset + filter + int filterCond = 10; + rs = conn.createStatement().executeQuery( + "SELECT t_id from " + tablename + " where k2 > " + filterCond + + " order by t_id limit " + limit + " offset " + offset); + i = 0; + limit = 5; + while (i < limit) { + assertTrue(rs.next()); + assertEquals("Expected string didn't match for i = " + i, + STRINGS[offset + filterCond + i], rs.getString(1)); + i++; + } + // no paging when serial offset + assertServerPagingMetric(tablename, rs, false); + + limit = 35; + rs = conn.createStatement().executeQuery("SELECT t_id from " + tablename + " union all SELECT t_id from " + + tablename + " offset " + offset + " FETCH FIRST " + limit + " rows only"); + i = 0; + while (i++ < STRINGS.length - offset) { + assertTrue(rs.next()); + assertEquals(STRINGS[offset + i - 1], rs.getString(1)); + } + i = 0; + while (i++ < limit - STRINGS.length - offset) { + assertTrue(rs.next()); + assertEquals(STRINGS[i - 1], rs.getString(1)); + } + // no paging when serial offset + assertServerPagingMetric(tablename, rs, false); + limit = 1; + offset = 1; + rs = conn.createStatement() + .executeQuery("SELECT k2 from " + tablename + " order by k2 desc limit " + limit + " offset " + offset); + assertTrue(rs.next()); + assertEquals(25, rs.getInt(1)); + assertFalse(rs.next()); + // because of descending order the offset is implemented on client + // so this generates a parallel scan and paging happens + assertServerPagingMetric(tablename, rs, true); + } + } + + @Test + public void testGroupBy() throws SQLException { + final String tablename = generateUniqueName(); + final String indexName = generateUniqueName(); + + String ddl = "CREATE TABLE " + tablename + " (t_id VARCHAR NOT NULL,\n" + "k1 INTEGER NOT NULL,\n" + + "k2 INTEGER CONSTRAINT pk PRIMARY KEY (t_id, k1)) "; + String indexDDl = "CREATE INDEX IF NOT EXISTS " + indexName + " ON " + tablename + "(k2)"; + + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + createTestTable(getUrl(), ddl); + createTestTable(getUrl(), indexDDl); + for (int i = 0; i < 8; i++) { + conn.createStatement().execute("UPSERT INTO " + tablename + " values('tenant1'," + i + "," + + (i + 1) + ")"); + } + conn.commit(); + ResultSet rs = conn.createStatement().executeQuery( + "SELECT count(*) FROM " + tablename + " where t_id = 'tenant1' AND (k2 IN (5,6) or k2 is null) group by k2=6"); + while (rs.next()) { + Assert.assertEquals(1, rs.getInt(1)); + } + Assert.assertFalse(rs.next()); + assertServerPagingMetric(indexName, rs, true); + } + } + + @Test + public void testUncoveredQuery() throws Exception { + String dataTableName = generateUniqueName(); + populateTable(dataTableName); // with two rows ('a', 'ab', 'abc', 'abcd') and ('b', 'bc', 'bcd', 'bcde') + try (Connection conn = DriverManager.getConnection(getUrl())) { + String indexTableName = generateUniqueName(); + conn.createStatement().execute("CREATE INDEX " + + indexTableName + " on " + dataTableName + " (val1) include (val2) "); + String selectSql; + int limit = 10; + //Verify that with index hint, we will read from the index table even though val3 is not included by the index table + selectSql = "SELECT /*+ INDEX(" + dataTableName + " " + indexTableName + ")*/ val3,val2 from " + + dataTableName + " WHERE val1 = 'bc' AND (val2 = 'bcd' OR val3 ='bcde') LIMIT " + limit; + assertExplainPlanWithLimit(conn, selectSql, dataTableName, indexTableName, limit); + ResultSet rs = conn.createStatement().executeQuery(selectSql); + assertTrue(rs.next()); + assertEquals("bcde", rs.getString(1)); + assertEquals("bcd", rs.getString(2)); + assertFalse(rs.next()); + assertServerPagingMetric(indexTableName, rs, true); + + // Add another row and run a group by query where the uncovered index should be used + conn.createStatement().execute("upsert into " + dataTableName + " (id, val1, val2, val3) values ('c', 'ab','cde', 'cdef')"); + conn.commit(); + + selectSql = "SELECT /*+ INDEX(" + dataTableName + " " + indexTableName + ")*/ count(val3) from " + dataTableName + " where val1 > '0' GROUP BY val1"; + // Verify that we will read from the index table + assertExplainPlan(conn, selectSql, dataTableName, indexTableName); + rs = conn.createStatement().executeQuery(selectSql); + assertTrue(rs.next()); + assertEquals(2, rs.getInt(1)); + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + assertFalse(rs.next()); + + selectSql = "SELECT /*+ INDEX(" + dataTableName + " " + indexTableName + ")*/ count(val3) from " + dataTableName + " where val1 > '0'"; + // Verify that we will read from the index table + assertExplainPlan(conn, selectSql, dataTableName, indexTableName); + rs = conn.createStatement().executeQuery(selectSql); + assertTrue(rs.next()); + assertEquals(3, rs.getInt(1)); + + // Run an order by query where the uncovered index should be used + selectSql = "SELECT /*+ INDEX(" + dataTableName + " " + indexTableName + "*/ val3 from " + dataTableName + " where val1 > '0' ORDER BY val1"; + // Verify that we will read from the index table + assertExplainPlan(conn, selectSql, dataTableName, indexTableName); + rs = conn.createStatement().executeQuery(selectSql); + assertTrue(rs.next()); + assertEquals("abcd", rs.getString(1)); + assertTrue(rs.next()); + assertEquals("cdef", rs.getString(1)); + assertTrue(rs.next()); + assertEquals("bcde", rs.getString(1)); + assertFalse(rs.next()); + } + } + + private void populateTable(String tableName) throws Exception { + Connection conn = DriverManager.getConnection(getUrl()); + conn.createStatement().execute("create table " + tableName + + " (id varchar(10) not null primary key, val1 varchar(10), val2 varchar(10)," + + " val3 varchar(10))"); + conn.createStatement().execute("upsert into " + tableName + " values ('a', 'ab', 'abc', 'abcd')"); + conn.commit(); + conn.createStatement().execute("upsert into " + tableName + " values ('b', 'bc', 'bcd', 'bcde')"); + conn.commit(); + conn.close(); + } +} diff --git a/phoenix-core/src/it/java/org/apache/phoenix/iterate/PhoenixQueryTimeoutIT.java b/phoenix-core/src/it/java/org/apache/phoenix/iterate/PhoenixQueryTimeoutIT.java index b73da96d98..e807e75047 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/iterate/PhoenixQueryTimeoutIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/iterate/PhoenixQueryTimeoutIT.java @@ -27,6 +27,7 @@ import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; +import java.sql.SQLTimeoutException; import java.util.Properties; import org.apache.phoenix.end2end.ParallelStatsDisabledIT; @@ -119,11 +120,16 @@ public class PhoenixQueryTimeoutIT extends ParallelStatsDisabledIT { fail("Expected query to timeout with a 1 ms timeout"); } catch (SQLException e) { //OPERATION_TIMED_OUT Exception expected - if (e.getErrorCode() == IO_EXCEPTION.getErrorCode() && e.getCause() instanceof SQLException) { - assertEquals(OPERATION_TIMED_OUT.getErrorCode(), ((SQLException) e.getCause()).getErrorCode()); - } else { - assertEquals(OPERATION_TIMED_OUT.getErrorCode(), e.getErrorCode()); + Throwable t = e; + // SQLTimeoutException can be wrapped inside outer exceptions like PhoenixIOException + while (t != null && !(t instanceof SQLTimeoutException)) { + t = t.getCause(); } + if (t == null) { + fail("Expected query to fail with SQLTimeoutException"); + } + assertEquals(OPERATION_TIMED_OUT.getErrorCode(), + ((SQLTimeoutException)t).getErrorCode()); } finally { BaseResultIterators.setForTestingSetTimeoutToMaxToLetQueryPassHere(false); } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PagedRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PagedRegionScanner.java index 9e4e0d625d..03460af96a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PagedRegionScanner.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PagedRegionScanner.java @@ -24,7 +24,11 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.filter.PagedFilter; +import org.apache.phoenix.util.ScanUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import static org.apache.phoenix.util.ScanUtil.getDummyResult; import static org.apache.phoenix.util.ScanUtil.getPhoenixPagedFilter; @@ -42,6 +46,9 @@ public class PagedRegionScanner extends BaseRegionScanner { protected Region region; protected Scan scan; protected PagedFilter pageFilter; + + private static final Logger LOGGER = LoggerFactory.getLogger(PagedRegionScanner.class); + public PagedRegionScanner(Region region, RegionScanner scanner, Scan scan) { super(scanner); this.region = region; @@ -54,6 +61,9 @@ public class PagedRegionScanner extends BaseRegionScanner { private boolean next(List<Cell> results, boolean raw) throws IOException { try { + if (pageFilter != null) { + pageFilter.init(); + } boolean hasMore = raw ? delegate.nextRaw(results) : delegate.next(results); if (pageFilter == null) { return hasMore; @@ -65,19 +75,20 @@ public class PagedRegionScanner extends BaseRegionScanner { // Close the current region scanner, start a new one and return a dummy result delegate.close(); byte[] rowKey = pageFilter.getRowKeyAtStop(); - scan.withStartRow(rowKey, true); + boolean isInclusive = pageFilter.isNextRowInclusive(); + scan.withStartRow(rowKey, isInclusive); delegate = region.getScanner(scan); if (results.isEmpty()) { - getDummyResult(rowKey, results); + LOGGER.info("Page filter stopped, generating dummy key {} inclusive={}", + Bytes.toStringBinary(rowKey), isInclusive); + ScanUtil.getDummyResult(rowKey, results); } - pageFilter.init(); return true; } return false; } else { // We got a row from the HBase scanner within the configured time (i.e., the page size). We need to // start a new page on the next next() call. - pageFilter.resetStartTime(); return true; } } catch (Exception e) { diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UncoveredGlobalIndexRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UncoveredGlobalIndexRegionScanner.java index 5fb1244e91..ddcb6fff7a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UncoveredGlobalIndexRegionScanner.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UncoveredGlobalIndexRegionScanner.java @@ -123,6 +123,9 @@ public class UncoveredGlobalIndexRegionScanner extends UncoveredIndexRegionScann protected void scanDataRows(Collection<byte[]> dataRowKeys, long startTime) throws IOException { Scan dataScan = prepareDataTableScan(dataRowKeys); + if (dataScan == null) { + return; + } try (ResultScanner resultScanner = dataHTable.getScanner(dataScan)) { for (Result result = resultScanner.next(); (result != null); result = resultScanner.next()) { diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java index 7cf87dcf00..c6c45eae02 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UncoveredIndexRegionScanner.java @@ -158,17 +158,27 @@ public abstract class UncoveredIndexRegionScanner extends BaseRegionScanner { protected Scan prepareDataTableScan(Collection<byte[]> dataRowKeys) throws IOException { List<KeyRange> keys = new ArrayList<>(dataRowKeys.size()); for (byte[] dataRowKey : dataRowKeys) { - keys.add(PVarbinary.INSTANCE.getKeyRange(dataRowKey, SortOrder.ASC)); + // If the data table scan was interrupted because of paging we retry the scan + // but on retry we should only fetch data table rows which we haven't already + // fetched. + if (!dataRows.containsKey(new ImmutableBytesPtr(dataRowKey))) { + keys.add(PVarbinary.INSTANCE.getKeyRange(dataRowKey, SortOrder.ASC)); + } + } + if (!keys.isEmpty()) { + ScanRanges scanRanges = ScanRanges.createPointLookup(keys); + Scan dataScan = new Scan(dataTableScan); + dataScan.setTimeRange(scan.getTimeRange().getMin(), scan.getTimeRange().getMax()); + scanRanges.initializeScan(dataScan); + SkipScanFilter skipScanFilter = scanRanges.getSkipScanFilter(); + dataScan.setFilter(new SkipScanFilter(skipScanFilter, false)); + dataScan.setAttribute(BaseScannerRegionObserver.SERVER_PAGE_SIZE_MS, + Bytes.toBytes(Long.valueOf(pageSizeMs))); + return dataScan; + } else { + LOGGER.info("All data rows have already been fetched"); + return null; } - ScanRanges scanRanges = ScanRanges.createPointLookup(keys); - Scan dataScan = new Scan(dataTableScan); - dataScan.setTimeRange(scan.getTimeRange().getMin(), scan.getTimeRange().getMax()); - scanRanges.initializeScan(dataScan); - SkipScanFilter skipScanFilter = scanRanges.getSkipScanFilter(); - dataScan.setFilter(new SkipScanFilter(skipScanFilter, false)); - scan.setAttribute(BaseScannerRegionObserver.SERVER_PAGE_SIZE_MS, - Bytes.toBytes(Long.valueOf(pageSizeMs))); - return dataScan; } protected boolean scanIndexTableRows(List<Cell> result, diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UncoveredLocalIndexRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UncoveredLocalIndexRegionScanner.java index 0d6ddc39d0..d7273aa8a2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UncoveredLocalIndexRegionScanner.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UncoveredLocalIndexRegionScanner.java @@ -67,6 +67,9 @@ public class UncoveredLocalIndexRegionScanner extends UncoveredIndexRegionScanne protected void scanDataRows(Collection<byte[]> dataRowKeys, long startTime) throws IOException { Scan dataScan = prepareDataTableScan(dataRowKeys); + if (dataScan == null) { + return; + } try (RegionScanner regionScanner = region.getScanner(dataScan)) { boolean hasMore; do { diff --git a/phoenix-core/src/main/java/org/apache/phoenix/filter/PagedFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/filter/PagedFilter.java index 8d785c3ab4..370dbf007d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/filter/PagedFilter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/filter/PagedFilter.java @@ -46,7 +46,11 @@ public class PagedFilter extends FilterBase implements Writable { State state; private long pageSizeMs; private long startTime; - private byte[] rowKeyAtStop; + // tracks the row which we will visit next. It is not always a full row key and maybe + // just the row key prefix. + private Cell nextHintCell; + // tracks the row we last visited + private Cell currentCell; private Filter delegate = null; public PagedFilter() { @@ -68,10 +72,19 @@ public class PagedFilter extends FilterBase implements Writable { } public byte[] getRowKeyAtStop() { - if (rowKeyAtStop != null) { - return Arrays.copyOf(rowKeyAtStop, rowKeyAtStop.length); + byte[] rowKeyAtStop = null; + if (nextHintCell != null) { + // if we have already seeked to the next cell use that when we resume the scan + rowKeyAtStop = CellUtil.cloneRow(nextHintCell); + } else if (currentCell != null) { + rowKeyAtStop = CellUtil.cloneRow(currentCell); } - return null; + return rowKeyAtStop; + } + + public boolean isNextRowInclusive() { + // since this can be a key prefix we have to set inclusive to true when resuming scan + return nextHintCell != null; } public boolean isStopped() { @@ -80,21 +93,19 @@ public class PagedFilter extends FilterBase implements Writable { public void init() { state = State.INITIAL; - rowKeyAtStop = null; - } - - public void resetStartTime() { - if (state == State.STARTED) { - init(); - } + currentCell = null; + nextHintCell = null; } @Override public void reset() throws IOException { - if (state == State.INITIAL) { - startTime = EnvironmentEdgeManager.currentTimeMillis(); - state = State.STARTED; - } else if (state == State.STARTED && EnvironmentEdgeManager.currentTimeMillis() - startTime >= pageSizeMs) { + long currentTime = EnvironmentEdgeManager.currentTimeMillis(); + // reset can be called multiple times for the same row sometimes even before we have + // scanned even one row. The order in which it is called is not very predictable. + // So we need to ensure that we have seen at least one row before we page. + // The currentCell != null check ensures that. + if (state == State.STARTED && currentCell != null + && currentTime - startTime >= pageSizeMs) { state = State.TIME_TO_STOP; } if (delegate != null) { @@ -107,33 +118,19 @@ public class PagedFilter extends FilterBase implements Writable { @Override public Cell getNextCellHint(Cell currentKV) throws IOException { if (delegate != null) { - return delegate.getNextCellHint(currentKV); + Cell ret = delegate.getNextCellHint(currentKV); + // save the hint so that if the filter stops we know where to resume the scan + nextHintCell = ret; + return ret; } return super.getNextCellHint(currentKV); } - public boolean filterRowKey(byte[] buffer, int offset, int length) throws IOException { - if (state == State.TIME_TO_STOP) { - if (rowKeyAtStop == null) { - rowKeyAtStop = new byte[length]; - Bytes.putBytes(rowKeyAtStop, 0, buffer, offset, length); - } - return true; - } - if (delegate != null) { - return delegate.filterRowKey(buffer, offset, length); - } - return super.filterRowKey(buffer, offset, length); - } - @Override public boolean filterRowKey(Cell cell) throws IOException { - if (state == State.TIME_TO_STOP) { - if (rowKeyAtStop == null) { - rowKeyAtStop = CellUtil.cloneRow(cell); - } - return true; - } + currentCell = cell; + // now that we have visited the row we need to reset the hint + nextHintCell = null; if (delegate != null) { return delegate.filterRowKey(cell); } @@ -142,10 +139,13 @@ public class PagedFilter extends FilterBase implements Writable { @Override public boolean filterAllRemaining() throws IOException { - if (state == State.TIME_TO_STOP && rowKeyAtStop != null) { + if (state == State.TIME_TO_STOP) { state = State.STOPPED; return true; } + if (state == State.STOPPED) { + return true; + } if (delegate != null) { return delegate.filterAllRemaining(); } @@ -153,7 +153,14 @@ public class PagedFilter extends FilterBase implements Writable { } @Override + /** + * This is called once for every row in the beginning. + */ public boolean hasFilterRow() { + if (state == State.INITIAL) { + startTime = EnvironmentEdgeManager.currentTimeMillis(); + state = State.STARTED; + } return true; } @@ -211,6 +218,7 @@ public class PagedFilter extends FilterBase implements Writable { @Override public ReturnCode filterKeyValue(Cell v) throws IOException { + if (delegate != null) { return delegate.filterKeyValue(v); } @@ -219,6 +227,8 @@ public class PagedFilter extends FilterBase implements Writable { @Override public Filter.ReturnCode filterCell(Cell c) throws IOException { + currentCell = c; + nextHintCell = null; if (delegate != null) { return delegate.filterCell(c); } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OffsetResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OffsetResultIterator.java index 3eecfc82c3..e00a9fbeb4 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OffsetResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OffsetResultIterator.java @@ -20,13 +20,10 @@ package org.apache.phoenix.iterate; import java.sql.SQLException; import java.util.List; -import org.apache.phoenix.compile.ExplainPlanAttributes - .ExplainPlanAttributesBuilder; -import org.apache.phoenix.schema.tuple.Tuple; -import org.apache.phoenix.util.EnvironmentEdgeManager; +import static org.apache.phoenix.util.ScanUtil.isDummy; -import static org.apache.phoenix.util.ScanUtil.getDummyResult; -import static org.apache.phoenix.util.ScanUtil.getDummyTuple; +import org.apache.phoenix.compile.ExplainPlanAttributes.ExplainPlanAttributesBuilder; +import org.apache.phoenix.schema.tuple.Tuple; /** * Iterates through tuples up to a limit @@ -49,14 +46,16 @@ public class OffsetResultIterator extends DelegateResultIterator { } @Override public Tuple next() throws SQLException { - long startTime = EnvironmentEdgeManager.currentTimeMillis(); while (rowCount < offset) { Tuple tuple = super.next(); if (tuple == null) { return null; } - rowCount++; - if (EnvironmentEdgeManager.currentTimeMillis() - startTime >= pageSizeMs) { - return getDummyTuple(tuple); + if (isDummy(tuple)) { + // while rowCount < offset absorb the dummy and call next on the underlying scanner + continue; } + rowCount++; + // no page timeout check at this level because we cannot correctly resume + // scans for OFFSET queries until the offset is reached } return super.next(); } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java index 8189fbcef9..6b9c84e2c0 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java @@ -40,6 +40,7 @@ import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COU import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_RPC_CALLS; import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_RPC_RETRIES; import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_HBASE_COUNT_SCANNED_REGIONS; +import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_PAGED_ROWS_COUNTER; import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_SCAN_BYTES; import static org.apache.phoenix.util.ScanUtil.isDummy; @@ -52,8 +53,7 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.metrics.ScanMetrics; -import org.apache.phoenix.compile.ExplainPlanAttributes - .ExplainPlanAttributesBuilder; +import org.apache.phoenix.compile.ExplainPlanAttributes.ExplainPlanAttributesBuilder; import org.apache.phoenix.compile.StatementContext; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; @@ -81,6 +81,8 @@ public class ScanningResultIterator implements ResultIterator { private final boolean isMapReduceContext; private final long maxQueryEndTime; + private long dummyRowCounter = 0; + public ScanningResultIterator(ResultScanner scanner, Scan scan, ScanMetricsHolder scanMetricsHolder, StatementContext context, boolean isMapReduceContext, long maxQueryEndTime) { this.scanner = scanner; this.scanMetricsHolder = scanMetricsHolder; @@ -99,13 +101,13 @@ public class ScanningResultIterator implements ResultIterator { } private void changeMetric(CombinableMetric metric, Long value) { - if(value != null) { + if (value != null) { metric.change(value); } } private void changeMetric(GlobalClientMetrics metric, Long value) { - if(value != null) { + if (value != null) { metric.update(value); } } @@ -139,6 +141,7 @@ public class ScanningResultIterator implements ResultIterator { scanMetricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME)); changeMetric(scanMetricsHolder.getCountOfRowsFiltered(), scanMetricsMap.get(COUNT_OF_ROWS_FILTERED_KEY_METRIC_NAME)); + changeMetric(scanMetricsHolder.getCountOfRowsPaged(), dummyRowCounter); changeMetric(GLOBAL_SCAN_BYTES, scanMetricsMap.get(BYTES_IN_RESULTS_METRIC_NAME)); @@ -165,6 +168,8 @@ public class ScanningResultIterator implements ResultIterator { changeMetric(GLOBAL_HBASE_COUNT_ROWS_FILTERED, scanMetricsMap.get(COUNT_OF_ROWS_FILTERED_KEY_METRIC_NAME)); + changeMetric(GLOBAL_PAGED_ROWS_COUNTER, dummyRowCounter); + scanMetricsUpdated = true; } @@ -175,6 +180,7 @@ public class ScanningResultIterator implements ResultIterator { try { Result result = scanner.next(); while (result != null && (result.isEmpty() || isDummy(result))) { + dummyRowCounter += 1; long timeOutForScan = maxQueryEndTime - EnvironmentEdgeManager.currentTimeMillis(); if (timeOutForScan < 0) { throw new SQLExceptionInfo.Builder(OPERATION_TIMED_OUT).setMessage( diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java index ad19e05aa5..4a2dacd688 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java @@ -30,6 +30,7 @@ import static org.apache.phoenix.monitoring.MetricType.NUM_PARALLEL_SCANS; import static org.apache.phoenix.monitoring.MetricType.OPEN_INTERNAL_PHOENIX_CONNECTIONS_COUNTER; import static org.apache.phoenix.monitoring.MetricType.OPEN_PHOENIX_CONNECTIONS_COUNTER; import static org.apache.phoenix.monitoring.MetricType.INDEX_COMMIT_FAILURE_SIZE; +import static org.apache.phoenix.monitoring.MetricType.PAGED_ROWS_COUNTER; import static org.apache.phoenix.monitoring.MetricType.QUERY_FAILED_COUNTER; import static org.apache.phoenix.monitoring.MetricType.QUERY_SERVICES_COUNTER; import static org.apache.phoenix.monitoring.MetricType.QUERY_TIME; @@ -109,7 +110,7 @@ public enum GlobalClientMetrics { GLOBAL_HCONNECTIONS_COUNTER(HCONNECTIONS_COUNTER), GLOBAL_PHOENIX_CONNECTIONS_THROTTLED_COUNTER(PHOENIX_CONNECTIONS_THROTTLED_COUNTER), GLOBAL_PHOENIX_CONNECTIONS_ATTEMPTED_COUNTER(PHOENIX_CONNECTIONS_ATTEMPTED_COUNTER), - + GLOBAL_PAGED_ROWS_COUNTER(PAGED_ROWS_COUNTER), GLOBAL_HBASE_COUNT_RPC_CALLS(COUNT_RPC_CALLS), GLOBAL_HBASE_COUNT_REMOTE_RPC_CALLS(COUNT_REMOTE_RPC_CALLS), GLOBAL_HBASE_COUNT_MILLS_BETWEEN_NEXTS(COUNT_MILLS_BETWEEN_NEXTS), @@ -124,7 +125,6 @@ public enum GlobalClientMetrics { GLOBAL_CLIENT_METADATA_CACHE_MISS_COUNTER(CLIENT_METADATA_CACHE_MISS_COUNTER), GLOBAL_CLIENT_METADATA_CACHE_HIT_COUNTER(CLIENT_METADATA_CACHE_HIT_COUNTER); - private static final Logger LOGGER = LoggerFactory.getLogger(GlobalClientMetrics.class); private static final boolean isGlobalMetricsEnabled = QueryServicesOptions.withDefaults().isGlobalMetricsEnabled(); private MetricType metricType; diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java index 75907761e7..fae7c0a5fe 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java @@ -67,6 +67,8 @@ public enum MetricType { PHOENIX_CONNECTIONS_ATTEMPTED_COUNTER("ca","Number of requests for Phoenix connections, whether successful or not.",LogLevel.OFF, PLong.INSTANCE), CLIENT_METADATA_CACHE_MISS_COUNTER("cmcm", "Number of cache misses for the CQSI cache.", LogLevel.DEBUG, PLong.INSTANCE), CLIENT_METADATA_CACHE_HIT_COUNTER("cmch", "Number of cache hits for the CQSI cache.", LogLevel.DEBUG, PLong.INSTANCE), + PAGED_ROWS_COUNTER("prc", "Number of dummy rows returned to client due to paging.", LogLevel.DEBUG, PLong.INSTANCE), + // hbase metrics COUNT_RPC_CALLS("rp", "Number of RPC calls",LogLevel.DEBUG, PLong.INSTANCE), COUNT_REMOTE_RPC_CALLS("rr", "Number of remote RPC calls",LogLevel.DEBUG, PLong.INSTANCE), @@ -118,7 +120,7 @@ public enum MetricType { public static String getMetricColumnsDetails() { StringBuilder buffer=new StringBuilder(); - for(MetricType metric:MetricType.values()){ + for (MetricType metric:MetricType.values()) { if (metric.logLevel() != LogLevel.OFF) { buffer.append(metric.columnName()); buffer.append(" "); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ScanMetricsHolder.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ScanMetricsHolder.java index dd6603e152..8b3276a437 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ScanMetricsHolder.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/ScanMetricsHolder.java @@ -28,6 +28,7 @@ import static org.apache.phoenix.monitoring.MetricType.COUNT_ROWS_SCANNED; import static org.apache.phoenix.monitoring.MetricType.COUNT_RPC_CALLS; import static org.apache.phoenix.monitoring.MetricType.COUNT_RPC_RETRIES; import static org.apache.phoenix.monitoring.MetricType.COUNT_SCANNED_REGIONS; +import static org.apache.phoenix.monitoring.MetricType.PAGED_ROWS_COUNTER; import java.io.IOException; import java.util.Map; @@ -49,6 +50,8 @@ public class ScanMetricsHolder { private final CombinableMetric countOfRemoteRPCRetries; private final CombinableMetric countOfRowsScanned; private final CombinableMetric countOfRowsFiltered; + private final CombinableMetric countOfRowsPaged; + private Map<String, Long> scanMetricMap; private Object scan; @@ -78,6 +81,7 @@ public class ScanMetricsHolder { countOfRemoteRPCRetries = readMetrics.allotMetric(COUNT_REMOTE_RPC_RETRIES, tableName); countOfRowsScanned = readMetrics.allotMetric(COUNT_ROWS_SCANNED, tableName); countOfRowsFiltered = readMetrics.allotMetric(COUNT_ROWS_FILTERED, tableName); + countOfRowsPaged = readMetrics.allotMetric(PAGED_ROWS_COUNTER, tableName); } public CombinableMetric getCountOfRemoteRPCcalls() { @@ -128,6 +132,10 @@ public class ScanMetricsHolder { return scanMetricMap; } + public CombinableMetric getCountOfRowsPaged() { + return countOfRowsPaged; + } + public void setScanMetricMap(Map<String, Long> scanMetricMap) { this.scanMetricMap = scanMetricMap; } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java index 98e3a039ca..6122fec9b3 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java @@ -484,7 +484,7 @@ public class ScanUtil { // But if last field of rowKey is variable length and also DESC, the trailing 0xFF // is not removed when stored in HBASE, so for such case, we should not set // lastInclusiveUpperSingleKey back to false. - if(sepByte != QueryConstants.DESC_SEPARATOR_BYTE) { + if (sepByte != QueryConstants.DESC_SEPARATOR_BYTE) { lastInclusiveUpperSingleKey &= (fieldIndex + slotSpan[i]) < schema.getMaxFields()-1; } } @@ -686,10 +686,10 @@ public class ScanUtil { public static void setupLocalIndexScan(Scan scan) { byte[] prefix = scan.getStartRow().length == 0 ? new byte[scan.getStopRow().length]: scan.getStartRow(); int prefixLength = scan.getStartRow().length == 0? scan.getStopRow().length: scan.getStartRow().length; - if(scan.getAttribute(SCAN_START_ROW_SUFFIX)!=null) { + if(scan.getAttribute(SCAN_START_ROW_SUFFIX) != null) { scan.setStartRow(ScanRanges.prefixKey(scan.getAttribute(SCAN_START_ROW_SUFFIX), 0, prefix, prefixLength)); } - if(scan.getAttribute(SCAN_STOP_ROW_SUFFIX)!=null) { + if(scan.getAttribute(SCAN_STOP_ROW_SUFFIX) != null) { scan.setStopRow(ScanRanges.prefixKey(scan.getAttribute(SCAN_STOP_ROW_SUFFIX), 0, prefix, prefixLength)); } } @@ -711,7 +711,7 @@ public class ScanUtil { * @param newScan */ public static void setLocalIndexAttributes(Scan newScan, int keyOffset, byte[] regionStartKey, byte[] regionEndKey, byte[] startRowSuffix, byte[] stopRowSuffix) { - if(ScanUtil.isLocalIndex(newScan)) { + if (ScanUtil.isLocalIndex(newScan)) { newScan.setAttribute(SCAN_ACTUAL_START_ROW, regionStartKey); newScan.setStartRow(regionStartKey); newScan.setStopRow(regionEndKey); @@ -787,7 +787,7 @@ public class ScanUtil { public static int getRowKeyPosition(int[] slotSpan, int slotPosition) { int offset = 0; - for(int i = 0; i < slotPosition; i++) { + for (int i = 0; i < slotPosition; i++) { offset += slotSpan[i]; } @@ -1298,7 +1298,7 @@ public class ScanUtil { public static boolean isDummy(Tuple tuple) { if (tuple instanceof ResultTuple) { - isDummy(((ResultTuple) tuple).getResult()); + return isDummy(((ResultTuple) tuple).getResult()); } return false; }