This is an automated email from the ASF dual-hosted git repository.

kadir pushed a commit to branch 4.16
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/4.16 by this push:
     new f49aee6  PHOENIX-6434 Secondary Indexes on PHOENIX_ROW_TIMESTAMP() 
(#1194)
f49aee6 is described below

commit f49aee6318831c1c5675fd35f6cb6e96781fab9d
Author: kadirozde <37155482+kadiro...@users.noreply.github.com>
AuthorDate: Wed Apr 21 18:10:06 2021 -0700

    PHOENIX-6434 Secondary Indexes on PHOENIX_ROW_TIMESTAMP() (#1194)
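    
    This change allows secondary indexes to be declared on the
    PHOENIX_ROW_TIMESTAMP() function. For illustration, the DDL and query
    shape exercised by the new tests is roughly the following (a sketch:
    the table, index and column names as well as the TO_DATE literal and
    time zone are placeholders, not part of this commit):
    
        CREATE TABLE T (ID VARCHAR(10) NOT NULL PRIMARY KEY,
            VAL1 VARCHAR(10), VAL2 VARCHAR(10), VAL3 VARCHAR(10));
    
        -- Index rows are keyed on (VAL1, row timestamp) and cover VAL2, VAL3
        CREATE INDEX IDX ON T (VAL1, PHOENIX_ROW_TIMESTAMP())
            INCLUDE (VAL2, VAL3);
    
        -- Time-range filters on PHOENIX_ROW_TIMESTAMP() can now be served
        -- from the index instead of a full scan of the data table
        SELECT VAL1, VAL2, PHOENIX_ROW_TIMESTAMP() FROM T
        WHERE VAL1 = 'bc'
          AND PHOENIX_ROW_TIMESTAMP() > TO_DATE('2021-04-21 00:00:00.000',
              'yyyy-MM-dd HH:mm:ss.SSS', 'GMT');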
---
 .../end2end/PhoenixRowTimestampFunctionIT.java     |  19 +++
 .../end2end/index/GlobalIndexCheckerIT.java        | 140 +++++++++++++++++++++
 .../apache/phoenix/end2end/index/LocalIndexIT.java |  90 +++++++++++++
 .../org/apache/phoenix/compile/DeleteCompiler.java |   7 +-
 .../coprocessor/GlobalIndexRegionScanner.java      |  18 ++-
 .../execute/PhoenixTxIndexMutationGenerator.java   |   3 +-
 .../function/PhoenixRowTimestampFunction.java      |   5 +-
 .../{ValueGetter.java => AbstractValueGetter.java} |  38 +++---
 .../phoenix/hbase/index/IndexRegionObserver.java   |   6 +-
 .../apache/phoenix/hbase/index/ValueGetter.java    |   4 +
 .../hbase/index/covered/data/LazyValueGetter.java  |   3 +-
 .../org/apache/phoenix/index/IndexMaintainer.java  |  22 +++-
 .../phoenix/iterate/TableResultIterator.java       |  17 +--
 .../apache/phoenix/mapreduce/index/IndexTool.java  |   7 +-
 .../parse/PhoenixRowTimestampParseNode.java        |   7 +-
 .../org/apache/phoenix/schema/MetaDataClient.java  |   2 +-
 .../phoenix/schema/tuple/ValueGetterTuple.java     |  15 +--
 .../java/org/apache/phoenix/util/IndexUtil.java    |   3 +-
 .../java/org/apache/phoenix/util/ScanUtil.java     |  27 +++-
 .../apache/phoenix/index/IndexMaintainerTest.java  |   3 +-
 20 files changed, 366 insertions(+), 70 deletions(-)

diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixRowTimestampFunctionIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixRowTimestampFunctionIT.java
index 8b4dfb6..5998690 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixRowTimestampFunctionIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PhoenixRowTimestampFunctionIT.java
@@ -493,4 +493,23 @@ public class PhoenixRowTimestampFunctionIT extends 
ParallelStatsDisabledIT {
             }
         }
     }
+
+    @Test
+    public void testPhoenixRowTimestampWithWildcard() throws Exception {
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String dataTableName = generateUniqueName();
+            conn.createStatement().execute("create table " + dataTableName +
+                    " (pk1 integer not null primary key, x.v1 float, y.v2 
float, z.v3 float)" + this.tableDDLOptions);
+            conn.createStatement().execute("upsert into " + dataTableName + " 
values(rand() * 100000000, rand(), rand(), rand())");
+            conn.commit();
+            ResultSet rs = conn.createStatement().executeQuery("SELECT v1 from 
" + dataTableName);
+            assertTrue(rs.next());
+            float v1 = rs.getFloat(1);
+            rs = conn.createStatement().executeQuery("SELECT * from " + 
dataTableName + " order by phoenix_row_timestamp()");
+            assertTrue(rs.next());
+            // The wildcard projection should return the same v1 value
+            assertTrue(v1 == rs.getFloat(2));
+        }
+    }
+
 }
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
index d9cf8a2..47eea6d 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/GlobalIndexCheckerIT.java
@@ -38,6 +38,8 @@ import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.util.Calendar;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
@@ -168,6 +170,144 @@ public class GlobalIndexCheckerIT extends 
BaseUniqueNamesOwnClusterIT {
     }
 
     @Test
+    public void testPhoenixRowTimestamp() throws Exception {
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            String dataTableName = generateUniqueName();
+            String indexTableName = generateUniqueName();
+            Timestamp initial = new 
Timestamp(EnvironmentEdgeManager.currentTimeMillis() - 1);
+            conn.createStatement().execute("create table " + dataTableName +
+                    " (id varchar(10) not null primary key, val1 varchar(10), 
val2 varchar(10), val3 varchar(10))" + tableDDLOptions);
+            conn.createStatement().execute("upsert into " + dataTableName + " 
values ('a', 'ab', 'abc', 'abcd')");
+            conn.commit();
+            Timestamp before = new 
Timestamp(EnvironmentEdgeManager.currentTimeMillis());
+            // Sleep 1ms to get different row timestamps
+            Thread.sleep(1);
+            conn.createStatement().execute("upsert into " + dataTableName + " 
values ('b', 'bc', 'bcd', 'bcde')");
+            conn.commit();
+            Timestamp after = new 
Timestamp(EnvironmentEdgeManager.currentTimeMillis() + 1);
+            conn.createStatement().execute("CREATE INDEX " + indexTableName + 
" on " +
+                    dataTableName + " (val1, PHOENIX_ROW_TIMESTAMP()) " + 
"include (val2, val3) " + (async ? "ASYNC" : "")+ this.indexDDLOptions);
+            if (async) {
+                // Run the index MR job to rebuild the index and verify that the index is built correctly
+                IndexToolIT.runIndexTool(true, false, null, dataTableName,
+                        indexTableName, null, 0, 
IndexTool.IndexVerifyType.AFTER);
+            }
+
+            String timeZoneID = Calendar.getInstance().getTimeZone().getID();
+            // Write a time range query to get the row with val1 = 'bc'
+            String query = "SELECT  val1, val2, PHOENIX_ROW_TIMESTAMP() from " 
+ dataTableName + " WHERE val1 = 'bc' AND " +
+                    "PHOENIX_ROW_TIMESTAMP() > TO_DATE('" + before.toString() 
+ "','yyyy-MM-dd HH:mm:ss.SSS', '" + timeZoneID + "') AND " +
+                    "PHOENIX_ROW_TIMESTAMP() < TO_DATE('" + after.toString() + 
"','yyyy-MM-dd HH:mm:ss.SSS', '" + timeZoneID + "')";
+            // Verify that we will read from the index table
+            assertExplainPlan(conn, query, dataTableName, indexTableName);
+            ResultSet rs = conn.createStatement().executeQuery(query);
+            assertTrue(rs.next());
+            assertEquals("bc", rs.getString(1));
+            assertEquals("bcd", rs.getString(2));
+            assertTrue(rs.getTimestamp(3).after(before));
+            assertTrue(rs.getTimestamp(3).before(after));
+            assertFalse(rs.next());
+            // Count the number of index rows
+            rs = conn.createStatement().executeQuery("SELECT COUNT(*) from " + 
indexTableName);
+            assertTrue(rs.next());
+            assertEquals(2, rs.getInt(1));
+            // Add one more row with val1 = 'bc' and check that this does not change
+            // the result of the previous query
+            // Sleep 1ms to get different row timestamps
+            Thread.sleep(1);
+            conn.createStatement().execute("upsert into " + dataTableName + " 
values ('c', 'bc', 'ccc', 'cccc')");
+            conn.commit();
+            assertExplainPlan(conn, query, dataTableName, indexTableName);
+            rs = conn.createStatement().executeQuery(query);
+            assertTrue(rs.next());
+            assertEquals("bc", rs.getString(1));
+            assertEquals("bcd", rs.getString(2));
+            assertTrue(rs.getTimestamp(3).after(before));
+            assertTrue(rs.getTimestamp(3).before(after));
+            assertFalse(rs.next());
+            // Write a time range query to get the last row with val1 = 'bc'
+            query = "SELECT  val1, val2, PHOENIX_ROW_TIMESTAMP() from " + 
dataTableName + " WHERE val1 = 'bc' AND " +
+                    "PHOENIX_ROW_TIMESTAMP() > TO_DATE('" + after.toString() + 
"','yyyy-MM-dd HH:mm:ss.SSS', '" + timeZoneID + "')";
+            // Verify that we will read from the index table
+            assertExplainPlan(conn, query, dataTableName, indexTableName);
+            rs = conn.createStatement().executeQuery(query);
+            assertTrue(rs.next());
+            assertEquals("bc", rs.getString(1));
+            assertEquals("ccc", rs.getString(2));
+            assertTrue(rs.getTimestamp(3).after(after));
+            assertFalse(rs.next());
+            // Verify that we can execute the same query without using the 
index
+            String noIndexQuery = "SELECT /*+ NO_INDEX */ val1, val2, 
PHOENIX_ROW_TIMESTAMP() from " + dataTableName + " WHERE val1 = 'bc' AND " +
+                    "PHOENIX_ROW_TIMESTAMP() > TO_DATE('" + after.toString() + 
"','yyyy-MM-dd HH:mm:ss.SSS', '" + timeZoneID + "')";
+            // Verify that we will read from the data table
+            rs = conn.createStatement().executeQuery("EXPLAIN " + 
noIndexQuery);
+            String explainPlan = QueryUtil.getExplainPlan(rs);
+            assertTrue(explainPlan.contains("FULL SCAN OVER " + 
dataTableName));
+            rs = conn.createStatement().executeQuery(noIndexQuery);
+            assertTrue(rs.next());
+            assertEquals("bc", rs.getString(1));
+            assertEquals("ccc", rs.getString(2));
+            assertTrue(rs.getTimestamp(3).after(after));
+            after = rs.getTimestamp(3);
+            assertFalse(rs.next());
+            // Add an unverified index row
+            IndexRegionObserver.setFailPostIndexUpdatesForTesting(true);
+            // Sleep 1ms to get different row timestamps
+            Thread.sleep(1);
+            conn.createStatement().execute("upsert into " + dataTableName + " 
values ('d', 'de', 'def', 'defg')");
+            conn.commit();
+            IndexRegionObserver.setFailPostIndexUpdatesForTesting(false);
+            // Make sure that we can repair the unverified row
+            query = "SELECT  val1, val2, PHOENIX_ROW_TIMESTAMP()  from " + 
dataTableName + " WHERE val1 = 'de'";
+            // Verify that we will read from the index table
+            assertExplainPlan(conn, query, dataTableName, indexTableName);
+            rs = conn.createStatement().executeQuery(query);
+            assertTrue(rs.next());
+            assertEquals("de", rs.getString(1));
+            assertEquals("def", rs.getString(2));
+            assertTrue(rs.getTimestamp(3).after(after));
+            assertFalse(rs.next());
+            // Add a new index where the index row key starts with 
PHOENIX_ROW_TIMESTAMP()
+            indexTableName = generateUniqueName();
+            conn.createStatement().execute("CREATE INDEX " + indexTableName + 
" on " +
+                    dataTableName + " (PHOENIX_ROW_TIMESTAMP()) " + "include 
(val1, val2, val3) " +
+                    (async ? "ASYNC" : "")+ this.indexDDLOptions);
+            if (async) {
+                // Run the index MR job to rebuild the index and verify that the index is built correctly
+                IndexToolIT.runIndexTool(true, false, null, dataTableName,
+                        indexTableName, null, 0, 
IndexTool.IndexVerifyType.AFTER);
+            }
+            // Add one more row
+            // Sleep 1ms to get different row timestamps
+            Thread.sleep(1);
+            conn.createStatement().execute("upsert into " + dataTableName + " 
values ('e', 'ae', 'efg', 'efgh')");
+            conn.commit();
+            // Write a query to get all the rows in the order of their 
timestamps
+            query = "SELECT  val1, val2, PHOENIX_ROW_TIMESTAMP() from " + 
dataTableName + " WHERE " +
+                    "PHOENIX_ROW_TIMESTAMP() > TO_DATE('" + initial.toString() 
+ "','yyyy-MM-dd HH:mm:ss.SSS', '" + timeZoneID + "')";
+            // Verify that we will read from the index table
+            assertExplainPlan(conn, query, dataTableName, indexTableName);
+            rs = conn.createStatement().executeQuery(query);
+            assertTrue(rs.next());
+            assertEquals("ab", rs.getString(1));
+            assertEquals("abc", rs.getString(2));
+            assertTrue(rs.next());
+            assertEquals("bc", rs.getString(1));
+            assertEquals("bcd", rs.getString(2));
+            assertTrue(rs.next());
+            assertEquals("bc", rs.getString(1));
+            assertEquals("ccc", rs.getString(2));
+            assertTrue(rs.next());
+            assertEquals("de", rs.getString(1));
+            assertEquals("def", rs.getString(2));
+            assertTrue(rs.next());
+            assertEquals("ae", rs.getString(1));
+            assertEquals("efg", rs.getString(2));
+            assertFalse(rs.next());
+        }
+    }
+
+    @Test
     public void testDeleteNonExistingRow() throws Exception {
         if (async) {
             // No need to run the same test twice one for async = true and the 
other for async = false
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
index 9f79107..205cfe1 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/LocalIndexIT.java
@@ -33,6 +33,8 @@ import java.sql.DriverManager;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.sql.Timestamp;
+import java.util.Calendar;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
@@ -70,6 +72,7 @@ import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTable.IndexType;
 import org.apache.phoenix.schema.PTableKey;
 import org.apache.phoenix.schema.TableNotFoundException;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.TestUtil;
@@ -83,6 +86,93 @@ public class LocalIndexIT extends BaseLocalIndexIT {
     }
 
     @Test
+    public void testPhoenixRowTimestamp() throws Exception {
+        String tableName = generateUniqueName();
+        String indexName = "IDX_" + generateUniqueName();
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            Timestamp initial = new 
Timestamp(EnvironmentEdgeManager.currentTimeMillis() - 1);
+            conn.createStatement().execute("create table " + tableName +
+                    " (id varchar(10) not null primary key, val1 varchar(10), 
val2 varchar(10), val3 varchar(10))");
+            conn.createStatement().execute("upsert into " + tableName + " 
values ('a', 'ab', 'abc', 'abcd')");
+            conn.commit();
+            Timestamp before = new 
Timestamp(EnvironmentEdgeManager.currentTimeMillis());
+            // Sleep 1ms to get different row timestamps
+            Thread.sleep(1);
+            conn.createStatement().execute("upsert into " + tableName + " 
values ('b', 'bc', 'bcd', 'bcde')");
+            conn.commit();
+            Timestamp after = new 
Timestamp(EnvironmentEdgeManager.currentTimeMillis() + 1);
+            conn.createStatement().execute("CREATE LOCAL INDEX " + indexName + 
" on " +
+                    tableName + " (val1, PHOENIX_ROW_TIMESTAMP()) ");
+
+            String timeZoneID = Calendar.getInstance().getTimeZone().getID();
+            // Write a time range query to get the row with val1 = 'bc'
+            String query = "SELECT  val1, val2, PHOENIX_ROW_TIMESTAMP() from " 
+ tableName + " WHERE val1 = 'bc' AND " +
+                    "PHOENIX_ROW_TIMESTAMP() > TO_DATE('" + before.toString() 
+ "','yyyy-MM-dd HH:mm:ss.SSS', '" + timeZoneID + "') AND " +
+                    "PHOENIX_ROW_TIMESTAMP() < TO_DATE('" + after.toString() + 
"','yyyy-MM-dd HH:mm:ss.SSS', '" + timeZoneID + "')";
+            ResultSet rs = conn.createStatement().executeQuery(query);
+            assertTrue(rs.next());
+            assertEquals("bc", rs.getString(1));
+            assertEquals("bcd", rs.getString(2));
+            assertTrue(rs.getTimestamp(3).after(before));
+            assertTrue(rs.getTimestamp(3).before(after));
+            assertFalse(rs.next());
+            // Count the number of index rows
+            rs = conn.createStatement().executeQuery("SELECT COUNT(*) from " + 
indexName);
+            assertTrue(rs.next());
+            assertEquals(2, rs.getInt(1));
+            // Add one more row with val1 = 'bc' and check that this does not change
+            // the result of the previous query
+            // Sleep 1ms to get different row timestamps
+            Thread.sleep(1);
+            conn.createStatement().execute("upsert into " + tableName + " 
values ('c', 'bc', 'ccc', 'cccc')");
+            conn.commit();
+            rs = conn.createStatement().executeQuery(query);
+            assertTrue(rs.next());
+            assertEquals("bc", rs.getString(1));
+            assertEquals("bcd", rs.getString(2));
+            assertTrue(rs.getTimestamp(3).after(before));
+            assertTrue(rs.getTimestamp(3).before(after));
+            assertFalse(rs.next());
+            // Write a time range query to get the last row with val1 = 'bc'
+            query = "SELECT  val1, val2, PHOENIX_ROW_TIMESTAMP() from " + 
tableName + " WHERE val1 = 'bc' AND " +
+                    "PHOENIX_ROW_TIMESTAMP() > TO_DATE('" + after.toString() + 
"','yyyy-MM-dd HH:mm:ss.SSS', '" + timeZoneID + "')";
+            rs = conn.createStatement().executeQuery(query);
+            assertTrue(rs.next());
+            assertEquals("bc", rs.getString(1));
+            assertEquals("ccc", rs.getString(2));
+            assertTrue(rs.getTimestamp(3).after(after));
+            assertFalse(rs.next());
+            // Add a new index where the index row key starts with 
PHOENIX_ROW_TIMESTAMP()
+            indexName = generateUniqueName();
+            conn.createStatement().execute("CREATE LOCAL INDEX " + indexName + 
" on " +
+                    tableName + " (PHOENIX_ROW_TIMESTAMP()) " + "include 
(val1, val2, val3) ");
+            // Add one more row
+            // Sleep 1ms to get different row timestamps
+            Thread.sleep(1);
+            conn.createStatement().execute("upsert into " + tableName + " 
values ('d', 'ad', 'def', 'defg')");
+            conn.commit();
+            // Write a query to get all the rows in the order of their 
timestamps
+            query = "SELECT  val1, val2, PHOENIX_ROW_TIMESTAMP() from " + 
tableName + " WHERE " +
+                    "PHOENIX_ROW_TIMESTAMP() > TO_DATE('" + initial.toString() 
+ "','yyyy-MM-dd HH:mm:ss.SSS', '" + timeZoneID + "')";
+            rs = conn.createStatement().executeQuery(query);
+            assertTrue(rs.next());
+            assertEquals("ab", rs.getString(1));
+            assertEquals("abc", rs.getString(2));
+            assertTrue(rs.next());
+            assertEquals("bc", rs.getString(1));
+            assertEquals("bcd", rs.getString(2));
+            assertTrue(rs.next());
+            assertEquals("bc", rs.getString(1));
+            assertEquals("ccc", rs.getString(2));
+            assertTrue(rs.next());
+            assertEquals("ad", rs.getString(1));
+            assertEquals("def", rs.getString(2));
+            assertFalse(rs.next());
+        }
+    }
+
+
+    @Test
     public void testSelectFromIndexWithAdditionalWhereClause() throws 
Exception {
         if (isNamespaceMapped) {
             return;
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
index f33e5c2..bbbe7e9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -49,6 +49,7 @@ import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.execute.MutationState.MultiRowMutationState;
 import org.apache.phoenix.execute.MutationState.RowMutationState;
 import org.apache.phoenix.filter.SkipScanFilter;
+import org.apache.phoenix.hbase.index.AbstractValueGetter;
 import org.apache.phoenix.hbase.index.ValueGetter;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
@@ -170,7 +171,7 @@ public class DeleteCompiler {
         try (final PhoenixResultSet rs = new PhoenixResultSet(iterator, 
projector, context)) {
             ValueGetter getter = null;
             if (!otherTableRefs.isEmpty()) {
-                getter = new ValueGetter() {
+                getter = new AbstractValueGetter() {
                     final ImmutableBytesWritable valuePtr = new 
ImmutableBytesWritable();
                     final ImmutableBytesWritable rowKeyPtr = new 
ImmutableBytesWritable();
     
@@ -247,10 +248,10 @@ public class DeleteCompiler {
                     if (table.getType() == PTableType.INDEX) {
                         
otherRowKeyPtr.set(scannedIndexMaintainer.buildDataRowKey(rowKeyPtr, 
viewConstants));
                         if (otherTable.getType() == PTableType.INDEX) {
-                            
otherRowKeyPtr.set(maintainers[i].buildRowKey(getter, otherRowKeyPtr, null, 
null, HConstants.LATEST_TIMESTAMP));
+                            
otherRowKeyPtr.set(maintainers[i].buildRowKey(getter, otherRowKeyPtr, null, 
null, rs.getCurrentRow().getValue(0).getTimestamp()));
                         }
                     } else {
-                        otherRowKeyPtr.set(maintainers[i].buildRowKey(getter, 
rowKeyPtr, null, null, HConstants.LATEST_TIMESTAMP));
+                        otherRowKeyPtr.set(maintainers[i].buildRowKey(getter, 
rowKeyPtr, null, null, rs.getCurrentRow().getValue(0).getTimestamp()));
                     }
                     otherMutations.get(i).put(otherRowKeyPtr, new 
RowMutationState(PRow.DELETE_MARKER, 0, 
statement.getConnection().getStatementExecutionCounter(), 
NULL_ROWTIMESTAMP_INFO, null));
                 }
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
index 540851a..9c3e203 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
@@ -263,7 +263,19 @@ public abstract class GlobalIndexRegionScanner extends 
BaseRegionScanner {
             valuePtr.set(cell.getValueArray(), cell.getValueOffset(), 
cell.getValueLength());
             return valuePtr;
         }
-
+        @Override
+        public KeyValue getLatestKeyValue(ColumnReference ref, long ts) {
+            List<Cell> cellList = put.get(ref.getFamily(), ref.getQualifier());
+            if (cellList == null || cellList.isEmpty()) {
+                return null;
+            }
+            Cell cell = cellList.get(0);
+            return new KeyValue(cell.getRowArray(), cell.getRowOffset(), 
cell.getRowLength(),
+                    cell.getFamilyArray(), cell.getFamilyOffset(), 
cell.getFamilyLength(),
+                    cell.getQualifierArray(), cell.getQualifierOffset(), 
cell.getQualifierLength(),
+                    cell.getTimestamp(), 
KeyValue.Type.codeToType(cell.getTypeByte()),
+                    cell.getValueArray(), cell.getValueOffset(), 
cell.getValueLength());
+        }
         @Override
         public byte[] getRowKey() {
             return put.getRow();
@@ -274,7 +286,7 @@ public abstract class GlobalIndexRegionScanner extends 
BaseRegionScanner {
     public static byte[] getIndexRowKey(IndexMaintainer indexMaintainer, final 
Put dataRow) {
         ValueGetter valueGetter = new SimpleValueGetter(dataRow);
         return indexMaintainer.buildRowKey(valueGetter, new 
ImmutableBytesWritable(dataRow.getRow()),
-                null, null, HConstants.LATEST_TIMESTAMP);
+                null, null, getMaxTimestamp(dataRow));
     }
 
     public static long getMaxTimestamp(Mutation m) {
@@ -1188,7 +1200,7 @@ public abstract class GlobalIndexRegionScanner extends 
BaseRegionScanner {
         if (indexPut == null) {
             // No covered column. Just prepare an index row with the empty 
column
             byte[] indexRowKey = indexMaintainer.buildRowKey(mergedRowVG, 
rowKeyPtr,
-                    null, null, HConstants.LATEST_TIMESTAMP);
+                    null, null, ts);
             indexPut = new Put(indexRowKey);
         } else {
             removeEmptyColumn(indexPut, 
indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java
index 8a94314..204ec3c 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/execute/PhoenixTxIndexMutationGenerator.java
@@ -49,6 +49,7 @@ import org.apache.phoenix.cache.IndexMetaDataCache;
 import org.apache.phoenix.compile.ScanRanges;
 import org.apache.phoenix.coprocessor.MetaDataProtocol;
 import org.apache.phoenix.filter.SkipScanFilter;
+import org.apache.phoenix.hbase.index.AbstractValueGetter;
 import org.apache.phoenix.hbase.index.MultiMutation;
 import org.apache.phoenix.hbase.index.ValueGetter;
 import org.apache.phoenix.hbase.index.covered.IndexMetaData;
@@ -462,7 +463,7 @@ public class PhoenixTxIndexMutationGenerator {
                 throws IOException {
             // TODO: creating these objects over and over again is wasteful
             ColumnTracker tracker = new ColumnTracker(indexedColumns);
-            ValueGetter getter = new ValueGetter() {
+            ValueGetter getter = new AbstractValueGetter() {
 
                 @Override
                 public ImmutableBytesWritable getLatestValue(ColumnReference 
ref, long ts) throws IOException {
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/expression/function/PhoenixRowTimestampFunction.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/PhoenixRowTimestampFunction.java
index 050a046..5d99b03 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/expression/function/PhoenixRowTimestampFunction.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/PhoenixRowTimestampFunction.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.expression.function;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.expression.Determinism;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.KeyValueColumnExpression;
@@ -28,7 +29,9 @@ import org.apache.phoenix.parse.PhoenixRowTimestampParseNode;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.schema.types.PDataType;
 import org.apache.phoenix.schema.types.PDate;
+import org.apache.phoenix.schema.types.PTimestamp;
 
+import java.sql.Timestamp;
 import java.util.Date;
 import java.util.List;
 
@@ -89,7 +92,7 @@ public class PhoenixRowTimestampFunction extends 
ScalarFunction {
     @Override
     public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
 
-        if (tuple == null || tuple.size() == 0) {
+        if (tuple == null) {
             return false;
         }
 
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ValueGetter.java 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/AbstractValueGetter.java
similarity index 52%
copy from 
phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ValueGetter.java
copy to 
phoenix-core/src/main/java/org/apache/phoenix/hbase/index/AbstractValueGetter.java
index 8c75424..08d19d0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ValueGetter.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/AbstractValueGetter.java
@@ -19,23 +19,27 @@ package org.apache.phoenix.hbase.index;
 
 import java.io.IOException;
 
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 
-public interface ValueGetter {
-  public static final ImmutableBytesWritable HIDDEN_BY_DELETE = new 
ImmutableBytesWritable(new byte[0]);
-  /**
-   * Get the most recent (largest timestamp) for the given column reference
-   * @param ref to match against an underlying key value. Uses the passed 
object to match the
-   *          keyValue via {@link ColumnReference#matches}
- * @param ts time stamp at which mutations will be issued
-   * @return the stored value for the given {@link ColumnReference}, 
<tt>null</tt> if no value is
-   *         present, or {@link ValueGetter#HIDDEN_BY_DELETE} if no value is 
present and the ref
-   *         will be shadowed by a delete marker.
-   * @throws IOException if there is an error accessing the underlying data 
storage
-   */
-  public ImmutableBytesWritable getLatestValue(ColumnReference ref, long ts) 
throws IOException;
-  
-  public byte[] getRowKey();
-
-}
\ No newline at end of file
+public abstract class AbstractValueGetter implements ValueGetter{
+    @Override
+    public KeyValue getLatestKeyValue(ColumnReference ref, long ts) throws 
IOException {
+        ImmutableBytesWritable value = getLatestValue(ref, ts);
+        byte[] rowKey = getRowKey();
+        int valueOffset = 0;
+        int valueLength = 0;
+        byte[] valueBytes = HConstants.EMPTY_BYTE_ARRAY;
+        if (value != null) {
+            valueBytes = value.get();
+            valueOffset = value.getOffset();
+            valueLength = value.getLength();
+        }
+        return new KeyValue(rowKey, 0, rowKey.length, ref.getFamily(), 0, 
ref.getFamily().length,
+                ref.getQualifier(), 0, ref.getQualifier().length, ts, 
KeyValue.Type.Put,
+                valueBytes, valueOffset, valueLength);
+    }
+ }
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index cb7cf0d..4b341cf 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -769,7 +769,7 @@ public class IndexRegionObserver extends 
CompatIndexRegionObserver {
                     if (indexPut == null) {
                         // No covered column. Just prepare an index row with 
the empty column
                         byte[] indexRowKey = 
indexMaintainer.buildRowKey(nextDataRowVG, rowKeyPtr,
-                                null, null, HConstants.LATEST_TIMESTAMP);
+                                null, null, ts);
                         indexPut = new Put(indexRowKey);
                     } else {
                         removeEmptyColumn(indexPut, 
indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(),
@@ -783,7 +783,7 @@ public class IndexRegionObserver extends 
CompatIndexRegionObserver {
                     if (currentDataRowState != null) {
                         ValueGetter currentDataRowVG = new 
GlobalIndexRegionScanner.SimpleValueGetter(currentDataRowState);
                         byte[] indexRowKeyForCurrentDataRow = 
indexMaintainer.buildRowKey(currentDataRowVG, rowKeyPtr,
-                                null, null, HConstants.LATEST_TIMESTAMP);
+                                null, null, ts);
                         if (Bytes.compareTo(indexPut.getRow(), 
indexRowKeyForCurrentDataRow) != 0) {
                             Mutation del = 
indexMaintainer.buildRowDeleteMutation(indexRowKeyForCurrentDataRow,
                                     IndexMaintainer.DeleteType.ALL_VERSIONS, 
ts);
@@ -794,7 +794,7 @@ public class IndexRegionObserver extends 
CompatIndexRegionObserver {
                 } else if (currentDataRowState != null) {
                     ValueGetter currentDataRowVG = new 
GlobalIndexRegionScanner.SimpleValueGetter(currentDataRowState);
                     byte[] indexRowKeyForCurrentDataRow = 
indexMaintainer.buildRowKey(currentDataRowVG, rowKeyPtr,
-                            null, null, HConstants.LATEST_TIMESTAMP);
+                            null, null, ts);
                     Mutation del = 
indexMaintainer.buildRowDeleteMutation(indexRowKeyForCurrentDataRow,
                             IndexMaintainer.DeleteType.ALL_VERSIONS, ts);
                     context.indexUpdates.put(hTableInterfaceReference,
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ValueGetter.java 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ValueGetter.java
index 8c75424..7b8763e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ValueGetter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ValueGetter.java
@@ -19,6 +19,9 @@ package org.apache.phoenix.hbase.index;
 
 import java.io.IOException;
 
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 
@@ -35,6 +38,7 @@ public interface ValueGetter {
    * @throws IOException if there is an error accessing the underlying data 
storage
    */
   public ImmutableBytesWritable getLatestValue(ColumnReference ref, long ts) 
throws IOException;
+  public KeyValue getLatestKeyValue(ColumnReference ref, long ts) throws 
IOException;
   
   public byte[] getRowKey();
 
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LazyValueGetter.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LazyValueGetter.java
index 1049c89..fc2cb53 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LazyValueGetter.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LazyValueGetter.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.phoenix.hbase.index.AbstractValueGetter;
 import org.apache.phoenix.hbase.index.ValueGetter;
 import 
org.apache.phoenix.hbase.index.covered.filter.ApplyAndFilterDeletesFilter.DeleteTracker;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
@@ -36,7 +37,7 @@ import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
  * {@link ValueGetter} that uses lazy initialization to get the value for the 
given
  * {@link ColumnReference}. Once stored, the mapping for that reference is 
retained.
  */
-public class LazyValueGetter implements ValueGetter {
+public class LazyValueGetter extends AbstractValueGetter {
 
     private CoveredDeleteScanner scan;
     private volatile Map<ColumnReference, ImmutableBytesWritable> values;
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java 
b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
index c0bd10f..e1ca183 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java
@@ -68,6 +68,7 @@ import org.apache.phoenix.expression.LiteralExpression;
 import org.apache.phoenix.expression.SingleCellColumnExpression;
 import org.apache.phoenix.expression.SingleCellConstructorExpression;
 import org.apache.phoenix.expression.visitor.KeyValueExpressionVisitor;
+import org.apache.phoenix.hbase.index.AbstractValueGetter;
 import org.apache.phoenix.hbase.index.ValueGetter;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
@@ -585,8 +586,18 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
                                     cf == null ? 
dataTable.getColumnForColumnQualifier(null, cq)
                                             : dataTable.getColumnFamily(cf)
                                                     
.getPColumnForColumnQualifier(cq);
-                            indexedColumnsInfo.add(new 
Pair<>(dataColumn.getFamilyName()
-                                    .getString(), 
dataColumn.getName().getString()));
+                            if (dataColumn == null) {
+                                if (Bytes.compareTo(cf, dataEmptyKeyValueCF) 
== 0
+                                        && Bytes.compareTo(cq, 
EncodedColumnsUtil.getEmptyKeyValueInfo(dataEncodingScheme).getFirst()) == 0) {
+                                    return null;
+                                } else {
+                                    throw new 
ColumnNotFoundException(dataTable.getSchemaName().getString(),
+                                            
dataTable.getTableName().getString(), Bytes.toString(cf), Bytes.toString(cq));
+                                }
+                            } else {
+                                indexedColumnsInfo.add(new 
Pair<>(dataColumn.getFamilyName()
+                                        .getString(), 
dataColumn.getName().getString()));
+                            }
                         } catch (ColumnNotFoundException | 
ColumnFamilyNotFoundException
                                 | AmbiguousColumnException e) {
                             throw new RuntimeException(e);
@@ -1032,7 +1043,10 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
         Put put = null;
         // New row being inserted: add the empty key value
         ImmutableBytesWritable latestValue = null;
-        if (valueGetter==null || (latestValue = 
valueGetter.getLatestValue(indexEmptyKeyValueRef, ts)) == null || latestValue 
== ValueGetter.HIDDEN_BY_DELETE) {
+        if (valueGetter==null ||
+                this.getCoveredColumns().isEmpty() ||
+                (latestValue = 
valueGetter.getLatestValue(indexEmptyKeyValueRef, ts)) == null ||
+                latestValue == ValueGetter.HIDDEN_BY_DELETE) {
             // We need to track whether or not our empty key value is hidden 
by a Delete Family marker at the same timestamp.
             // If it is, these Puts will be masked so should not be emitted.
             if (latestValue == ValueGetter.HIDDEN_BY_DELETE) {
@@ -1953,7 +1967,7 @@ public class IndexMaintainer implements Writable, 
Iterable<ColumnReference> {
             ImmutableBytesPtr value = new 
ImmutableBytesPtr(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength());
             valueMap.put(new ColumnReference(kv.getFamilyArray(), 
kv.getFamilyOffset(), kv.getFamilyLength(), kv.getQualifierArray(), 
kv.getQualifierOffset(), kv.getQualifierLength()), value);
         }
-        return new ValueGetter() {
+        return new AbstractValueGetter() {
             @Override
             public ImmutableBytesWritable getLatestValue(ColumnReference ref, 
long ts) {
                 if(ref.equals(indexEmptyKeyValueRef)) return null;
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
index e2d9bfa..bf2cf0d 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
@@ -57,6 +57,7 @@ import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.Closeables;
@@ -138,21 +139,7 @@ public class TableResultIterator implements ResultIterator 
{
         this.caches = caches;
         
this.retry=plan.getContext().getConnection().getQueryServices().getProps()
                 .getInt(QueryConstants.HASH_JOIN_CACHE_RETRIES, 
QueryConstants.DEFAULT_HASH_JOIN_CACHE_RETRIES);
-        ScanUtil.setScanAttributesForIndexReadRepair(scan, table, 
plan.getContext().getConnection());
-        ScanUtil.setScanAttributesForPhoenixTTL(scan, table, 
plan.getContext().getConnection());
-        if 
(plan.getContext().getConnection().getQueryServices().getProps().getBoolean(
-                QueryServices.PHOENIX_SERVER_PAGING_ENABLED_ATTRIB,
-                QueryServicesOptions.DEFAULT_PHOENIX_SERVER_PAGING_ENABLED)) {
-            long pageSizeMs = 
plan.getContext().getConnection().getQueryServices().getProps()
-                    .getInt(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, -1);
-            if (pageSizeMs == -1) {
-                // Use the half of the HBase RPC timeout value as the the 
server page size to make sure that the HBase
-                // region server will be able to send a heartbeat message to 
the client before the client times out
-                pageSizeMs = (long) 
(plan.getContext().getConnection().getQueryServices().getProps()
-                        .getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, 
HConstants.DEFAULT_HBASE_RPC_TIMEOUT) * 0.5);
-            }
-            scan.setAttribute(BaseScannerRegionObserver.SERVER_PAGE_SIZE_MS, 
Bytes.toBytes(Long.valueOf(pageSizeMs)));
-        }
+        ScanUtil.setScanAttributesForClient(scan, table, 
plan.getContext().getConnection());
     }
 
     @Override
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
index 97f88aa..4f6a9f3 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
@@ -39,6 +39,7 @@ import java.util.UUID;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Strings;
+import org.apache.phoenix.hbase.index.AbstractValueGetter;
 import org.apache.phoenix.thirdparty.org.apache.commons.cli.CommandLine;
 import org.apache.phoenix.thirdparty.org.apache.commons.cli.CommandLineParser;
 import org.apache.phoenix.thirdparty.org.apache.commons.cli.DefaultParser;
@@ -1066,7 +1067,8 @@ public class IndexTool extends Configured implements Tool 
{
             while (rs.next()) {
                 rs.getCurrentRow().getKey(dataRowKeyPtr);
                 // regionStart/EndKey only needed for local indexes, so we 
pass null
-                byte[] indexRowKey = maintainer.buildRowKey(getter, 
dataRowKeyPtr, null, null, HConstants.LATEST_TIMESTAMP);
+                byte[] indexRowKey = maintainer.buildRowKey(getter, 
dataRowKeyPtr, null, null,
+                        rs.getCurrentRow().getValue(0).getTimestamp());
                 histo.addValue(indexRowKey);
             }
             List<Bucket> buckets = histo.computeBuckets();
@@ -1094,7 +1096,7 @@ public class IndexTool extends Configured implements Tool 
{
         for (String dataCol : dataColNames) {
             rsIndex.put(SchemaUtil.getEscapedFullColumnName(dataCol), i++);
         }
-        ValueGetter getter = new ValueGetter() {
+        return new AbstractValueGetter() {
             final ImmutableBytesWritable valuePtr = new 
ImmutableBytesWritable();
             final ImmutableBytesWritable rowKeyPtr = new 
ImmutableBytesWritable();
 
@@ -1118,7 +1120,6 @@ public class IndexTool extends Configured implements Tool 
{
                 return ByteUtil.copyKeyBytesIfNecessary(rowKeyPtr);
             }
         };
-        return getter;
     }
 
     /**
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/parse/PhoenixRowTimestampParseNode.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/parse/PhoenixRowTimestampParseNode.java
index c9f0158..34457b3 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/parse/PhoenixRowTimestampParseNode.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/parse/PhoenixRowTimestampParseNode.java
@@ -19,6 +19,7 @@
 package org.apache.phoenix.parse;
 
 import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.KeyValueColumnExpression;
 import org.apache.phoenix.expression.function.FunctionExpression;
@@ -90,10 +91,8 @@ public class PhoenixRowTimestampParseNode extends 
FunctionParseNode {
             }
         }, emptyColumnFamilyName, emptyColumnName);
         List<Expression> expressionList = Arrays.asList(new Expression[] 
{emptyColumnExpression});
-
-        // Add the empty column to the projection list.
-        // According to PHOENIX-4179 this will then return the timestamp of 
the empty column cell.
-        context.getScan().addColumn(emptyColumnFamilyName, emptyColumnName);
+        
context.getScan().setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_FAMILY_NAME,
 emptyColumnFamilyName);
+        
context.getScan().setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME,
 emptyColumnName);
         return new PhoenixRowTimestampFunction(expressionList);
     }
 }
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 448a8fc..f78deb3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -1607,7 +1607,7 @@ public class MetaDataClient {
                 if (expressionIndexCompiler.isAggregate()) {
                     throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.AGGREGATE_EXPRESSION_NOT_ALLOWED_IN_INDEX).build().buildException();
                 }
-                if (expression.getDeterminism() != Determinism.ALWAYS) {
+                if (!(expression.getDeterminism() == Determinism.ALWAYS || 
expression.getDeterminism() == Determinism.PER_ROW)) {
                     throw new 
SQLExceptionInfo.Builder(SQLExceptionCode.NON_DETERMINISTIC_EXPRESSION_NOT_ALLOWED_IN_INDEX).build().buildException();
                 }
                 if (expression.isStateless()) {
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/ValueGetterTuple.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/ValueGetterTuple.java
index 728b1e0..833e9f9 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/ValueGetterTuple.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/tuple/ValueGetterTuple.java
@@ -57,22 +57,17 @@ public class ValueGetterTuple extends BaseTuple {
 
     @Override
     public KeyValue getValue(byte[] family, byte[] qualifier) {
-        ImmutableBytesWritable value = null;
         try {
-            value = valueGetter.getLatestValue(new ColumnReference(family, 
qualifier), ts);
+            KeyValue kv = valueGetter.getLatestKeyValue(new 
ColumnReference(family, qualifier), ts);
+            if (kv != null) {
+                return kv;
+            }
         } catch (IOException e) {
             throw new RuntimeException(e);
         }
         byte[] rowKey = valueGetter.getRowKey();
-        int valueOffset = 0;
-        int valueLength = 0;
         byte[] valueBytes = HConstants.EMPTY_BYTE_ARRAY;
-        if (value != null) {
-            valueBytes = value.get();
-            valueOffset = value.getOffset();
-            valueLength = value.getLength();
-        }
-       return new KeyValue(rowKey, 0, rowKey.length, family, 0, family.length, 
qualifier, 0, qualifier.length, HConstants.LATEST_TIMESTAMP, Type.Put, 
valueBytes, valueOffset, valueLength);
+       return new KeyValue(rowKey, 0, rowKey.length, family, 0, family.length, 
qualifier, 0, qualifier.length, ts, Type.Put, valueBytes, 0, 0);
     }
 
     @Override
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java 
b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
index ec1a4f2..6fb4add 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
@@ -86,6 +86,7 @@ import org.apache.phoenix.expression.KeyValueColumnExpression;
 import org.apache.phoenix.expression.RowKeyColumnExpression;
 import org.apache.phoenix.expression.SingleCellColumnExpression;
 import org.apache.phoenix.expression.visitor.RowKeyExpressionVisitor;
+import org.apache.phoenix.hbase.index.AbstractValueGetter;
 import org.apache.phoenix.hbase.index.ValueGetter;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
@@ -340,7 +341,7 @@ public class IndexUtil {
                  * updating an existing row.
                  */
                 if (dataMutation instanceof Put) {
-                    ValueGetter valueGetter = new ValueGetter() {
+                    ValueGetter valueGetter = new AbstractValueGetter() {
 
                         @Override
                         public byte[] getRowKey() {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java 
b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
index 98caf91..cdc7594 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
@@ -1170,7 +1170,6 @@ public class ScanUtil {
         if (scan.getAttribute(BaseScannerRegionObserver.VIEW_CONSTANTS) == 
null) {
             BaseQueryPlan.serializeViewConstantsIntoScan(scan, dataTable);
         }
-        addEmptyColumnToScan(scan, emptyCF, emptyCQ);
     }
 
     public static void setScanAttributesForPhoenixTTL(Scan scan, PTable table,
@@ -1233,10 +1232,34 @@ public class ScanUtil {
                         HConstants.EMPTY_BYTE_ARRAY,
                         scan.getStartRow(), scan.getStopRow());
             }
-            addEmptyColumnToScan(scan, emptyColumnFamilyName, emptyColumnName);
         }
     }
 
+    public static void setScanAttributesForClient(Scan scan, PTable table,
+                                                  PhoenixConnection 
phoenixConnection) throws SQLException {
+        setScanAttributesForIndexReadRepair(scan, table, phoenixConnection);
+        setScanAttributesForPhoenixTTL(scan, table, phoenixConnection);
+        byte[] emptyCF = 
scan.getAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_FAMILY_NAME);
+        byte[] emptyCQ = 
scan.getAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME);
+        if (emptyCF != null && emptyCQ != null) {
+            addEmptyColumnToScan(scan, emptyCF, emptyCQ);
+        }
+        if (phoenixConnection.getQueryServices().getProps().getBoolean(
+                QueryServices.PHOENIX_SERVER_PAGING_ENABLED_ATTRIB,
+                QueryServicesOptions.DEFAULT_PHOENIX_SERVER_PAGING_ENABLED)) {
+            long pageSizeMs = phoenixConnection.getQueryServices().getProps()
+                    .getInt(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, -1);
+            if (pageSizeMs == -1) {
+                // Use half of the HBase RPC timeout value as the server page size to make sure that the HBase
+                // region server will be able to send a heartbeat message to the client before the client times out
+                pageSizeMs = (long) 
(phoenixConnection.getQueryServices().getProps()
+                        .getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, 
HConstants.DEFAULT_HBASE_RPC_TIMEOUT) * 0.5);
+            }
+            scan.setAttribute(BaseScannerRegionObserver.SERVER_PAGE_SIZE_MS, 
Bytes.toBytes(Long.valueOf(pageSizeMs)));
+        }
+
+    }
+
     public static void getDummyResult(byte[] rowKey, List<Cell> result) {
         Cell keyValue =
                 KeyValueUtil.newKeyValue(rowKey, 0,
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/index/IndexMaintainerTest.java 
b/phoenix-core/src/test/java/org/apache/phoenix/index/IndexMaintainerTest.java
index 0260d66..e0f2ac3 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/index/IndexMaintainerTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/index/IndexMaintainerTest.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.end2end.index.IndexTestUtil;
+import org.apache.phoenix.hbase.index.AbstractValueGetter;
 import org.apache.phoenix.hbase.index.ValueGetter;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
@@ -75,7 +76,7 @@ public class IndexMaintainerTest  extends 
BaseConnectionlessQueryTest {
     }
 
     private static ValueGetter newValueGetter(final byte[] row, final 
Map<ColumnReference, byte[]> valueMap) {
-        return new ValueGetter() {
+        return new AbstractValueGetter() {
 
             @Override
             public ImmutableBytesWritable getLatestValue(ColumnReference ref, 
long ts) {
