This is an automated email from the ASF dual-hosted git repository. kadir pushed a commit to branch 4.16 in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.16 by this push: new f49aee6 PHOENIX-6434 Secondary Indexes on PHOENIX_ROW_TIMESTAMP() (#1194) f49aee6 is described below commit f49aee6318831c1c5675fd35f6cb6e96781fab9d Author: kadirozde <37155482+kadiro...@users.noreply.github.com> AuthorDate: Wed Apr 21 18:10:06 2021 -0700 PHOENIX-6434 Secondary Indexes on PHOENIX_ROW_TIMESTAMP() (#1194) --- .../end2end/PhoenixRowTimestampFunctionIT.java | 19 +++ .../end2end/index/GlobalIndexCheckerIT.java | 140 +++++++++++++++++++++ .../apache/phoenix/end2end/index/LocalIndexIT.java | 90 +++++++++++++ .../org/apache/phoenix/compile/DeleteCompiler.java | 7 +- .../coprocessor/GlobalIndexRegionScanner.java | 18 ++- .../execute/PhoenixTxIndexMutationGenerator.java | 3 +- .../function/PhoenixRowTimestampFunction.java | 5 +- .../{ValueGetter.java => AbstractValueGetter.java} | 38 +++--- .../phoenix/hbase/index/IndexRegionObserver.java | 6 +- .../apache/phoenix/hbase/index/ValueGetter.java | 4 + .../hbase/index/covered/data/LazyValueGetter.java | 3 +- .../org/apache/phoenix/index/IndexMaintainer.java | 22 +++- .../phoenix/iterate/TableResultIterator.java | 17 +-- .../apache/phoenix/mapreduce/index/IndexTool.java | 7 +- .../parse/PhoenixRowTimestampParseNode.java | 7 +- .../org/apache/phoenix/schema/MetaDataClient.java | 2 +- .../phoenix/schema/tuple/ValueGetterTuple.java | 15 +-- .../java/org/apache/phoenix/util/IndexUtil.java | 3 +- .../java/org/apache/phoenix/util/ScanUtil.java | 27 +++- .../apache/phoenix/index/IndexMaintainerTest.java | 3 +- 20 files changed, 366 insertions(+), 70 deletions(-) diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixRowTimestampFunctionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixRowTimestampFunctionIT.java index 8b4dfb6..5998690 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixRowTimestampFunctionIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixRowTimestampFunctionIT.java @@ -493,4 +493,23 @@ public class PhoenixRowTimestampFunctionIT extends ParallelStatsDisabledIT { } } } + + @Test + public void testPhoenixRowTimestampWithWildcard() throws Exception { + try (Connection conn = DriverManager.getConnection(getUrl())) { + String dataTableName = generateUniqueName(); + conn.createStatement().execute("create table " + dataTableName + + " (pk1 integer not null primary key, x.v1 float, y.v2 float, z.v3 float)" + this.tableDDLOptions); + conn.createStatement().execute("upsert into " + dataTableName + " values(rand() * 100000000, rand(), rand(), rand())"); + conn.commit(); + ResultSet rs = conn.createStatement().executeQuery("SELECT v1 from " + dataTableName); + assertTrue(rs.next()); + float v1 = rs.getFloat(1); + rs = conn.createStatement().executeQuery("SELECT * from " + dataTableName + " order by phoenix_row_timestamp()"); + assertTrue(rs.next()); + System.out.println(v1); + assertTrue(v1 == rs.getFloat(2)); + } + } + } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java index d9cf8a2..47eea6d 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java @@ -38,6 +38,8 @@ import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.SQLException; +import java.sql.Timestamp; +import java.util.Calendar; import java.util.Collection; import java.util.List; import java.util.Map; @@ -168,6 +170,144 @@ public class GlobalIndexCheckerIT extends BaseUniqueNamesOwnClusterIT { } @Test + public void testPhoenixRowTimestamp() throws Exception { + try (Connection conn = DriverManager.getConnection(getUrl())) { + String dataTableName = generateUniqueName(); + String indexTableName = generateUniqueName(); + Timestamp initial = new Timestamp(EnvironmentEdgeManager.currentTimeMillis() - 1); + conn.createStatement().execute("create table " + dataTableName + + " (id varchar(10) not null primary key, val1 varchar(10), val2 varchar(10), val3 varchar(10))" + tableDDLOptions); + conn.createStatement().execute("upsert into " + dataTableName + " values ('a', 'ab', 'abc', 'abcd')"); + conn.commit(); + Timestamp before = new Timestamp(EnvironmentEdgeManager.currentTimeMillis()); + // Sleep 1ms to get a different row timestamps + Thread.sleep(1); + conn.createStatement().execute("upsert into " + dataTableName + " values ('b', 'bc', 'bcd', 'bcde')"); + conn.commit(); + Timestamp after = new Timestamp(EnvironmentEdgeManager.currentTimeMillis() + 1); + conn.createStatement().execute("CREATE INDEX " + indexTableName + " on " + + dataTableName + " (val1, PHOENIX_ROW_TIMESTAMP()) " + "include (val2, val3) " + (async ? "ASYNC" : "")+ this.indexDDLOptions); + if (async) { + // Run the index MR job to rebuild the index and verify that index is built correctly + IndexToolIT.runIndexTool(true, false, null, dataTableName, + indexTableName, null, 0, IndexTool.IndexVerifyType.AFTER); + } + + String timeZoneID = Calendar.getInstance().getTimeZone().getID(); + // Write a query to get the val2 = 'bc' with a time range query + String query = "SELECT val1, val2, PHOENIX_ROW_TIMESTAMP() from " + dataTableName + " WHERE val1 = 'bc' AND " + + "PHOENIX_ROW_TIMESTAMP() > TO_DATE('" + before.toString() + "','yyyy-MM-dd HH:mm:ss.SSS', '" + timeZoneID + "') AND " + + "PHOENIX_ROW_TIMESTAMP() < TO_DATE('" + after.toString() + "','yyyy-MM-dd HH:mm:ss.SSS', '" + timeZoneID + "')"; + // Verify that we will read from the index table + assertExplainPlan(conn, query, dataTableName, indexTableName); + ResultSet rs = conn.createStatement().executeQuery(query); + assertTrue(rs.next()); + assertEquals("bc", rs.getString(1)); + assertEquals("bcd", rs.getString(2)); + assertTrue(rs.getTimestamp(3).after(before)); + assertTrue(rs.getTimestamp(3).before(after)); + assertFalse(rs.next()); + // Count the number of index rows + rs = conn.createStatement().executeQuery("SELECT COUNT(*) from " + indexTableName); + assertTrue(rs.next()); + assertEquals(2, rs.getInt(1)); + // Add one more row with val2 ='bc' and check this does not change the result of the previous + // query + // Sleep 1ms to get a different row timestamps + Thread.sleep(1); + conn.createStatement().execute("upsert into " + dataTableName + " values ('c', 'bc', 'ccc', 'cccc')"); + conn.commit(); + assertExplainPlan(conn, query, dataTableName, indexTableName); + rs = conn.createStatement().executeQuery(query); + assertTrue(rs.next()); + assertEquals("bc", rs.getString(1)); + assertEquals("bcd", rs.getString(2)); + assertTrue(rs.getTimestamp(3).after(before)); + assertTrue(rs.getTimestamp(3).before(after)); + assertFalse(rs.next()); + // Write a time range query to get the last row with val2 ='bc' + query = "SELECT val1, val2, PHOENIX_ROW_TIMESTAMP() from " + dataTableName + " WHERE val1 = 'bc' AND " + + "PHOENIX_ROW_TIMESTAMP() > TO_DATE('" + after.toString() + "','yyyy-MM-dd HH:mm:ss.SSS', '" + timeZoneID + "')"; + // Verify that we will read from the index table + assertExplainPlan(conn, query, dataTableName, indexTableName); + rs = conn.createStatement().executeQuery(query); + assertTrue(rs.next()); + assertEquals("bc", rs.getString(1)); + assertEquals("ccc", rs.getString(2)); + assertTrue(rs.getTimestamp(3).after(after)); + assertFalse(rs.next()); + // Verify that we can execute the same query without using the index + String noIndexQuery = "SELECT /*+ NO_INDEX */ val1, val2, PHOENIX_ROW_TIMESTAMP() from " + dataTableName + " WHERE val1 = 'bc' AND " + + "PHOENIX_ROW_TIMESTAMP() > TO_DATE('" + after.toString() + "','yyyy-MM-dd HH:mm:ss.SSS', '" + timeZoneID + "')"; + // Verify that we will read from the data table + rs = conn.createStatement().executeQuery("EXPLAIN " + noIndexQuery); + String explainPlan = QueryUtil.getExplainPlan(rs); + assertTrue(explainPlan.contains("FULL SCAN OVER " + dataTableName)); + rs = conn.createStatement().executeQuery(noIndexQuery); + assertTrue(rs.next()); + assertEquals("bc", rs.getString(1)); + assertEquals("ccc", rs.getString(2)); + assertTrue(rs.getTimestamp(3).after(after)); + after = rs.getTimestamp(3); + assertFalse(rs.next()); + // Add an unverified index row + IndexRegionObserver.setFailPostIndexUpdatesForTesting(true); + // Sleep 1ms to get a different row timestamps + Thread.sleep(1); + conn.createStatement().execute("upsert into " + dataTableName + " values ('d', 'de', 'def', 'defg')"); + conn.commit(); + IndexRegionObserver.setFailPostIndexUpdatesForTesting(false); + // Make sure that we can repair the unverified row + query = "SELECT val1, val2, PHOENIX_ROW_TIMESTAMP() from " + dataTableName + " WHERE val1 = 'de'"; + // Verify that we will read from the index table + assertExplainPlan(conn, query, dataTableName, indexTableName); + rs = conn.createStatement().executeQuery(query); + assertTrue(rs.next()); + assertEquals("de", rs.getString(1)); + assertEquals("def", rs.getString(2)); + assertTrue(rs.getTimestamp(3).after(after)); + assertFalse(rs.next()); + // Add a new index where the index row key starts with PHOENIX_ROW_TIMESTAMP() + indexTableName = generateUniqueName(); + conn.createStatement().execute("CREATE INDEX " + indexTableName + " on " + + dataTableName + " (PHOENIX_ROW_TIMESTAMP()) " + "include (val1, val2, val3) " + + (async ? "ASYNC" : "")+ this.indexDDLOptions); + if (async) { + // Run the index MR job to rebuild the index and verify that index is built correctly + IndexToolIT.runIndexTool(true, false, null, dataTableName, + indexTableName, null, 0, IndexTool.IndexVerifyType.AFTER); + } + // Add one more row + // Sleep 1ms to get a different row timestamps + Thread.sleep(1); + conn.createStatement().execute("upsert into " + dataTableName + " values ('e', 'ae', 'efg', 'efgh')"); + conn.commit(); + // Write a query to get all the rows in the order of their timestamps + query = "SELECT val1, val2, PHOENIX_ROW_TIMESTAMP() from " + dataTableName + " WHERE " + + "PHOENIX_ROW_TIMESTAMP() > TO_DATE('" + initial.toString() + "','yyyy-MM-dd HH:mm:ss.SSS', '" + timeZoneID + "')"; + // Verify that we will read from the index table + assertExplainPlan(conn, query, dataTableName, indexTableName); + rs = conn.createStatement().executeQuery(query); + assertTrue(rs.next()); + assertEquals("ab", rs.getString(1)); + assertEquals("abc", rs.getString(2)); + assertTrue(rs.next()); + assertEquals("bc", rs.getString(1)); + assertEquals("bcd", rs.getString(2)); + assertTrue(rs.next()); + assertEquals("bc", rs.getString(1)); + assertEquals("ccc", rs.getString(2)); + assertTrue(rs.next()); + assertEquals("de", rs.getString(1)); + assertEquals("def", rs.getString(2)); + assertTrue(rs.next()); + assertEquals("ae", rs.getString(1)); + assertEquals("efg", rs.getString(2)); + assertFalse(rs.next()); + } + } + + @Test public void testDeleteNonExistingRow() throws Exception { if (async) { // No need to run the same test twice one for async = true and the other for async = false diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java index 9f79107..205cfe1 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java @@ -33,6 +33,8 @@ import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; +import java.sql.Timestamp; +import java.util.Calendar; import java.util.Collection; import java.util.Iterator; import java.util.List; @@ -70,6 +72,7 @@ import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTable.IndexType; import org.apache.phoenix.schema.PTableKey; import org.apache.phoenix.schema.TableNotFoundException; +import org.apache.phoenix.util.EnvironmentEdgeManager; import org.apache.phoenix.util.QueryUtil; import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.TestUtil; @@ -83,6 +86,93 @@ public class LocalIndexIT extends BaseLocalIndexIT { } @Test + public void testPhoenixRowTimestamp() throws Exception { + String tableName = generateUniqueName(); + String indexName = "IDX_" + generateUniqueName(); + try (Connection conn = DriverManager.getConnection(getUrl())) { + Timestamp initial = new Timestamp(EnvironmentEdgeManager.currentTimeMillis() - 1); + conn.createStatement().execute("create table " + tableName + + " (id varchar(10) not null primary key, val1 varchar(10), val2 varchar(10), val3 varchar(10))"); + conn.createStatement().execute("upsert into " + tableName + " values ('a', 'ab', 'abc', 'abcd')"); + conn.commit(); + Timestamp before = new Timestamp(EnvironmentEdgeManager.currentTimeMillis()); + // Sleep 1ms to get a different row timestamps + Thread.sleep(1); + conn.createStatement().execute("upsert into " + tableName + " values ('b', 'bc', 'bcd', 'bcde')"); + conn.commit(); + Timestamp after = new Timestamp(EnvironmentEdgeManager.currentTimeMillis() + 1); + conn.createStatement().execute("CREATE LOCAL INDEX " + indexName + " on " + + tableName + " (val1, PHOENIX_ROW_TIMESTAMP()) "); + + String timeZoneID = Calendar.getInstance().getTimeZone().getID(); + // Write a query to get the row with val2 = 'bc' with a time range query + String query = "SELECT val1, val2, PHOENIX_ROW_TIMESTAMP() from " + tableName + " WHERE val1 = 'bc' AND " + + "PHOENIX_ROW_TIMESTAMP() > TO_DATE('" + before.toString() + "','yyyy-MM-dd HH:mm:ss.SSS', '" + timeZoneID + "') AND " + + "PHOENIX_ROW_TIMESTAMP() < TO_DATE('" + after.toString() + "','yyyy-MM-dd HH:mm:ss.SSS', '" + timeZoneID + "')"; + ResultSet rs = conn.createStatement().executeQuery(query); + assertTrue(rs.next()); + assertEquals("bc", rs.getString(1)); + assertEquals("bcd", rs.getString(2)); + assertTrue(rs.getTimestamp(3).after(before)); + assertTrue(rs.getTimestamp(3).before(after)); + assertFalse(rs.next()); + // Count the number of index rows + rs = conn.createStatement().executeQuery("SELECT COUNT(*) from " + indexName); + assertTrue(rs.next()); + assertEquals(2, rs.getInt(1)); + // Add one more row with val2 ='bc' and check this does not change the result of the previous + // query + // Sleep 1ms to get a different row timestamps + Thread.sleep(1); + conn.createStatement().execute("upsert into " + tableName + " values ('c', 'bc', 'ccc', 'cccc')"); + conn.commit(); + rs = conn.createStatement().executeQuery(query); + assertTrue(rs.next()); + assertEquals("bc", rs.getString(1)); + assertEquals("bcd", rs.getString(2)); + assertTrue(rs.getTimestamp(3).after(before)); + assertTrue(rs.getTimestamp(3).before(after)); + assertFalse(rs.next()); + // Write a time range query to get the last row with val2 ='bc' + query = "SELECT val1, val2, PHOENIX_ROW_TIMESTAMP() from " + tableName + " WHERE val1 = 'bc' AND " + + "PHOENIX_ROW_TIMESTAMP() > TO_DATE('" + after.toString() + "','yyyy-MM-dd HH:mm:ss.SSS', '" + timeZoneID + "')"; + rs = conn.createStatement().executeQuery(query); + assertTrue(rs.next()); + assertEquals("bc", rs.getString(1)); + assertEquals("ccc", rs.getString(2)); + assertTrue(rs.getTimestamp(3).after(after)); + assertFalse(rs.next()); + // Add a new index where the index row key starts with PHOENIX_ROW_TIMESTAMP() + indexName = generateUniqueName(); + conn.createStatement().execute("CREATE LOCAL INDEX " + indexName + " on " + + tableName + " (PHOENIX_ROW_TIMESTAMP()) " + "include (val1, val2, val3) "); + // Add one more row + // Sleep 1ms to get a different row timestamps + Thread.sleep(1); + conn.createStatement().execute("upsert into " + tableName + " values ('d', 'ad', 'def', 'defg')"); + conn.commit(); + // Write a query to get all the rows in the order of their timestamps + query = "SELECT val1, val2, PHOENIX_ROW_TIMESTAMP() from " + tableName + " WHERE " + + "PHOENIX_ROW_TIMESTAMP() > TO_DATE('" + initial.toString() + "','yyyy-MM-dd HH:mm:ss.SSS', '" + timeZoneID + "')"; + rs = conn.createStatement().executeQuery(query); + assertTrue(rs.next()); + assertEquals("ab", rs.getString(1)); + assertEquals("abc", rs.getString(2)); + assertTrue(rs.next()); + assertEquals("bc", rs.getString(1)); + assertEquals("bcd", rs.getString(2)); + assertTrue(rs.next()); + assertEquals("bc", rs.getString(1)); + assertEquals("ccc", rs.getString(2)); + assertTrue(rs.next()); + assertEquals("ad", rs.getString(1)); + assertEquals("def", rs.getString(2)); + assertFalse(rs.next()); + } + } + + + @Test public void testSelectFromIndexWithAdditionalWhereClause() throws Exception { if (isNamespaceMapped) { return; diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java index f33e5c2..bbbe7e9 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java @@ -49,6 +49,7 @@ import org.apache.phoenix.execute.MutationState; import org.apache.phoenix.execute.MutationState.MultiRowMutationState; import org.apache.phoenix.execute.MutationState.RowMutationState; import org.apache.phoenix.filter.SkipScanFilter; +import org.apache.phoenix.hbase.index.AbstractValueGetter; import org.apache.phoenix.hbase.index.ValueGetter; import org.apache.phoenix.hbase.index.covered.update.ColumnReference; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; @@ -170,7 +171,7 @@ public class DeleteCompiler { try (final PhoenixResultSet rs = new PhoenixResultSet(iterator, projector, context)) { ValueGetter getter = null; if (!otherTableRefs.isEmpty()) { - getter = new ValueGetter() { + getter = new AbstractValueGetter() { final ImmutableBytesWritable valuePtr = new ImmutableBytesWritable(); final ImmutableBytesWritable rowKeyPtr = new ImmutableBytesWritable(); @@ -247,10 +248,10 @@ public class DeleteCompiler { if (table.getType() == PTableType.INDEX) { otherRowKeyPtr.set(scannedIndexMaintainer.buildDataRowKey(rowKeyPtr, viewConstants)); if (otherTable.getType() == PTableType.INDEX) { - otherRowKeyPtr.set(maintainers[i].buildRowKey(getter, otherRowKeyPtr, null, null, HConstants.LATEST_TIMESTAMP)); + otherRowKeyPtr.set(maintainers[i].buildRowKey(getter, otherRowKeyPtr, null, null, rs.getCurrentRow().getValue(0).getTimestamp())); } } else { - otherRowKeyPtr.set(maintainers[i].buildRowKey(getter, rowKeyPtr, null, null, HConstants.LATEST_TIMESTAMP)); + otherRowKeyPtr.set(maintainers[i].buildRowKey(getter, rowKeyPtr, null, null, rs.getCurrentRow().getValue(0).getTimestamp())); } otherMutations.get(i).put(otherRowKeyPtr, new RowMutationState(PRow.DELETE_MARKER, 0, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO, null)); } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java index 540851a..9c3e203 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java @@ -263,7 +263,19 @@ public abstract class GlobalIndexRegionScanner extends BaseRegionScanner { valuePtr.set(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); return valuePtr; } - + @Override + public KeyValue getLatestKeyValue(ColumnReference ref, long ts) { + List<Cell> cellList = put.get(ref.getFamily(), ref.getQualifier()); + if (cellList == null || cellList.isEmpty()) { + return null; + } + Cell cell = cellList.get(0); + return new KeyValue(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(), + cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), + cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(), + cell.getTimestamp(), KeyValue.Type.codeToType(cell.getTypeByte()), + cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); + } @Override public byte[] getRowKey() { return put.getRow(); @@ -274,7 +286,7 @@ public abstract class GlobalIndexRegionScanner extends BaseRegionScanner { public static byte[] getIndexRowKey(IndexMaintainer indexMaintainer, final Put dataRow) { ValueGetter valueGetter = new SimpleValueGetter(dataRow); return indexMaintainer.buildRowKey(valueGetter, new ImmutableBytesWritable(dataRow.getRow()), - null, null, HConstants.LATEST_TIMESTAMP); + null, null, getMaxTimestamp(dataRow)); } public static long getMaxTimestamp(Mutation m) { @@ -1188,7 +1200,7 @@ public abstract class GlobalIndexRegionScanner extends BaseRegionScanner { if (indexPut == null) { // No covered column. Just prepare an index row with the empty column byte[] indexRowKey = indexMaintainer.buildRowKey(mergedRowVG, rowKeyPtr, - null, null, HConstants.LATEST_TIMESTAMP); + null, null, ts); indexPut = new Put(indexRowKey); } else { removeEmptyColumn(indexPut, indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(), diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java index 8a94314..204ec3c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java @@ -49,6 +49,7 @@ import org.apache.phoenix.cache.IndexMetaDataCache; import org.apache.phoenix.compile.ScanRanges; import org.apache.phoenix.coprocessor.MetaDataProtocol; import org.apache.phoenix.filter.SkipScanFilter; +import org.apache.phoenix.hbase.index.AbstractValueGetter; import org.apache.phoenix.hbase.index.MultiMutation; import org.apache.phoenix.hbase.index.ValueGetter; import org.apache.phoenix.hbase.index.covered.IndexMetaData; @@ -462,7 +463,7 @@ public class PhoenixTxIndexMutationGenerator { throws IOException { // TODO: creating these objects over and over again is wasteful ColumnTracker tracker = new ColumnTracker(indexedColumns); - ValueGetter getter = new ValueGetter() { + ValueGetter getter = new AbstractValueGetter() { @Override public ImmutableBytesWritable getLatestValue(ColumnReference ref, long ts) throws IOException { diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/function/PhoenixRowTimestampFunction.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/PhoenixRowTimestampFunction.java index 050a046..5d99b03 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/expression/function/PhoenixRowTimestampFunction.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/PhoenixRowTimestampFunction.java @@ -20,6 +20,7 @@ package org.apache.phoenix.expression.function; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.expression.Determinism; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.KeyValueColumnExpression; @@ -28,7 +29,9 @@ import org.apache.phoenix.parse.PhoenixRowTimestampParseNode; import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.schema.types.PDataType; import org.apache.phoenix.schema.types.PDate; +import org.apache.phoenix.schema.types.PTimestamp; +import java.sql.Timestamp; import java.util.Date; import java.util.List; @@ -89,7 +92,7 @@ public class PhoenixRowTimestampFunction extends ScalarFunction { @Override public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) { - if (tuple == null || tuple.size() == 0) { + if (tuple == null) { return false; } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ValueGetter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/AbstractValueGetter.java similarity index 52% copy from phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ValueGetter.java copy to phoenix-core/src/main/java/org/apache/phoenix/hbase/index/AbstractValueGetter.java index 8c75424..08d19d0 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ValueGetter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/AbstractValueGetter.java @@ -19,23 +19,27 @@ package org.apache.phoenix.hbase.index; import java.io.IOException; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.phoenix.hbase.index.covered.update.ColumnReference; -public interface ValueGetter { - public static final ImmutableBytesWritable HIDDEN_BY_DELETE = new ImmutableBytesWritable(new byte[0]); - /** - * Get the most recent (largest timestamp) for the given column reference - * @param ref to match against an underlying key value. Uses the passed object to match the - * keyValue via {@link ColumnReference#matches} - * @param ts time stamp at which mutations will be issued - * @return the stored value for the given {@link ColumnReference}, <tt>null</tt> if no value is - * present, or {@link ValueGetter#HIDDEN_BY_DELETE} if no value is present and the ref - * will be shadowed by a delete marker. - * @throws IOException if there is an error accessing the underlying data storage - */ - public ImmutableBytesWritable getLatestValue(ColumnReference ref, long ts) throws IOException; - - public byte[] getRowKey(); - -} \ No newline at end of file +public abstract class AbstractValueGetter implements ValueGetter{ + @Override + public KeyValue getLatestKeyValue(ColumnReference ref, long ts) throws IOException { + ImmutableBytesWritable value = getLatestValue(ref, ts); + byte[] rowKey = getRowKey(); + int valueOffset = 0; + int valueLength = 0; + byte[] valueBytes = HConstants.EMPTY_BYTE_ARRAY; + if (value != null) { + valueBytes = value.get(); + valueOffset = value.getOffset(); + valueLength = value.getLength(); + } + return new KeyValue(rowKey, 0, rowKey.length, ref.getFamily(), 0, ref.getFamily().length, + ref.getQualifier(), 0, ref.getQualifier().length, ts, KeyValue.Type.Put, + valueBytes, valueOffset, valueLength); + } + } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java index cb7cf0d..4b341cf 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java @@ -769,7 +769,7 @@ public class IndexRegionObserver extends CompatIndexRegionObserver { if (indexPut == null) { // No covered column. Just prepare an index row with the empty column byte[] indexRowKey = indexMaintainer.buildRowKey(nextDataRowVG, rowKeyPtr, - null, null, HConstants.LATEST_TIMESTAMP); + null, null, ts); indexPut = new Put(indexRowKey); } else { removeEmptyColumn(indexPut, indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(), @@ -783,7 +783,7 @@ public class IndexRegionObserver extends CompatIndexRegionObserver { if (currentDataRowState != null) { ValueGetter currentDataRowVG = new GlobalIndexRegionScanner.SimpleValueGetter(currentDataRowState); byte[] indexRowKeyForCurrentDataRow = indexMaintainer.buildRowKey(currentDataRowVG, rowKeyPtr, - null, null, HConstants.LATEST_TIMESTAMP); + null, null, ts); if (Bytes.compareTo(indexPut.getRow(), indexRowKeyForCurrentDataRow) != 0) { Mutation del = indexMaintainer.buildRowDeleteMutation(indexRowKeyForCurrentDataRow, IndexMaintainer.DeleteType.ALL_VERSIONS, ts); @@ -794,7 +794,7 @@ public class IndexRegionObserver extends CompatIndexRegionObserver { } else if (currentDataRowState != null) { ValueGetter currentDataRowVG = new GlobalIndexRegionScanner.SimpleValueGetter(currentDataRowState); byte[] indexRowKeyForCurrentDataRow = indexMaintainer.buildRowKey(currentDataRowVG, rowKeyPtr, - null, null, HConstants.LATEST_TIMESTAMP); + null, null, ts); Mutation del = indexMaintainer.buildRowDeleteMutation(indexRowKeyForCurrentDataRow, IndexMaintainer.DeleteType.ALL_VERSIONS, ts); context.indexUpdates.put(hTableInterfaceReference, diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ValueGetter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ValueGetter.java index 8c75424..7b8763e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ValueGetter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ValueGetter.java @@ -19,6 +19,9 @@ package org.apache.phoenix.hbase.index; import java.io.IOException; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.phoenix.hbase.index.covered.update.ColumnReference; @@ -35,6 +38,7 @@ public interface ValueGetter { * @throws IOException if there is an error accessing the underlying data storage */ public ImmutableBytesWritable getLatestValue(ColumnReference ref, long ts) throws IOException; + public KeyValue getLatestKeyValue(ColumnReference ref, long ts) throws IOException; public byte[] getRowKey(); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LazyValueGetter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LazyValueGetter.java index 1049c89..fc2cb53 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LazyValueGetter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LazyValueGetter.java @@ -25,6 +25,7 @@ import java.util.Map; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.phoenix.hbase.index.AbstractValueGetter; import org.apache.phoenix.hbase.index.ValueGetter; import org.apache.phoenix.hbase.index.covered.filter.ApplyAndFilterDeletesFilter.DeleteTracker; import org.apache.phoenix.hbase.index.covered.update.ColumnReference; @@ -36,7 +37,7 @@ import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; * {@link ValueGetter} that uses lazy initialization to get the value for the given * {@link ColumnReference}. Once stored, the mapping for that reference is retained. */ -public class LazyValueGetter implements ValueGetter { +public class LazyValueGetter extends AbstractValueGetter { private CoveredDeleteScanner scan; private volatile Map<ColumnReference, ImmutableBytesWritable> values; diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java index c0bd10f..e1ca183 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java @@ -68,6 +68,7 @@ import org.apache.phoenix.expression.LiteralExpression; import org.apache.phoenix.expression.SingleCellColumnExpression; import org.apache.phoenix.expression.SingleCellConstructorExpression; import org.apache.phoenix.expression.visitor.KeyValueExpressionVisitor; +import org.apache.phoenix.hbase.index.AbstractValueGetter; import org.apache.phoenix.hbase.index.ValueGetter; import org.apache.phoenix.hbase.index.covered.update.ColumnReference; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; @@ -585,8 +586,18 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { cf == null ? dataTable.getColumnForColumnQualifier(null, cq) : dataTable.getColumnFamily(cf) .getPColumnForColumnQualifier(cq); - indexedColumnsInfo.add(new Pair<>(dataColumn.getFamilyName() - .getString(), dataColumn.getName().getString())); + if (dataColumn == null) { + if (Bytes.compareTo(cf, dataEmptyKeyValueCF) == 0 + && Bytes.compareTo(cq, EncodedColumnsUtil.getEmptyKeyValueInfo(dataEncodingScheme).getFirst()) == 0) { + return null; + } else { + throw new ColumnNotFoundException(dataTable.getSchemaName().getString(), + dataTable.getTableName().getString(), Bytes.toString(cf), Bytes.toString(cq)); + } + } else { + indexedColumnsInfo.add(new Pair<>(dataColumn.getFamilyName() + .getString(), dataColumn.getName().getString())); + } } catch (ColumnNotFoundException | ColumnFamilyNotFoundException | AmbiguousColumnException e) { throw new RuntimeException(e); @@ -1032,7 +1043,10 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { Put put = null; // New row being inserted: add the empty key value ImmutableBytesWritable latestValue = null; - if (valueGetter==null || (latestValue = valueGetter.getLatestValue(indexEmptyKeyValueRef, ts)) == null || latestValue == ValueGetter.HIDDEN_BY_DELETE) { + if (valueGetter==null || + this.getCoveredColumns().isEmpty() || + (latestValue = valueGetter.getLatestValue(indexEmptyKeyValueRef, ts)) == null || + latestValue == ValueGetter.HIDDEN_BY_DELETE) { // We need to track whether or not our empty key value is hidden by a Delete Family marker at the same timestamp. // If it is, these Puts will be masked so should not be emitted. if (latestValue == ValueGetter.HIDDEN_BY_DELETE) { @@ -1953,7 +1967,7 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { ImmutableBytesPtr value = new ImmutableBytesPtr(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()); valueMap.put(new ColumnReference(kv.getFamilyArray(), kv.getFamilyOffset(), kv.getFamilyLength(), kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength()), value); } - return new ValueGetter() { + return new AbstractValueGetter() { @Override public ImmutableBytesWritable getLatestValue(ColumnReference ref, long ts) { if(ref.equals(indexEmptyKeyValueRef)) return null; diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java index e2d9bfa..bf2cf0d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java @@ -57,6 +57,7 @@ import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.util.ByteUtil; import org.apache.phoenix.util.Closeables; @@ -138,21 +139,7 @@ public class TableResultIterator implements ResultIterator { this.caches = caches; this.retry=plan.getContext().getConnection().getQueryServices().getProps() .getInt(QueryConstants.HASH_JOIN_CACHE_RETRIES, QueryConstants.DEFAULT_HASH_JOIN_CACHE_RETRIES); - ScanUtil.setScanAttributesForIndexReadRepair(scan, table, plan.getContext().getConnection()); - ScanUtil.setScanAttributesForPhoenixTTL(scan, table, plan.getContext().getConnection()); - if (plan.getContext().getConnection().getQueryServices().getProps().getBoolean( - QueryServices.PHOENIX_SERVER_PAGING_ENABLED_ATTRIB, - QueryServicesOptions.DEFAULT_PHOENIX_SERVER_PAGING_ENABLED)) { - long pageSizeMs = plan.getContext().getConnection().getQueryServices().getProps() - .getInt(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, -1); - if (pageSizeMs == -1) { - // Use the half of the HBase RPC timeout value as the the server page size to make sure that the HBase - // region server will be able to send a heartbeat message to the client before the client times out - pageSizeMs = (long) (plan.getContext().getConnection().getQueryServices().getProps() - .getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT) * 0.5); - } - scan.setAttribute(BaseScannerRegionObserver.SERVER_PAGE_SIZE_MS, Bytes.toBytes(Long.valueOf(pageSizeMs))); - } + ScanUtil.setScanAttributesForClient(scan, table, plan.getContext().getConnection()); } @Override diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java index 97f88aa..4f6a9f3 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java @@ -39,6 +39,7 @@ import java.util.UUID; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; +import org.apache.phoenix.hbase.index.AbstractValueGetter; import org.apache.phoenix.thirdparty.org.apache.commons.cli.CommandLine; import org.apache.phoenix.thirdparty.org.apache.commons.cli.CommandLineParser; import org.apache.phoenix.thirdparty.org.apache.commons.cli.DefaultParser; @@ -1066,7 +1067,8 @@ public class IndexTool extends Configured implements Tool { while (rs.next()) { rs.getCurrentRow().getKey(dataRowKeyPtr); // regionStart/EndKey only needed for local indexes, so we pass null - byte[] indexRowKey = maintainer.buildRowKey(getter, dataRowKeyPtr, null, null, HConstants.LATEST_TIMESTAMP); + byte[] indexRowKey = maintainer.buildRowKey(getter, dataRowKeyPtr, null, null, + rs.getCurrentRow().getValue(0).getTimestamp()); histo.addValue(indexRowKey); } List<Bucket> buckets = histo.computeBuckets(); @@ -1094,7 +1096,7 @@ public class IndexTool extends Configured implements Tool { for (String dataCol : dataColNames) { rsIndex.put(SchemaUtil.getEscapedFullColumnName(dataCol), i++); } - ValueGetter getter = new ValueGetter() { + return new AbstractValueGetter() { final ImmutableBytesWritable valuePtr = new ImmutableBytesWritable(); final ImmutableBytesWritable rowKeyPtr = new ImmutableBytesWritable(); @@ -1118,7 +1120,6 @@ public class IndexTool extends Configured implements Tool { return ByteUtil.copyKeyBytesIfNecessary(rowKeyPtr); } }; - return getter; } /** diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/PhoenixRowTimestampParseNode.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/PhoenixRowTimestampParseNode.java index c9f0158..34457b3 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/parse/PhoenixRowTimestampParseNode.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/PhoenixRowTimestampParseNode.java @@ -19,6 +19,7 @@ package org.apache.phoenix.parse; import org.apache.phoenix.compile.StatementContext; +import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.KeyValueColumnExpression; import org.apache.phoenix.expression.function.FunctionExpression; @@ -90,10 +91,8 @@ public class PhoenixRowTimestampParseNode extends FunctionParseNode { } }, emptyColumnFamilyName, emptyColumnName); List<Expression> expressionList = Arrays.asList(new Expression[] {emptyColumnExpression}); - - // Add the empty column to the projection list. - // According to PHOENIX-4179 this will then return the timestamp of the empty column cell. - context.getScan().addColumn(emptyColumnFamilyName, emptyColumnName); + context.getScan().setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_FAMILY_NAME, emptyColumnFamilyName); + context.getScan().setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME, emptyColumnName); return new PhoenixRowTimestampFunction(expressionList); } } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java index 448a8fc..f78deb3 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java @@ -1607,7 +1607,7 @@ public class MetaDataClient { if (expressionIndexCompiler.isAggregate()) { throw new SQLExceptionInfo.Builder(SQLExceptionCode.AGGREGATE_EXPRESSION_NOT_ALLOWED_IN_INDEX).build().buildException(); } - if (expression.getDeterminism() != Determinism.ALWAYS) { + if (!(expression.getDeterminism() == Determinism.ALWAYS || expression.getDeterminism() == Determinism.PER_ROW)) { throw new SQLExceptionInfo.Builder(SQLExceptionCode.NON_DETERMINISTIC_EXPRESSION_NOT_ALLOWED_IN_INDEX).build().buildException(); } if (expression.isStateless()) { diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/ValueGetterTuple.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/ValueGetterTuple.java index 728b1e0..833e9f9 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/ValueGetterTuple.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/ValueGetterTuple.java @@ -57,22 +57,17 @@ public class ValueGetterTuple extends BaseTuple { @Override public KeyValue getValue(byte[] family, byte[] qualifier) { - ImmutableBytesWritable value = null; try { - value = valueGetter.getLatestValue(new ColumnReference(family, qualifier), ts); + KeyValue kv = valueGetter.getLatestKeyValue(new ColumnReference(family, qualifier), ts); + if (kv != null) { + return kv; + } } catch (IOException e) { throw new RuntimeException(e); } byte[] rowKey = valueGetter.getRowKey(); - int valueOffset = 0; - int valueLength = 0; byte[] valueBytes = HConstants.EMPTY_BYTE_ARRAY; - if (value != null) { - valueBytes = value.get(); - valueOffset = value.getOffset(); - valueLength = value.getLength(); - } - return new KeyValue(rowKey, 0, rowKey.length, family, 0, family.length, qualifier, 0, qualifier.length, HConstants.LATEST_TIMESTAMP, Type.Put, valueBytes, valueOffset, valueLength); + return new KeyValue(rowKey, 0, rowKey.length, family, 0, family.length, qualifier, 0, qualifier.length, ts, Type.Put, valueBytes, 0, 0); } @Override diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java index ec1a4f2..6fb4add 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java @@ -86,6 +86,7 @@ import org.apache.phoenix.expression.KeyValueColumnExpression; import org.apache.phoenix.expression.RowKeyColumnExpression; import org.apache.phoenix.expression.SingleCellColumnExpression; import org.apache.phoenix.expression.visitor.RowKeyExpressionVisitor; +import org.apache.phoenix.hbase.index.AbstractValueGetter; import org.apache.phoenix.hbase.index.ValueGetter; import org.apache.phoenix.hbase.index.covered.update.ColumnReference; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; @@ -340,7 +341,7 @@ public class IndexUtil { * updating an existing row. */ if (dataMutation instanceof Put) { - ValueGetter valueGetter = new ValueGetter() { + ValueGetter valueGetter = new AbstractValueGetter() { @Override public byte[] getRowKey() { diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java index 98caf91..cdc7594 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java @@ -1170,7 +1170,6 @@ public class ScanUtil { if (scan.getAttribute(BaseScannerRegionObserver.VIEW_CONSTANTS) == null) { BaseQueryPlan.serializeViewConstantsIntoScan(scan, dataTable); } - addEmptyColumnToScan(scan, emptyCF, emptyCQ); } public static void setScanAttributesForPhoenixTTL(Scan scan, PTable table, @@ -1233,10 +1232,34 @@ public class ScanUtil { HConstants.EMPTY_BYTE_ARRAY, scan.getStartRow(), scan.getStopRow()); } - addEmptyColumnToScan(scan, emptyColumnFamilyName, emptyColumnName); } } + public static void setScanAttributesForClient(Scan scan, PTable table, + PhoenixConnection phoenixConnection) throws SQLException { + setScanAttributesForIndexReadRepair(scan, table, phoenixConnection); + setScanAttributesForPhoenixTTL(scan, table, phoenixConnection); + byte[] emptyCF = scan.getAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_FAMILY_NAME); + byte[] emptyCQ = scan.getAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME); + if (emptyCF != null && emptyCQ != null) { + addEmptyColumnToScan(scan, emptyCF, emptyCQ); + } + if (phoenixConnection.getQueryServices().getProps().getBoolean( + QueryServices.PHOENIX_SERVER_PAGING_ENABLED_ATTRIB, + QueryServicesOptions.DEFAULT_PHOENIX_SERVER_PAGING_ENABLED)) { + long pageSizeMs = phoenixConnection.getQueryServices().getProps() + .getInt(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, -1); + if (pageSizeMs == -1) { + // Use the half of the HBase RPC timeout value as the the server page size to make sure that the HBase + // region server will be able to send a heartbeat message to the client before the client times out + pageSizeMs = (long) (phoenixConnection.getQueryServices().getProps() + .getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT) * 0.5); + } + scan.setAttribute(BaseScannerRegionObserver.SERVER_PAGE_SIZE_MS, Bytes.toBytes(Long.valueOf(pageSizeMs))); + } + + } + public static void getDummyResult(byte[] rowKey, List<Cell> result) { Cell keyValue = KeyValueUtil.newKeyValue(rowKey, 0, diff --git a/phoenix-core/src/test/java/org/apache/phoenix/index/IndexMaintainerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/index/IndexMaintainerTest.java index 0260d66..e0f2ac3 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/index/IndexMaintainerTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/index/IndexMaintainerTest.java @@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.end2end.index.IndexTestUtil; +import org.apache.phoenix.hbase.index.AbstractValueGetter; import org.apache.phoenix.hbase.index.ValueGetter; import org.apache.phoenix.hbase.index.covered.update.ColumnReference; import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder; @@ -75,7 +76,7 @@ public class IndexMaintainerTest extends BaseConnectionlessQueryTest { } private static ValueGetter newValueGetter(final byte[] row, final Map<ColumnReference, byte[]> valueMap) { - return new ValueGetter() { + return new AbstractValueGetter() { @Override public ImmutableBytesWritable getLatestValue(ColumnReference ref, long ts) {