This is an automated email from the ASF dual-hosted git repository.

jisaac pushed a commit to branch PHOENIX-6978-feature
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/PHOENIX-6978-feature by this 
push:
     new ade4d639ce PHOENIX-7022 :- Add new columns TTL and ROW_KEY_PREFIX 
(#1660)
ade4d639ce is described below

commit ade4d639ce680bcfa0821bf78ff2a4192ae5ccde
Author: Lokesh Khurana <khuranalokes...@gmail.com>
AuthorDate: Mon Sep 18 11:56:53 2023 -0700

    PHOENIX-7022 :- Add new columns TTL and ROW_KEY_PREFIX (#1660)
    
    * PHOENIX-7022 :- Add TTL and ROW_KEY_PREFIX columns and use TTL instead of PHOENIX_TTL for storing TTL in SYSCAT
    
    Co-authored-by: Lokesh Khurana <lokesh.khur...@salesforce.com>
---
 .../org/apache/phoenix/end2end/CreateTableIT.java  |  10 +-
 .../org/apache/phoenix/end2end/SetPropertyIT.java  |  18 +--
 .../apache/phoenix/end2end/TTLAsPhoenixTTLIT.java  |  56 ++++++--
 .../java/org/apache/phoenix/end2end/ViewTTLIT.java |  16 +--
 .../phoenix/end2end/ViewTTLNotEnabledIT.java       |   2 +-
 .../coprocessor/BaseScannerRegionObserver.java     |   2 +-
 .../phoenix/coprocessor/CompactionScanner.java     |   2 +-
 .../coprocessor/GlobalIndexRegionScanner.java      |   6 +-
 .../coprocessor/IndexRebuildRegionScanner.java     |   3 +-
 .../coprocessor/IndexRepairRegionScanner.java      |   3 +-
 .../phoenix/coprocessor/IndexerRegionScanner.java  |   4 +-
 .../phoenix/coprocessor/MetaDataEndpointImpl.java  |  44 +++++--
 .../phoenix/coprocessor/MetaDataProtocol.java      |   2 +-
 .../coprocessor/PhoenixTTLRegionObserver.java      |   6 +-
 .../phoenix/coprocessor/TTLRegionScanner.java      |   2 +-
 .../UngroupedAggregateRegionObserver.java          |   6 +-
 .../apache/phoenix/exception/SQLExceptionCode.java |   7 +-
 .../phoenix/jdbc/PhoenixDatabaseMetaData.java      |  14 +-
 .../mapreduce/PhoenixTTLDeleteJobMapper.java       |   6 +-
 .../mapreduce/index/IndexScrutinyMapper.java       |   8 +-
 .../util/DefaultPhoenixMultiViewListProvider.java  |   2 +-
 .../mapreduce/util/PhoenixMultiInputUtil.java      |  16 +--
 .../phoenix/mapreduce/util/ViewInfoTracker.java    |   2 +-
 .../phoenix/query/ConnectionQueryServicesImpl.java |  19 ++-
 .../org/apache/phoenix/query/QueryConstants.java   |   5 +-
 .../org/apache/phoenix/schema/DelegateTable.java   |  12 ++
 .../org/apache/phoenix/schema/MetaDataClient.java  | 144 ++++++++++++++-------
 .../java/org/apache/phoenix/schema/PTable.java     |  14 ++
 .../java/org/apache/phoenix/schema/PTableImpl.java |  68 +++++++++-
 .../org/apache/phoenix/schema/TableProperty.java   |  14 +-
 .../schema/tool/SchemaExtractionProcessor.java     |   4 +-
 .../apache/phoenix/schema/transform/Transform.java |   2 +
 .../java/org/apache/phoenix/util/ScanUtil.java     |  18 +--
 phoenix-core/src/main/protobuf/PTable.proto        |   2 +
 .../mapreduce/PhoenixMultiViewReaderTest.java      |   4 +-
 .../java/org/apache/phoenix/util/ScanUtilTest.java |   2 +-
 .../java/org/apache/phoenix/util/TestUtil.java     |   2 +-
 37 files changed, 376 insertions(+), 171 deletions(-)

diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java
index 13bd8732cf..3424146da9 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java
@@ -359,7 +359,7 @@ public class CreateTableIT extends ParallelStatsDisabledIT {
         assertEquals(86400, columnFamilies[0].getTimeToLive());
         //Check if TTL is stored in SYSCAT as well and we are getting ttl from 
get api in PTable
         assertEquals(86400, conn.unwrap(PhoenixConnection.class).getTable(
-                new PTableKey(null, tableName)).getPhoenixTTL());
+                new PTableKey(null, tableName)).getTTL());
     }
 
     @Test
@@ -418,7 +418,7 @@ public class CreateTableIT extends ParallelStatsDisabledIT {
         assertEquals("C", columnFamilies[1].getNameAsString());
         //Check if TTL is stored in SYSCAT as well and we are getting ttl from 
get api in PTable
         assertEquals(86400, conn.unwrap(PhoenixConnection.class).getTable(
-                new PTableKey(null, tableName)).getPhoenixTTL());
+                new PTableKey(null, tableName)).getTTL());
     }
 
     /**
@@ -447,7 +447,7 @@ public class CreateTableIT extends ParallelStatsDisabledIT {
         assertEquals(86400, columnFamilies[1].getTimeToLive());
         //Check if TTL is stored in SYSCAT as well and we are getting ttl from 
get api in PTable
         assertEquals(86400, conn.unwrap(PhoenixConnection.class).getTable(
-                new PTableKey(null, tableName)).getPhoenixTTL());
+                new PTableKey(null, tableName)).getTTL());
     }
 
     /**
@@ -528,7 +528,7 @@ public class CreateTableIT extends ParallelStatsDisabledIT {
         assertEquals(10000, columnFamilies[0].getTimeToLive());
         //Check if TTL is stored in SYSCAT as well and we are getting ttl from 
get api in PTable
         assertEquals(10000, conn.unwrap(PhoenixConnection.class).getTable(
-                new PTableKey(null, tableName)).getPhoenixTTL());
+                new PTableKey(null, tableName)).getTTL());
     }
 
     /**
@@ -553,7 +553,7 @@ public class CreateTableIT extends ParallelStatsDisabledIT {
         assertEquals(10000, columnFamilies[0].getTimeToLive());
         //Check if TTL is stored in SYSCAT as well and we are getting ttl from 
get api in PTable
         assertEquals(10000, conn.unwrap(PhoenixConnection.class).getTable(
-                new PTableKey(null, tableName)).getPhoenixTTL());
+                new PTableKey(null, tableName)).getTTL());
     }
 
     @Test
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SetPropertyIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SetPropertyIT.java
index 7a70eb1b0b..b71cf5c992 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SetPropertyIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SetPropertyIT.java
@@ -430,7 +430,7 @@ public abstract class SetPropertyIT extends 
ParallelStatsDisabledIT {
             assertEquals(Boolean.toString(false), 
tableDesc.getValue(TableDescriptorBuilder.COMPACTION_ENABLED));
             //Check if Alter Table TTL also changes TTL stored in SYSCAT with 
phoenix.table.ttl disabled
             assertEquals(1000, conn.unwrap(PhoenixConnection.class).getTable(
-                    new PTableKey(null, dataTableFullName)).getPhoenixTTL());
+                    new PTableKey(null, dataTableFullName)).getTTL());
         }
     }
 
@@ -793,7 +793,7 @@ public abstract class SetPropertyIT extends 
ParallelStatsDisabledIT {
                 assertEquals(86400, columnFamilies[0].getTimeToLive());
                 //Check if TTL is stored in SYSCAT as well and we are getting 
ttl from get api in PTable
                 assertEquals(86400, 
conn.unwrap(PhoenixConnection.class).getTable(
-                        new PTableKey(null, 
dataTableFullName)).getPhoenixTTL());
+                        new PTableKey(null, dataTableFullName)).getTTL());
             }
             ddl = "ALTER TABLE " + dataTableFullName + " SET TTL=30";
             conn.createStatement().execute(ddl);
@@ -806,7 +806,7 @@ public abstract class SetPropertyIT extends 
ParallelStatsDisabledIT {
                 assertEquals("XYZ", columnFamilies[0].getNameAsString());
                 //Check if Alter Table TTL also changes TTL stored in SYSCAT 
with phoenix.table.ttl disabled
                 assertEquals(30, conn.unwrap(PhoenixConnection.class).getTable(
-                        new PTableKey(null, 
dataTableFullName)).getPhoenixTTL());
+                        new PTableKey(null, dataTableFullName)).getTTL());
             }
         } finally {
             conn.close();
@@ -833,8 +833,8 @@ public abstract class SetPropertyIT extends 
ParallelStatsDisabledIT {
                 assertEquals("XYZ", columnFamilies[0].getNameAsString());
                 assertEquals(HConstants.FOREVER, 
columnFamilies[0].getTimeToLive());
                 //Check if TTL is stored in SYSCAT as well and we are getting 
ttl from get api in PTable
-                assertEquals(PhoenixDatabaseMetaData.PHOENIX_TTL_NOT_DEFINED, 
conn.unwrap(PhoenixConnection.class).getTable(
-                        new PTableKey(null, 
dataTableFullName)).getPhoenixTTL());
+                assertEquals(PhoenixDatabaseMetaData.TTL_NOT_DEFINED, 
conn.unwrap(PhoenixConnection.class).getTable(
+                        new PTableKey(null, dataTableFullName)).getTTL());
             }
             ddl = "ALTER TABLE " + dataTableFullName + " SET TTL=FOREVER";
             conn.createStatement().execute(ddl);
@@ -846,8 +846,8 @@ public abstract class SetPropertyIT extends 
ParallelStatsDisabledIT {
                 assertEquals(HConstants.FOREVER, 
columnFamilies[0].getTimeToLive());
                 assertEquals("XYZ", columnFamilies[0].getNameAsString());
                 //Check if Alter Table TTL also changes TTL stored in SYSCAT 
with phoenix.table.ttl disabled
-                assertEquals(HConstants.LATEST_TIMESTAMP, 
conn.unwrap(PhoenixConnection.class).getTable(
-                        new PTableKey(null, 
dataTableFullName)).getPhoenixTTL());
+                assertEquals(HConstants.FOREVER, 
conn.unwrap(PhoenixConnection.class).getTable(
+                        new PTableKey(null, dataTableFullName)).getTTL());
             }
         } finally {
             conn.close();
@@ -996,7 +996,7 @@ public abstract class SetPropertyIT extends 
ParallelStatsDisabledIT {
                 assertEquals(86400, columnFamilies[1].getTimeToLive());
                 //Check if TTL is stored in SYSCAT as well and we are getting 
ttl from get api in PTable
                 assertEquals(86400, 
conn.unwrap(PhoenixConnection.class).getTable(
-                        new PTableKey(null, 
dataTableFullName)).getPhoenixTTL());
+                        new PTableKey(null, dataTableFullName)).getTTL());
             }
 
             ddl = "ALTER TABLE " + dataTableFullName + " SET TTL=1000";
@@ -1014,7 +1014,7 @@ public abstract class SetPropertyIT extends 
ParallelStatsDisabledIT {
                 assertEquals(1000, columnFamilies[1].getTimeToLive());
                 //Check if Alter Table TTL also changes TTL stored in SYSCAT 
with phoenix.table.ttl disabled
                 assertEquals(1000, 
conn.unwrap(PhoenixConnection.class).getTable(
-                        new PTableKey(null, 
dataTableFullName)).getPhoenixTTL());
+                        new PTableKey(null, dataTableFullName)).getTTL());
             }
 
             // the new column will be assigned to the column family XYZ. With 
the a KV column getting added for XYZ,
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TTLAsPhoenixTTLIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TTLAsPhoenixTTLIT.java
index 4a006ab213..9342014a0b 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TTLAsPhoenixTTLIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TTLAsPhoenixTTLIT.java
@@ -40,6 +40,7 @@ import static 
org.apache.phoenix.exception.SQLExceptionCode.CANNOT_SET_OR_ALTER_
 import static 
org.apache.phoenix.exception.SQLExceptionCode.PHOENIX_TTL_SUPPORTED_FOR_TABLES_ONLY;
 import static 
org.apache.phoenix.exception.SQLExceptionCode.VIEW_WITH_PROPERTIES;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.fail;
 
 @Category(ParallelStatsDisabledTest.class)
@@ -53,9 +54,11 @@ public class TTLAsPhoenixTTLIT extends 
ParallelStatsDisabledIT{
     @Test
     public void testCreateTableWithTTL() throws Exception {
         try (Connection conn = DriverManager.getConnection(getUrl());) {
+            PTable table = conn.unwrap(PhoenixConnection.class).getTable(new 
PTableKey(null,
+                    createTableWithOrWithOutTTLAsItsProperty(conn, true)));
             assertEquals("TTL is not set correctly at Phoenix level", 
DEFAULT_TTL_FOR_TEST,
-                    conn.unwrap(PhoenixConnection.class).getTable(new 
PTableKey(null,
-                            createTableWithOrWithOutTTLAsItsProperty(conn, 
true))).getPhoenixTTL());
+                    table.getTTL());
+            assertNull("RowKeyPrefix should be Null", table.getRowKeyPrefix());
         }
     }
 
@@ -95,13 +98,13 @@ public class TTLAsPhoenixTTLIT extends 
ParallelStatsDisabledIT{
         Connection conn = DriverManager.getConnection(getUrl());
         conn.createStatement().execute(ddl);
         assertTTLValueOfTableOrView(conn.unwrap(PhoenixConnection.class),
-                HConstants.LATEST_TIMESTAMP, tableName);
+                HConstants.FOREVER, tableName);
 
         ddl = "ALTER TABLE  " + tableName
                 + " SET TTL=NONE";
         conn.createStatement().execute(ddl);
         assertTTLValueOfTableOrView(conn.unwrap(PhoenixConnection.class),
-                PhoenixDatabaseMetaData.PHOENIX_TTL_NOT_DEFINED, tableName);
+                PhoenixDatabaseMetaData.TTL_NOT_DEFINED, tableName);
         //Setting TTL should not be stored as CF Descriptor properties when
         //phoenix.table.ttl.enabled is true
         Admin admin = driver.getConnectionQueryServices(getUrl(), new 
Properties()).getAdmin();
@@ -117,13 +120,13 @@ public class TTLAsPhoenixTTLIT extends 
ParallelStatsDisabledIT{
                         + " ) TTL=NONE";
         conn.createStatement().execute(ddl);
         assertTTLValueOfTableOrView(conn.unwrap(PhoenixConnection.class),
-                PhoenixDatabaseMetaData.PHOENIX_TTL_NOT_DEFINED, tableName);
+                PhoenixDatabaseMetaData.TTL_NOT_DEFINED, tableName);
 
         ddl = "ALTER TABLE  " + tableName
                 + " SET TTL=FOREVER";
         conn.createStatement().execute(ddl);
         assertTTLValueOfTableOrView(conn.unwrap(PhoenixConnection.class),
-                HConstants.LATEST_TIMESTAMP, tableName);
+                HConstants.FOREVER, tableName);
         //Setting TTL should not be stored as CF Descriptor properties when
         //phoenix.table.ttl.enabled is true
         columnFamilies =
@@ -138,7 +141,7 @@ public class TTLAsPhoenixTTLIT extends 
ParallelStatsDisabledIT{
              PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);){
             String tableName = createTableWithOrWithOutTTLAsItsProperty(conn, 
false);
             //Checking Default TTL in case of PhoenixTTLEnabled
-            assertTTLValueOfTableOrView(conn.unwrap(PhoenixConnection.class), 
PhoenixDatabaseMetaData.PHOENIX_TTL_NOT_DEFINED, tableName);
+            assertTTLValueOfTableOrView(conn.unwrap(PhoenixConnection.class), 
PhoenixDatabaseMetaData.TTL_NOT_DEFINED, tableName);
             String ddl = "ALTER TABLE  " + tableName
                     + " SET TTL=1000";
             conn.createStatement().execute(ddl);
@@ -172,7 +175,8 @@ public class TTLAsPhoenixTTLIT extends 
ParallelStatsDisabledIT{
             String globalIndexName = 
createIndexOnTableOrViewProvidedWithTTL(conn, tableName, 
PTable.IndexType.GLOBAL, false);
             indexes = conn.unwrap(PhoenixConnection.class).getTable(new 
PTableKey(null, tableName)).getIndexes();
             for (PTable index : indexes) {
-                
assertTTLValueOfIndex(PhoenixDatabaseMetaData.PHOENIX_TTL_NOT_DEFINED, index);
+                assertTTLValueOfIndex(PhoenixDatabaseMetaData.TTL_NOT_DEFINED, 
index);
+                assertNull(index.getRowKeyPrefix());
             }
 
             //Test setting TTL as index property not allowed while creating 
them or setting them explicitly.
@@ -228,11 +232,16 @@ public class TTLAsPhoenixTTLIT extends 
ParallelStatsDisabledIT{
 
             String tableName = createTableWithOrWithOutTTLAsItsProperty(conn, 
true);
 
-            //View gets TTL value from its hierarchy
-            String viewName = createViewOnTableWithTTL(conn, tableName, false);
+            //View gets TTL value from its hierarchy only for Updatable Views
+            String viewName = createUpdatableViewOnTableWithTTL(conn, 
tableName, false);
             assertTTLValueOfTableOrView(conn.unwrap(PhoenixConnection.class),
                     DEFAULT_TTL_FOR_TEST, viewName);
 
+            //Read-only View does not get TTL value from its hierarchy
+            String viewName1 = createReadOnlyViewOnTableWithTTL(conn, 
tableName, false);
+            assertTTLValueOfTableOrView(conn.unwrap(PhoenixConnection.class),
+                    PhoenixDatabaseMetaData.TTL_NOT_DEFINED, viewName1);
+
             //Index on Global View should get TTL from View.
             createIndexOnTableOrViewProvidedWithTTL(conn, viewName, 
PTable.IndexType.GLOBAL,
                     false);
@@ -256,13 +265,22 @@ public class TTLAsPhoenixTTLIT extends 
ParallelStatsDisabledIT{
             //Setting TTL on Views should not be allowed.
 
             try {
-                createViewOnTableWithTTL(conn, tableName, true);
+                createUpdatableViewOnTableWithTTL(conn, tableName, true);
+                fail();
+            } catch (SQLException sqe) {
+                assertEquals("Should fail with TTL supported for tables only",
+                        PHOENIX_TTL_SUPPORTED_FOR_TABLES_ONLY.getErrorCode(), 
sqe.getErrorCode());
+            }
+
+            try {
+                createReadOnlyViewOnTableWithTTL(conn, tableName, true);
                 fail();
             } catch (SQLException sqe) {
                 assertEquals("Should fail with TTL supported for tables only",
                         PHOENIX_TTL_SUPPORTED_FOR_TABLES_ONLY.getErrorCode(), 
sqe.getErrorCode());
             }
 
+
             try {
                 conn.createStatement().execute("ALTER VIEW " + viewName + " 
SET TTL = 1000");
                 fail();
@@ -283,11 +301,11 @@ public class TTLAsPhoenixTTLIT extends 
ParallelStatsDisabledIT{
 
     private void assertTTLValueOfTableOrView(PhoenixConnection conn, long 
expected, String name) throws SQLException {
         assertEquals("TTL value did not match :-", expected,
-                conn.getTable(new PTableKey(conn.getTenantId(), 
name)).getPhoenixTTL());
+                conn.getTable(new PTableKey(conn.getTenantId(), 
name)).getTTL());
     }
 
     private void assertTTLValueOfIndex(long expected, PTable index) {
-        assertEquals("TTL value is not what expected :-", expected, 
index.getPhoenixTTL());
+        assertEquals("TTL value is not what expected :-", expected, 
index.getTTL());
     }
 
 
@@ -324,7 +342,7 @@ public class TTLAsPhoenixTTLIT extends 
ParallelStatsDisabledIT{
         }
     }
 
-    private String createViewOnTableWithTTL(Connection conn, String 
baseTableName,
+    private String createReadOnlyViewOnTableWithTTL(Connection conn, String 
baseTableName,
                                             boolean withTTL) throws 
SQLException {
         String viewName = "VIEW_" + baseTableName + "_" + generateUniqueName();
         conn.createStatement().execute("CREATE VIEW " + viewName
@@ -334,6 +352,16 @@ public class TTLAsPhoenixTTLIT extends 
ParallelStatsDisabledIT{
         return viewName;
     }
 
+    private String createUpdatableViewOnTableWithTTL(Connection conn, String 
baseTableName,
+                                            boolean withTTL) throws 
SQLException {
+        String viewName = "VIEW_" + baseTableName + "_" + generateUniqueName();
+        conn.createStatement().execute("CREATE VIEW " + viewName
+                + " (" + generateUniqueName() + " SMALLINT) as select * from "
+                + baseTableName + " where id = 1 "
+                + (withTTL ? "TTL = 1000" : "") );
+        return viewName;
+    }
+
     private String createViewOnViewWithTTL(Connection conn, String 
parentViewName,
                                            boolean withTTL) throws 
SQLException {
         String childView = parentViewName + "_" + generateUniqueName();
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLIT.java
index bea751cba6..50cfcf44b7 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLIT.java
@@ -524,7 +524,7 @@ public class ViewTTLIT extends ParallelStatsDisabledIT {
                     tenantViewWithOverrideOptions, null);
             fail();
         } catch (SQLException e) {
-            
assertEquals(SQLExceptionCode.CANNOT_SET_OR_ALTER_PHOENIX_TTL.getErrorCode(),
+            
assertEquals(SQLExceptionCode.CANNOT_SET_OR_ALTER_TTL.getErrorCode(),
                     e.getErrorCode());
         }
     }
@@ -573,7 +573,7 @@ public class ViewTTLIT extends ParallelStatsDisabledIT {
                 fail();
             }
         } catch (SQLException e) {
-            
assertEquals(SQLExceptionCode.CANNOT_SET_OR_ALTER_PHOENIX_TTL.getErrorCode(),
+            
assertEquals(SQLExceptionCode.CANNOT_SET_OR_ALTER_TTL.getErrorCode(),
                     e.getErrorCode());
         }
     }
@@ -615,7 +615,7 @@ public class ViewTTLIT extends ParallelStatsDisabledIT {
                 fail();
             }
         } catch (SQLException e) {
-            
assertEquals(SQLExceptionCode.CANNOT_SET_OR_ALTER_PHOENIX_TTL.getErrorCode(),
+            
assertEquals(SQLExceptionCode.CANNOT_SET_OR_ALTER_TTL.getErrorCode(),
                     e.getErrorCode());
         }
     }
@@ -1883,7 +1883,7 @@ public class ViewTTLIT extends ParallelStatsDisabledIT {
             Assert.assertFalse("Should not have any rows", rs.next());
             Assert.assertEquals("Should have atleast one element", 1, 
queryPlan.getScans().size());
             Assert.assertEquals("PhoenixTTL does not match",
-                    phoenixTTL*1000, 
ScanUtil.getPhoenixTTL(queryPlan.getScans().get(0).get(0)));
+                    phoenixTTL*1000, 
ScanUtil.getTTL(queryPlan.getScans().get(0).get(0)));
             Assert.assertTrue("Masking attribute should be set",
                     
ScanUtil.isMaskTTLExpiredRows(queryPlan.getScans().get(0).get(0)));
             Assert.assertFalse("Delete Expired attribute should not set",
@@ -1913,14 +1913,14 @@ public class ViewTTLIT extends ParallelStatsDisabledIT {
             
scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_FAMILY_NAME, 
emptyColumnFamilyName);
             
scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME, 
emptyColumnName);
             
scan.setAttribute(BaseScannerRegionObserver.DELETE_PHOENIX_TTL_EXPIRED, 
PDataType.TRUE_BYTES);
-            scan.setAttribute(BaseScannerRegionObserver.PHOENIX_TTL, 
Bytes.toBytes(Long.valueOf(table.getPhoenixTTL())));
+            scan.setAttribute(BaseScannerRegionObserver.TTL, 
Bytes.toBytes(Long.valueOf(table.getTTL())));
 
             PhoenixResultSet
                     rs = pstmt.newResultSet(queryPlan.iterator(), 
queryPlan.getProjector(), queryPlan.getContext());
             Assert.assertFalse("Should not have any rows", rs.next());
             Assert.assertEquals("Should have atleast one element", 1, 
queryPlan.getScans().size());
             Assert.assertEquals("PhoenixTTL does not match",
-                    phoenixTTL*1000, 
ScanUtil.getPhoenixTTL(queryPlan.getScans().get(0).get(0)));
+                    phoenixTTL*1000, 
ScanUtil.getTTL(queryPlan.getScans().get(0).get(0)));
             Assert.assertFalse("Masking attribute should not be set",
                     
ScanUtil.isMaskTTLExpiredRows(queryPlan.getScans().get(0).get(0)));
             Assert.assertTrue("Delete Expired attribute should be set",
@@ -2537,7 +2537,7 @@ public class ViewTTLIT extends ParallelStatsDisabledIT {
             
scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_FAMILY_NAME, 
emptyColumnFamilyName);
             
scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME, 
emptyColumnName);
             
scan.setAttribute(BaseScannerRegionObserver.DELETE_PHOENIX_TTL_EXPIRED, 
PDataType.TRUE_BYTES);
-            scan.setAttribute(BaseScannerRegionObserver.PHOENIX_TTL, 
Bytes.toBytes(Long.valueOf(table.getPhoenixTTL())));
+            scan.setAttribute(BaseScannerRegionObserver.TTL, 
Bytes.toBytes(Long.valueOf(table.getTTL())));
 
             PhoenixResultSet
                     rs = pstmt.newResultSet(queryPlan.iterator(), 
queryPlan.getProjector(), queryPlan.getContext());
@@ -2579,7 +2579,7 @@ public class ViewTTLIT extends ParallelStatsDisabledIT {
             
scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_FAMILY_NAME, 
emptyColumnFamilyName);
             
scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME, 
emptyColumnName);
             
scan.setAttribute(BaseScannerRegionObserver.DELETE_PHOENIX_TTL_EXPIRED, 
PDataType.TRUE_BYTES);
-            scan.setAttribute(BaseScannerRegionObserver.PHOENIX_TTL, 
Bytes.toBytes(Long.valueOf(table.getPhoenixTTL())));
+            scan.setAttribute(BaseScannerRegionObserver.TTL, 
Bytes.toBytes(Long.valueOf(table.getTTL())));
             PhoenixResultSet
                     rs = pstmt.newResultSet(queryPlan.iterator(), 
queryPlan.getProjector(), queryPlan.getContext());
             while (rs.next());
diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLNotEnabledIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLNotEnabledIT.java
index a624c60051..6349dd97a3 100644
--- 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLNotEnabledIT.java
+++ 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewTTLNotEnabledIT.java
@@ -95,7 +95,7 @@ public class ViewTTLNotEnabledIT extends 
ParallelStatsDisabledIT {
             Assert.assertFalse("Should not have any rows", rs.next());
             Assert.assertEquals("Should have at least one element", 1, 
queryPlan.getScans().size());
             Assert.assertEquals("PhoenixTTL should not be set",
-                    0, 
ScanUtil.getPhoenixTTL(queryPlan.getScans().get(0).get(0)));
+                    0, ScanUtil.getTTL(queryPlan.getScans().get(0).get(0)));
             Assert.assertFalse("Masking attribute should not be set",
                     
ScanUtil.isMaskTTLExpiredRows(queryPlan.getScans().get(0).get(0)));
             Assert.assertFalse("Delete Expired attribute should not set",
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index bc566a8364..9b8c874259 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -128,7 +128,7 @@ abstract public class BaseScannerRegionObserver implements 
RegionObserver {
     public static final String RUN_UPDATE_STATS_ASYNC_ATTRIB = 
"_RunUpdateStatsAsync";
     public static final String SKIP_REGION_BOUNDARY_CHECK = 
"_SKIP_REGION_BOUNDARY_CHECK";
     public static final String TX_SCN = "_TxScn";
-    public static final String PHOENIX_TTL = "_PhoenixTTL";
+    public static final String TTL = "_TTL";
     public static final String MASK_PHOENIX_TTL_EXPIRED = "_MASK_TTL_EXPIRED";
     public static final String DELETE_PHOENIX_TTL_EXPIRED = 
"_DELETE_TTL_EXPIRED";
     public static final String PHOENIX_TTL_SCAN_TABLE_NAME = 
"_PhoenixTTLScanTableName";
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
index cb7dbf57d7..144a5d7b1f 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/CompactionScanner.java
@@ -86,7 +86,7 @@ public class CompactionScanner implements InternalScanner {
             long maxLookbackInMillis,
             byte[] emptyCF,
             byte[] emptyCQ,
-            long phoenixTTL,
+            int phoenixTTL,
             boolean isSystemTable) {
         this.storeScanner = storeScanner;
         this.region = env.getRegion();
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
index 2caaba777e..fb3f2bace3 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java
@@ -154,7 +154,7 @@ public abstract class GlobalIndexRegionScanner extends 
BaseRegionScanner {
     protected TaskRunner pool;
     protected String exceptionMessage;
     protected HTableFactory hTableFactory;
-    protected long indexTableTTL;
+    protected int indexTableTTL;
     protected long maxLookBackInMills;
     protected IndexToolVerificationResult verificationResult = null;
     protected IndexVerificationResultRepository verificationResultRepository = 
null;
@@ -274,11 +274,11 @@ public abstract class GlobalIndexRegionScanner extends 
BaseRegionScanner {
         throw new IllegalStateException("No cell found");
     }
 
-    protected static boolean isTimestampBeforeTTL(long tableTTL, long 
currentTime, long tsToCheck) {
+    protected static boolean isTimestampBeforeTTL(int tableTTL, long 
currentTime, long tsToCheck) {
         if (tableTTL == HConstants.FOREVER) {
             return false;
         }
-        return tsToCheck < (currentTime - tableTTL * 1000);
+        return tsToCheck < (currentTime - tableTTL * 1000L);
     }
 
     protected static boolean isTimestampBeyondMaxLookBack(long 
maxLookBackInMills,
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
index 24951662d8..c4311f3ce7 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRebuildRegionScanner.java
@@ -34,7 +34,6 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
 
-import org.apache.phoenix.index.IndexMaintainer;
 import 
org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
@@ -90,7 +89,7 @@ public class IndexRebuildRegionScanner extends 
GlobalIndexRegionScanner {
 
         indexHTable = hTableFactory.getTable(new 
ImmutableBytesPtr(indexMaintainer.getIndexTableName()));
         if 
(BaseScannerRegionObserver.isPhoenixTableTTLEnabled(env.getConfiguration())) {
-            indexTableTTL = ScanUtil.getPhoenixTTL(scan);
+            indexTableTTL = ScanUtil.getTTL(scan);
         } else {
             indexTableTTL = 
indexHTable.getDescriptor().getColumnFamilies()[0].getTimeToLive();
         }
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRepairRegionScanner.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRepairRegionScanner.java
index fd0e25cd78..46cfbd4641 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRepairRegionScanner.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexRepairRegionScanner.java
@@ -36,7 +36,6 @@ import java.util.TreeSet;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
@@ -88,7 +87,7 @@ public class IndexRepairRegionScanner extends 
GlobalIndexRegionScanner {
         byte[] dataTableName = scan.getAttribute(PHYSICAL_DATA_TABLE_NAME);
         dataHTable = hTableFactory.getTable(new 
ImmutableBytesPtr(dataTableName));
         if 
(BaseScannerRegionObserver.isPhoenixTableTTLEnabled(env.getConfiguration())) {
-            indexTableTTL = ScanUtil.getPhoenixTTL(scan);
+            indexTableTTL = ScanUtil.getTTL(scan);
         } else {
             indexTableTTL = 
indexHTable.getDescriptor().getColumnFamilies()[0].getTimeToLive();
         }
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java
index 92006d7927..4493f92bbc 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java
@@ -27,12 +27,10 @@ import static org.apache.phoenix.util.ScanUtil.isDummy;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
@@ -97,7 +95,7 @@ public class IndexerRegionScanner extends 
GlobalIndexRegionScanner {
         super(innerScanner, region, scan, env, 
ungroupedAggregateRegionObserver);
         indexHTable = hTableFactory.getTable(new 
ImmutableBytesPtr(indexMaintainer.getIndexTableName()));
         if 
(BaseScannerRegionObserver.isPhoenixTableTTLEnabled(env.getConfiguration())) {
-            indexTableTTL = ScanUtil.getPhoenixTTL(scan);
+            indexTableTTL = ScanUtil.getTTL(scan);
         } else {
             indexTableTTL = 
indexHTable.getDescriptor().getColumnFamilies()[0].getTimeToLive();
         }
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index 5e234923d1..172056052a 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -58,10 +58,12 @@ import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.NUM_ARGS_BYTES;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ORDINAL_POSITION_BYTES;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PHOENIX_TTL_BYTES;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PHOENIX_TTL_HWM_BYTES;
-import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PHOENIX_TTL_NOT_DEFINED;
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PHOENIX_TTL_NOT_DEFINED_DEPRECATED;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TTL_NOT_DEFINED;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PHYSICAL_TABLE_NAME_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PK_NAME_BYTES;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.RETURN_TYPE_BYTES;
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ROW_KEY_PREFIX_BYTES;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SALT_BUCKETS_BYTES;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SCHEMA_VERSION_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SORT_ORDER_BYTES;
@@ -74,6 +76,7 @@ import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SEQ_NUM_BYTE
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_TYPE_BYTES;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TRANSACTIONAL_BYTES;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TRANSACTION_PROVIDER_BYTES;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TTL_BYTES;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TYPE_BYTES;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.UPDATE_CACHE_FREQUENCY_BYTES;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.USE_STATS_FOR_PARALLELIZATION_BYTES;
@@ -365,7 +368,10 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
         TABLE_FAMILY_BYTES, EXTERNAL_SCHEMA_ID_BYTES);
     private static final Cell STREAMING_TOPIC_NAME_KV = 
createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY,
         TABLE_FAMILY_BYTES, STREAMING_TOPIC_NAME_BYTES);
-
+    private static final Cell TTL_KV = 
createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY,
+            TABLE_FAMILY_BYTES, TTL_BYTES);
+    private static final Cell ROW_KEY_PREFIX_KV = 
createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY,
+            TABLE_FAMILY_BYTES, ROW_KEY_PREFIX_BYTES);
     private static final List<Cell> TABLE_KV_COLUMNS = Lists.newArrayList(
             EMPTY_KEYVALUE_KV,
             TABLE_TYPE_KV,
@@ -404,7 +410,9 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
             CHANGE_DETECTION_ENABLED_KV,
             SCHEMA_VERSION_KV,
             EXTERNAL_SCHEMA_ID_KV,
-            STREAMING_TOPIC_NAME_KV
+            STREAMING_TOPIC_NAME_KV,
+            TTL_KV,
+            ROW_KEY_PREFIX_KV
     );
 
     static {
@@ -452,6 +460,8 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
         TABLE_KV_COLUMNS.indexOf(EXTERNAL_SCHEMA_ID_KV);
     private static final int STREAMING_TOPIC_NAME_INDEX =
         TABLE_KV_COLUMNS.indexOf(STREAMING_TOPIC_NAME_KV);
+    private static final int TTL_INDEX = TABLE_KV_COLUMNS.indexOf(TTL_KV);
+    private static final int ROW_KEY_PREFIX_INDEX = 
TABLE_KV_COLUMNS.indexOf(ROW_KEY_PREFIX_KV);
     // KeyValues for Column
     private static final KeyValue DECIMAL_DIGITS_KV = 
createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, 
DECIMAL_DIGITS_BYTES);
     private static final KeyValue COLUMN_SIZE_KV = 
createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, 
COLUMN_SIZE_BYTES);
@@ -1364,11 +1374,11 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
              useStatsForParallelization : oldTable != null ? 
oldTable.useStatsForParallelization() : null);
 
         Cell phoenixTTLKv = tableKeyValues[PHOENIX_TTL_INDEX];
-        long phoenixTTL = phoenixTTLKv == null ? PHOENIX_TTL_NOT_DEFINED :
-                
PLong.INSTANCE.getCodec().decodeLong(phoenixTTLKv.getValueArray(),
+        long phoenixTTL = phoenixTTLKv == null ? 
PHOENIX_TTL_NOT_DEFINED_DEPRECATED : PLong.INSTANCE
+                .getCodec().decodeLong(phoenixTTLKv.getValueArray(),
                         phoenixTTLKv.getValueOffset(), SortOrder.getDefault());
-        builder.setPhoenixTTL(phoenixTTLKv != null ? phoenixTTL :
-            oldTable != null ? oldTable.getPhoenixTTL() : 
PHOENIX_TTL_NOT_DEFINED);
+        builder.setPhoenixTTL(phoenixTTLKv != null ? phoenixTTL : oldTable != 
null
+                ? oldTable.getPhoenixTTL() : 
PHOENIX_TTL_NOT_DEFINED_DEPRECATED);
 
         Cell phoenixTTLHWMKv = tableKeyValues[PHOENIX_TTL_HWM_INDEX];
         long phoenixTTLHWM = phoenixTTLHWMKv == null ? MIN_PHOENIX_TTL_HWM :
@@ -1423,6 +1433,22 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
         builder.setStreamingTopicName(streamingTopicName != null ? 
streamingTopicName :
             oldTable != null ? oldTable.getStreamingTopicName() : null);
 
+        Cell ttlKv = tableKeyValues[TTL_INDEX];
+        int ttl = ttlKv == null ? TTL_NOT_DEFINED : PInteger.INSTANCE
+                .getCodec().decodeInt(ttlKv.getValueArray(),
+                        ttlKv.getValueOffset(), SortOrder.getDefault());
+        builder.setTTL(ttlKv != null ? ttl : oldTable != null
+                ? oldTable.getTTL() : TTL_NOT_DEFINED);
+
+        Cell rowKeyPrefixKv = tableKeyValues[ROW_KEY_PREFIX_INDEX];
+        byte[] rowKeyPrefix = rowKeyPrefixKv != null
+                ? (byte[]) 
PVarbinary.INSTANCE.toObject(rowKeyPrefixKv.getValueArray(),
+                        rowKeyPrefixKv.getValueOffset(), 
rowKeyPrefixKv.getValueLength())
+                : null;
+        builder.setRowKeyPrefix(rowKeyPrefix != null ? rowKeyPrefix
+                : oldTable != null ? oldTable.getRowKeyPrefix() : null);
+
+
         // Check the cell tag to see whether the view has modified this 
property
         final byte[] tagUseStatsForParallelization = 
(useStatsForParallelizationKv == null) ?
                 HConstants.EMPTY_BYTE_ARRAY :
@@ -3450,7 +3476,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
                     Cell cell = cells.get(0);
                     long newPhoenixTTL = (long) 
PLong.INSTANCE.toObject(cell.getValueArray(),
                             cell.getValueOffset(), cell.getValueLength());
-                    hasNewPhoenixTTLAttribute =  newPhoenixTTL != 
PHOENIX_TTL_NOT_DEFINED ;
+                    hasNewPhoenixTTLAttribute =  newPhoenixTTL != 
TTL_NOT_DEFINED;
                     //TODO:- Re-enable when we have ViewTTL
                     return false;
                 }
@@ -3459,7 +3485,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES);
 
         if (hasNewPhoenixTTLAttribute) {
             // Disallow if the parent has PHOENIX_TTL set.
-            if (parentTable != null &&  parentTable.getPhoenixTTL() != 
PHOENIX_TTL_NOT_DEFINED) {
+            if (parentTable != null &&  parentTable.getTTL() != 
TTL_NOT_DEFINED) {
                 isSchemaMutationAllowed = false;
             }
 
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
index af6c1cf312..79b6f5f2c7 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java
@@ -99,7 +99,7 @@ public abstract class MetaDataProtocol extends 
MetaDataService {
     public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_15_0 = 
MIN_TABLE_TIMESTAMP + 29;
     public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_16_0 = 
MIN_TABLE_TIMESTAMP + 33;
     public static final long MIN_SYSTEM_TABLE_TIMESTAMP_5_1_0 = 
MIN_SYSTEM_TABLE_TIMESTAMP_4_16_0;
-    public static final long MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0 = 
MIN_TABLE_TIMESTAMP + 37;
+    public static final long MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0 = 
MIN_TABLE_TIMESTAMP + 39;
     // MIN_SYSTEM_TABLE_TIMESTAMP needs to be set to the max of all the 
MIN_SYSTEM_TABLE_TIMESTAMP_* constants
     public static final long MIN_SYSTEM_TABLE_TIMESTAMP = 
MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0;
 
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTTLRegionObserver.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTTLRegionObserver.java
index ba4d3c226c..90873b7439 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTTLRegionObserver.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixTTLRegionObserver.java
@@ -93,12 +93,12 @@ public class PhoenixTTLRegionObserver extends 
BaseScannerRegionObserver implemen
         }
         LOG.trace(String.format(
                 "********** PHOENIX-TTL: 
PhoenixTTLRegionObserver::postScannerOpen TTL for table = "
-                        + "[%s], scan = [%s], PHOENIX_TTL = %d 
***************, "
+                        + "[%s], scan = [%s], TTL = %d ***************, "
                         + "numMaskExpiredRequestCount=%d, "
                         + "numDeleteExpiredRequestCount=%d",
                 s.getRegionInfo().getTable().getNameAsString(),
                 scan.toJSON(Integer.MAX_VALUE),
-                ScanUtil.getPhoenixTTL(scan),
+                ScanUtil.getTTL(scan),
                 metricSource.getMaskExpiredRequestCount(),
                 metricSource.getDeleteExpiredRequestCount()
         ));
@@ -284,7 +284,7 @@ public class PhoenixTTLRegionObserver extends 
BaseScannerRegionObserver implemen
 
             boolean isRowExpired = !checkRowNotExpired(cellList);
             if (isRowExpired) {
-                long ttl = ScanUtil.getPhoenixTTL(this.scan);
+                long ttl = ScanUtil.getTTL(this.scan);
                 long ts = ScanUtil.getMaxTimestamp(cellList);
                 LOG.trace(String.format(
                         "PHOENIX-TTL: Deleting row = [%s] belonging to table = 
%s, "
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TTLRegionScanner.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TTLRegionScanner.java
index 4942e2b2c3..49bd6f0ae3 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TTLRegionScanner.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/TTLRegionScanner.java
@@ -75,7 +75,7 @@ public class TTLRegionScanner extends BaseRegionScanner {
                 IS_PHOENIX_TTL_SCAN_TABLE_SYSTEM);
         if (isPhoenixTableTTLEnabled(env.getConfiguration()) && (isSystemTable 
== null
                 || !Bytes.toBoolean(isSystemTable))) {
-            ttl = ScanUtil.getPhoenixTTL(this.scan);
+            ttl = ScanUtil.getTTL(this.scan);
         } else {
             ttl = 
env.getRegion().getTableDescriptor().getColumnFamilies()[0].getTimeToLive();
         }
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index 6613f91612..425870e2f2 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -19,7 +19,7 @@ package org.apache.phoenix.coprocessor;
 
 import static 
org.apache.phoenix.coprocessor.GlobalIndexRegionScanner.adjustScanFilter;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_PHOENIX_TTL;
-import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PHOENIX_TTL_NOT_DEFINED;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TTL_NOT_DEFINED;
 import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
 import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
 import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
@@ -646,8 +646,8 @@ public class UngroupedAggregateRegionObserver extends 
BaseScannerRegionObserver
                                                     table.getEncodingScheme().
                                                             
encode(QueryConstants.
                                                                     
ENCODED_EMPTY_COLUMN_NAME),
-                                            table.getPhoenixTTL() == 
PHOENIX_TTL_NOT_DEFINED
-                                                    ? DEFAULT_PHOENIX_TTL : 
table.getPhoenixTTL(),
+                                            table.getTTL() == TTL_NOT_DEFINED
+                                                    ? DEFAULT_PHOENIX_TTL : 
table.getTTL(),
                                             table.getType() == 
PTableType.SYSTEM
                                             );
                         }
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java 
b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index 9d8e2024fa..8a9f7f45ac 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -341,15 +341,18 @@ public enum SQLExceptionCode {
             + 
MetaDataUtil.SYNCED_DATA_TABLE_AND_INDEX_COL_FAM_PROPERTIES.toString()),
     CANNOT_SET_OR_ALTER_UPDATE_CACHE_FREQ_FOR_INDEX(10950, "44A31", "Cannot 
set or alter "
             + PhoenixDatabaseMetaData.UPDATE_CACHE_FREQUENCY + " on an index"),
+    @Deprecated
     PHOENIX_TTL_SUPPORTED_FOR_VIEWS_ONLY(10951, "44A32", 
PhoenixDatabaseMetaData.PHOENIX_TTL
             + " property can only be set for views"),
+    @Deprecated
     CANNOT_SET_OR_ALTER_PHOENIX_TTL_FOR_TABLE_WITH_TTL(10952, "44A33", "Cannot 
set or alter "
             + PhoenixDatabaseMetaData.PHOENIX_TTL + " property on an table 
with TTL,"),
     ABOVE_INDEX_NON_ASYNC_THRESHOLD(1097, "44A34", "The estimated read size 
for index creation "
             + "is higher than " + QueryServices.CLIENT_INDEX_ASYNC_THRESHOLD+ 
". You can edit the"
             + " limit or create ASYNC index."),
-    CANNOT_SET_OR_ALTER_PHOENIX_TTL(10953, "44A35", "Cannot set or alter "
-            + PhoenixDatabaseMetaData.PHOENIX_TTL + " property on an view when 
parent/child view has PHOENIX_TTL set,"),
+    CANNOT_SET_OR_ALTER_TTL(10953, "44A35", "Cannot set or alter "
+            + PhoenixDatabaseMetaData.TTL + " property on an view when 
parent/child "
+            + "view has TTL set,"),
     CHANGE_DETECTION_SUPPORTED_FOR_TABLES_AND_VIEWS_ONLY(10954, "44A36",
         CHANGE_DETECTION_ENABLED + " is only supported on tables and views"),
     PHOENIX_TTL_SUPPORTED_FOR_TABLES_ONLY(10955, "44A37", TTL
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
index dc0fa85ea1..60e8d1300c 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java
@@ -392,10 +392,15 @@ public class PhoenixDatabaseMetaData implements 
DatabaseMetaData {
     public static final byte[] USE_STATS_FOR_PARALLELIZATION_BYTES = 
Bytes.toBytes(USE_STATS_FOR_PARALLELIZATION);
 
     // The TTL property will hold the duration after which rows will be marked 
as expired and
-    // is stored in column PHOENIX_TTL in SYSCAT. TODO:- Rename PHOENIX_TTL to 
TTL in SYSCAT!?
+    // is stored in column TTL in SYSCAT
     public static final String TTL = "TTL";
-    public static final long PHOENIX_TTL_NOT_DEFINED = 0L;
-    public static final long DEFAULT_PHOENIX_TTL = HConstants.FOREVER;
+    public static final byte[] TTL_BYTES = Bytes.toBytes(TTL);
+    public static final int TTL_NOT_DEFINED = 0;
+    @Deprecated
+    public static final long PHOENIX_TTL_NOT_DEFINED_DEPRECATED = 0L;
+    // TODO: Should DEFAULT_PHOENIX_TTL be renamed (e.g. to DEFAULT_TTL) as well?
+    public static final int DEFAULT_PHOENIX_TTL = HConstants.FOREVER;
+
     public static final String PHOENIX_TTL = "PHOENIX_TTL";
     public static final byte[] PHOENIX_TTL_BYTES = Bytes.toBytes(PHOENIX_TTL);
     // The phoenix ttl high watermark if set indicates the timestamp used for 
determining the expired rows.
@@ -420,6 +425,9 @@ public class PhoenixDatabaseMetaData implements 
DatabaseMetaData {
     public static final String STREAMING_TOPIC_NAME = "STREAMING_TOPIC_NAME";
     public static final byte[] STREAMING_TOPIC_NAME_BYTES = 
Bytes.toBytes(STREAMING_TOPIC_NAME);
 
+    public static final String ROW_KEY_PREFIX = "ROW_KEY_PREFIX";
+    public static final byte[] ROW_KEY_PREFIX_BYTES = 
Bytes.toBytes(ROW_KEY_PREFIX);
+
     public static final String SYSTEM_CHILD_LINK_TABLE = "CHILD_LINK";
     public static final String SYSTEM_CHILD_LINK_NAME = 
SchemaUtil.getTableName(SYSTEM_CATALOG_SCHEMA, SYSTEM_CHILD_LINK_TABLE);
     public static final byte[] SYSTEM_CHILD_LINK_NAME_BYTES = 
Bytes.toBytes(SYSTEM_CHILD_LINK_NAME);
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixTTLDeleteJobMapper.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixTTLDeleteJobMapper.java
index 24b9c220f4..c9580ef4ff 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixTTLDeleteJobMapper.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixTTLDeleteJobMapper.java
@@ -86,7 +86,7 @@ public class PhoenixTTLDeleteJobMapper extends 
Mapper<NullWritable, ViewInfoTrac
             }
 
             LOGGER.debug(String.format("Deleting from view %s, TenantID %s, 
and TTL value: %d",
-                    value.getViewName(), value.getTenantId(), 
value.getPhoenixTtl()));
+                    value.getViewName(), value.getTenantId(), value.getTTL()));
 
             deleteExpiredRows(value, config, context);
 
@@ -158,8 +158,8 @@ public class PhoenixTTLDeleteJobMapper extends 
Mapper<NullWritable, ViewInfoTrac
             scan.setAttribute(
                     BaseScannerRegionObserver.DELETE_PHOENIX_TTL_EXPIRED, 
PDataType.TRUE_BYTES);
             scan.setAttribute(
-                    BaseScannerRegionObserver.PHOENIX_TTL,
-                    Bytes.toBytes(viewInfoTracker.getPhoenixTtl()));
+                    BaseScannerRegionObserver.TTL,
+                    Bytes.toBytes(viewInfoTracker.getTTL()));
             scan.setAttribute(
                     BaseScannerRegionObserver.PHOENIX_TTL_SCAN_TABLE_NAME,
                     Bytes.toBytes(sourceTableName));
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java
index 61ffb5ae3f..c3cf756f13 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexScrutinyMapper.java
@@ -69,7 +69,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.phoenix.thirdparty.com.google.common.base.Joiner;
 
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_PHOENIX_TTL;
-import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PHOENIX_TTL_NOT_DEFINED;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TTL_NOT_DEFINED;
 
 /**
  * Mapper that reads from the data table and checks the rows against the index 
table
@@ -328,7 +328,7 @@ public class IndexScrutinyMapper extends 
Mapper<NullWritable, PhoenixIndexDBWrit
         return sourceTS <= maxLookBackTimeMillis;
     }
 
-    private long getTableTTL(Configuration configuration) throws SQLException, 
IOException {
+    private int getTableTTL(Configuration configuration) throws SQLException, 
IOException {
         PTable pSourceTable = PhoenixRuntime.getTable(connection, 
qSourceTable);
         if (pSourceTable.getType() == PTableType.INDEX
                 && pSourceTable.getIndexType() == PTable.IndexType.LOCAL) {
@@ -340,8 +340,8 @@ public class IndexScrutinyMapper extends 
Mapper<NullWritable, PhoenixIndexDBWrit
                 SchemaUtil.isNamespaceMappingEnabled(null, cqsi.getProps()));
         if (configuration.getBoolean(QueryServices.PHOENIX_TABLE_TTL_ENABLED,
                 QueryServicesOptions.DEFAULT_PHOENIX_TABLE_TTL_ENABLED)) {
-            return pSourceTable.getPhoenixTTL() == PHOENIX_TTL_NOT_DEFINED ? 
DEFAULT_PHOENIX_TTL
-                    : pSourceTable.getPhoenixTTL();
+            return pSourceTable.getTTL() == TTL_NOT_DEFINED ? 
DEFAULT_PHOENIX_TTL
+                    : pSourceTable.getTTL();
         } else {
             TableDescriptor tableDesc;
             try (Admin admin = cqsi.getAdmin()) {
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/DefaultPhoenixMultiViewListProvider.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/DefaultPhoenixMultiViewListProvider.java
index a9bd79e004..70667929b7 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/DefaultPhoenixMultiViewListProvider.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/DefaultPhoenixMultiViewListProvider.java
@@ -139,7 +139,7 @@ public class DefaultPhoenixMultiViewListProvider implements 
PhoenixMultiViewList
             PTable parentTable = PhoenixRuntime.getTable(connection, null,
                     pTable.getParentName().toString());
             if (parentTable.getType() == PTableType.VIEW &&
-                    parentTable.getPhoenixTTL() > 0) {
+                    parentTable.getTTL() > 0) {
                             /* if the current view parent already has a TTL 
value, we want to
                             skip the current view cleanup job because we want 
to run the cleanup
                              job for at the GlobalView level instead of 
running multi-jobs at
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMultiInputUtil.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMultiInputUtil.java
index d25e0e3156..8c28a8d2bb 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMultiInputUtil.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMultiInputUtil.java
@@ -31,20 +31,20 @@ import java.sql.SQLException;
 import java.util.Properties;
 
 
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TTL_NOT_DEFINED;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_TYPE;
-import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PHOENIX_TTL;
-import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PHOENIX_TTL_NOT_DEFINED;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TTL;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_TYPE;
 
 public class PhoenixMultiInputUtil {
     public static final String SELECT_ALL_VIEW_METADATA_FROM_SYSCAT_QUERY =
-            "SELECT TENANT_ID, TABLE_SCHEM, TABLE_NAME, PHOENIX_TTL FROM " +
-                    SYSTEM_CATALOG_NAME + " WHERE " +
-                    TABLE_TYPE + " = '" + PTableType.VIEW.getSerializedValue() 
+ "' AND " +
-                    PHOENIX_TTL + " IS NOT NULL AND " +
-                    PHOENIX_TTL + " > " + PHOENIX_TTL_NOT_DEFINED + " AND " +
-                    VIEW_TYPE + " <> " + 
PTable.ViewType.MAPPED.getSerializedValue();
+            "SELECT TENANT_ID, TABLE_SCHEM, TABLE_NAME, TTL FROM "
+                    + SYSTEM_CATALOG_NAME + " WHERE "
+                    + TABLE_TYPE + " = '" + 
PTableType.VIEW.getSerializedValue() + "' AND "
+                    + TTL + " IS NOT NULL AND "
+                    + TTL + " > " + TTL_NOT_DEFINED + " AND "
+                    + VIEW_TYPE + " <> " + 
PTable.ViewType.MAPPED.getSerializedValue();
 
     public static Connection buildTenantConnection(String url, String tenantId)
             throws SQLException {
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ViewInfoTracker.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ViewInfoTracker.java
index a4f3226b61..c46434606d 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ViewInfoTracker.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ViewInfoTracker.java
@@ -70,7 +70,7 @@ public class ViewInfoTracker implements ViewInfoWritable {
         return this.isIndexRelation;
     }
 
-    public long getPhoenixTtl() {
+    public long getTTL() {
         return phoenixTtl;
     }
 
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 9373f4b74b..ee1da7141b 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -4122,19 +4122,30 @@ public class ConnectionQueryServicesImpl extends 
DelegateQueryServices implement
         }
         if (currentServerSideTableTimeStamp < 
MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0) {
             metaConnection = addColumnsIfNotExists(metaConnection,
-                    PhoenixDatabaseMetaData.SYSTEM_CATALOG, 
MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0 -3,
+                    PhoenixDatabaseMetaData.SYSTEM_CATALOG, 
MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0 - 5,
                     PhoenixDatabaseMetaData.PHYSICAL_TABLE_NAME
                             + " " + PVarchar.INSTANCE.getSqlTypeName());
 
             metaConnection = addColumnsIfNotExists(metaConnection, 
PhoenixDatabaseMetaData.SYSTEM_CATALOG,
-                MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0 -2,
+                MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0 - 4,
                     PhoenixDatabaseMetaData.SCHEMA_VERSION + " " + 
PVarchar.INSTANCE.getSqlTypeName());
             metaConnection = addColumnsIfNotExists(metaConnection, 
PhoenixDatabaseMetaData.SYSTEM_CATALOG,
-                MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0 -1,
+                MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0 - 3,
                 PhoenixDatabaseMetaData.EXTERNAL_SCHEMA_ID + " " + 
PVarchar.INSTANCE.getSqlTypeName());
             metaConnection = addColumnsIfNotExists(metaConnection, 
PhoenixDatabaseMetaData.SYSTEM_CATALOG,
-                MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0,
+                MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0 - 2,
                 PhoenixDatabaseMetaData.STREAMING_TOPIC_NAME + " " + 
PVarchar.INSTANCE.getSqlTypeName());
+            /**
+             * TODO: Provide a path to copy existing data from the PHOENIX_TTL column to the
+             * TTL column, and then drop the PHOENIX_TTL column. See PHOENIX-7023.
+             */
+            metaConnection = addColumnsIfNotExists(metaConnection,
+                    PhoenixDatabaseMetaData.SYSTEM_CATALOG, 
MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0 - 1,
+                    PhoenixDatabaseMetaData.TTL + " " + 
PInteger.INSTANCE.getSqlTypeName());
+            metaConnection = addColumnsIfNotExists(metaConnection,
+                    PhoenixDatabaseMetaData.SYSTEM_CATALOG, 
MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0,
+                    PhoenixDatabaseMetaData.ROW_KEY_PREFIX + " "
+                            + PVarbinary.INSTANCE.getSqlTypeName());
             UpgradeUtil.bootstrapLastDDLTimestampForIndexes(metaConnection);
         }
         return metaConnection;
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
index 52326c189d..4ebcf87e08 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java
@@ -118,6 +118,7 @@ import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.QUERY_STATUS;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.REF_GENERATION;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.REMARKS;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.RETURN_TYPE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ROW_KEY_PREFIX;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SALT_BUCKETS;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SCAN_METRICS_JSON;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SCHEMA_VERSION;
@@ -166,6 +167,7 @@ import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TRANSFORM_START_TS
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TRANSFORM_STATUS;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TRANSFORM_TABLE_TTL;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TRANSFORM_TYPE;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TTL;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TTL_FOR_MUTEX;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TYPE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TYPE_NAME;
@@ -178,7 +180,6 @@ import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID_DATA_TYPE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_STATEMENT;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_TYPE;
-
 /**
  *
  * Constants used during querying
@@ -342,6 +343,8 @@ public interface QueryConstants {
             SCHEMA_VERSION + " VARCHAR, \n" +
             EXTERNAL_SCHEMA_ID + " VARCHAR, \n" +
             STREAMING_TOPIC_NAME + " VARCHAR, \n" +
+            TTL + " INTEGER," +
+            ROW_KEY_PREFIX + " VARBINARY," +
             // Column metadata (will be null for table row)
             DATA_TYPE + " INTEGER," +
             COLUMN_SIZE + " INTEGER," +
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
index b79691b335..8fe426181b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java
@@ -364,12 +364,19 @@ public class DelegateTable implements PTable {
         return delegate.hasViewModifiedUseStatsForParallelization();
     }
 
+    @Override public int getTTL() {
+        return delegate.getTTL();
+    }
+
+    @Deprecated
     @Override public long getPhoenixTTL() { return delegate.getPhoenixTTL(); }
 
+    @Deprecated
     @Override public long getPhoenixTTLHighWaterMark() {
         return delegate.getPhoenixTTLHighWaterMark();
     }
 
+    @Deprecated
     @Override public boolean hasViewModifiedPhoenixTTL() {
         return delegate.hasViewModifiedPhoenixTTL();
     }
@@ -397,6 +404,11 @@ public class DelegateTable implements PTable {
     @Override
     public String getStreamingTopicName() { return 
delegate.getStreamingTopicName(); }
 
+    @Override
+    public byte[] getRowKeyPrefix() {
+        return delegate.getRowKeyPrefix();
+    }
+
     @Override public Map<String, String> getPropertyValues() { return 
delegate.getPropertyValues(); }
 
     @Override public Map<String, String> getDefaultPropertyValues() { return 
delegate.getDefaultPropertyValues(); }
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 709b211b00..bd1da177ca 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -19,12 +19,13 @@ package org.apache.phoenix.schema;
 
 import static 
org.apache.phoenix.exception.SQLExceptionCode.CANNOT_TRANSFORM_TRANSACTIONAL_TABLE;
 import static 
org.apache.phoenix.exception.SQLExceptionCode.ERROR_WRITING_TO_SCHEMA_REGISTRY;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TTL_NOT_DEFINED;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ROW_KEY_PREFIX;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STREAMING_TOPIC_NAME;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_TASK_TABLE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TTL;
 import static org.apache.phoenix.query.QueryConstants.SYSTEM_SCHEMA_NAME;
 import static 
org.apache.phoenix.query.QueryServices.INDEX_CREATE_DEFAULT_STATE;
-import static org.apache.phoenix.schema.PTableType.SYSTEM;
 import static 
org.apache.phoenix.thirdparty.com.google.common.collect.Sets.newLinkedHashSet;
 import static 
org.apache.phoenix.thirdparty.com.google.common.collect.Sets.newLinkedHashSetWithExpectedSize;
 import static 
org.apache.phoenix.coprocessor.BaseScannerRegionObserver.RUN_UPDATE_STATS_ASYNC_ATTRIB;
@@ -80,7 +81,7 @@ import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.ORDINAL_POSITION;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PARENT_TENANT_ID;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PHOENIX_TTL;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PHOENIX_TTL_HWM;
-import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PHOENIX_TTL_NOT_DEFINED;
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PHOENIX_TTL_NOT_DEFINED_DEPRECATED;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PHYSICAL_NAME;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PHYSICAL_TABLE_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PK_NAME;
@@ -346,9 +347,11 @@ public class MetaDataClient {
                     CHANGE_DETECTION_ENABLED + "," +
                     PHYSICAL_TABLE_NAME + "," +
                     SCHEMA_VERSION + "," +
-                    STREAMING_TOPIC_NAME +
+                    STREAMING_TOPIC_NAME + "," +
+                    TTL + "," +
+                    ROW_KEY_PREFIX +
                     ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 
?, ?, ?, ?, ?, ?, " +
-                "?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
+                "?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
 
     private static final String CREATE_SCHEMA = "UPSERT INTO " + 
SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_CATALOG_TABLE
             + "\"( " + TABLE_SCHEM + "," + TABLE_NAME + ") VALUES (?,?)";
@@ -1060,7 +1063,7 @@ public class MetaDataClient {
                             .buildException();
                 }
                 //Keeping TTL value as Phoenix Table property irrespective of 
PhoenixTTLEnabled or
-                //not to store the value in SYSCAT. To keep 
PTableImpl.getPhoenixTTL() consistent
+                //not to store the value in SYSCAT. To keep 
PTableImpl.getTTL() consistent
                 //for client side.
                 if (prop.getFirst().equalsIgnoreCase(TTL) && tableType != 
PTableType.SYSTEM) {
                     tableProps.put(prop.getFirst(), prop.getSecond());
@@ -1951,6 +1954,7 @@ public class MetaDataClient {
     }
 
     /**
+     * TODO: Change this to store TTL only at the one level where it is defined.
      * Get TTL defined for Index or View in its parent hierarchy as defining 
TTL directly on index
      * or view is not allowed. View on SYSTEM table is not allowed and already 
handled during
      * plan compilation.
@@ -1958,8 +1962,8 @@ public class MetaDataClient {
      * @return appropriate TTL for the entity calling the function.
      * @throws TableNotFoundException
      */
-    private Long getTTLFromParent(PTable parent) throws TableNotFoundException 
{
-        return (parent.getType() == TABLE) ? 
Long.valueOf(parent.getPhoenixTTL())
+    private Integer getTTLFromParent(PTable parent) throws 
TableNotFoundException {
+        return (parent.getType() == TABLE) ? Integer.valueOf(parent.getTTL())
                 : (parent.getType() == VIEW ? getTTLFromAncestor(parent) : 
null);
     }
 
@@ -1969,12 +1973,12 @@ public class MetaDataClient {
      * @return appropriate TTL from Views defined above for the entity calling.
      * @throws TableNotFoundException
      */
-    private Long getTTLFromAncestor(PTable view) throws TableNotFoundException 
{
+    private Integer getTTLFromAncestor(PTable view) throws 
TableNotFoundException {
         try {
-            return view.getPhoenixTTL() != PHOENIX_TTL_NOT_DEFINED
-                    ? Long.valueOf(view.getPhoenixTTL()) : 
(checkIfParentIsTable(view)
+            return view.getTTL() != TTL_NOT_DEFINED
+                    ? Integer.valueOf(view.getTTL()) : 
(checkIfParentIsTable(view)
                     ? connection.getTable(new PTableKey(null,
-                            
view.getPhysicalNames().get(0).getString())).getPhoenixTTL()
+                            
view.getPhysicalNames().get(0).getString())).getTTL()
                     : getTTLFromAncestor(connection.getTable(new PTableKey(
                                     connection.getTenantId(), 
view.getParentName().getString()))));
         } catch (TableNotFoundException tne) {
@@ -2048,9 +2052,10 @@ public class MetaDataClient {
                     tableType == PTableType.VIEW ? parent.getColumns().size()
                             : QueryConstants.BASE_TABLE_BASE_COLUMN_COUNT;
 
-            Long phoenixTTL = PHOENIX_TTL_NOT_DEFINED;
+            Integer phoenixTTL = TTL_NOT_DEFINED;
             Long phoenixTTLHighWaterMark = MIN_PHOENIX_TTL_HWM;
-            Long phoenixTTLProp = (Long) 
TableProperty.TTL.getValue(tableProps);
+            Integer phoenixTTLProp = (Integer) 
TableProperty.TTL.getValue(tableProps);
+            byte[] rowKeyPrefix = null;
 
             // Validate PHOENIX_TTL prop value if set
             if (phoenixTTLProp != null) {
@@ -2070,7 +2075,7 @@ public class MetaDataClient {
                             .build()
                             .buildException();
                 }
-                //Set Only in case of Tables and System Tables.
+                //Set Only in case of Tables.
                 phoenixTTL = phoenixTTLProp;
             }
 
@@ -2405,7 +2410,9 @@ public class MetaDataClient {
                         updateCacheFrequency = 
parent.getUpdateCacheFrequency();
                     }
 
-                    phoenixTTL = getTTLFromParent(parent);
+                    if (viewType == ViewType.UPDATABLE) {
+                        phoenixTTL = getTTLFromParent(parent);
+                    }
 
                     disableWAL = (disableWALProp == null ? 
parent.isWALDisabled() : disableWALProp);
                     defaultFamilyName = parent.getDefaultFamilyName() == null 
? null : parent.getDefaultFamilyName().getString();
@@ -2862,8 +2869,9 @@ public class MetaDataClient {
                         .setIndexes(Collections.<PTable>emptyList())
                         .setPhysicalNames(ImmutableList.<PName>of())
                         .setColumns(columns.values())
-                        .setPhoenixTTL(PHOENIX_TTL_NOT_DEFINED)
+                        .setPhoenixTTL(PHOENIX_TTL_NOT_DEFINED_DEPRECATED)
                         .setPhoenixTTLHighWaterMark(MIN_PHOENIX_TTL_HWM)
+                        .setTTL(TTL_NOT_DEFINED)
                         .build();
                 connection.addTable(table, 
MetaDataProtocol.MIN_TABLE_TIMESTAMP);
             }
@@ -3084,15 +3092,10 @@ public class MetaDataClient {
             } else {
                 tableUpsert.setNull(29, Types.NULL);
             }
-
-            if (phoenixTTL == null || phoenixTTL == PHOENIX_TTL_NOT_DEFINED) {
-                tableUpsert.setNull(30, Types.BIGINT);
-                tableUpsert.setNull(31, Types.BIGINT);
-            }
-            else {
-                tableUpsert.setLong(30, phoenixTTL);
-                tableUpsert.setLong(31, phoenixTTLHighWaterMark);
-            }
+            //Setting Null for PHOENIX_TTL and PHOENIX_TTL_HWM as they will be 
removed later and
+            //are not in use
+            tableUpsert.setNull(30, Types.BIGINT);
+            tableUpsert.setNull(31, Types.BIGINT);
 
             if (isChangeDetectionEnabledProp == null) {
                 tableUpsert.setNull(32, Types.BOOLEAN);
@@ -3117,6 +3120,20 @@ public class MetaDataClient {
             } else {
                 tableUpsert.setString(35, streamingTopicName);
             }
+
+            if (phoenixTTL == null || phoenixTTL == TTL_NOT_DEFINED) {
+                tableUpsert.setNull(36, Types.INTEGER);
+            } else {
+                tableUpsert.setInt(36, phoenixTTL);
+            }
+
+            if (rowKeyPrefix == null) {
+                tableUpsert.setNull(37, Types.VARBINARY);
+            } else {
+                //TODO: Store the actual prefix once the Prefix Builder is in place; until then NULL is written.
+                tableUpsert.setNull(37, Types.VARBINARY);
+            }
+
             tableUpsert.execute();
 
             if (asyncCreatedDate != null) {
@@ -3245,8 +3262,8 @@ public class MetaDataClient {
                         .setParentTableName((parent == null) ? null : 
parent.getTableName())
                         .setPhysicalNames(ImmutableList.copyOf(physicalNames))
                         .setColumns(columns.values())
-                        .setPhoenixTTL(phoenixTTL == null ? 
PHOENIX_TTL_NOT_DEFINED : phoenixTTL)
-                        .setPhoenixTTLHighWaterMark(phoenixTTLHighWaterMark == 
null ? MIN_PHOENIX_TTL_HWM : phoenixTTLHighWaterMark)
+                        .setPhoenixTTL(PHOENIX_TTL_NOT_DEFINED_DEPRECATED)
+                        .setPhoenixTTLHighWaterMark(MIN_PHOENIX_TTL_HWM)
                         .setViewModifiedUpdateCacheFrequency(tableType == 
PTableType.VIEW &&
                                 parent != null &&
                                 parent.getUpdateCacheFrequency() != 
updateCacheFrequency)
@@ -3256,7 +3273,7 @@ public class MetaDataClient {
                                         != useStatsForParallelizationProp)
                         .setViewModifiedPhoenixTTL(tableType == 
PTableType.VIEW &&
                                 parent != null && phoenixTTL != null &&
-                                parent.getPhoenixTTL() != phoenixTTL)
+                                parent.getTTL() != phoenixTTL)
                         .setLastDDLTimestamp(result.getTable() != null ?
                                 result.getTable().getLastDDLTimestamp() : null)
                         
.setIsChangeDetectionEnabled(isChangeDetectionEnabledProp)
@@ -3264,6 +3281,8 @@ public class MetaDataClient {
                         .setExternalSchemaId(result.getTable() != null ?
                         result.getTable().getExternalSchemaId() : null)
                         .setStreamingTopicName(streamingTopicName)
+                        .setTTL(phoenixTTL == null ? TTL_NOT_DEFINED : 
phoenixTTL)
+                        .setRowKeyPrefix(rowKeyPrefix)
                         .build();
                 result = new MetaDataMutationResult(code, 
result.getMutationTime(), table, true);
                 addTableToCache(result);
@@ -3668,7 +3687,7 @@ public class MetaDataClient {
             
.setSchemaName(schemaName).setTableName(tableName).setFamilyName(familyName).setColumnName(columnName).setMessage(msg).build().buildException();
         case UNALLOWED_SCHEMA_MUTATION:
             throw new SQLExceptionInfo.Builder(
-                    SQLExceptionCode.CANNOT_SET_OR_ALTER_PHOENIX_TTL)
+                    SQLExceptionCode.CANNOT_SET_OR_ALTER_TTL)
                     
.setSchemaName(schemaName).setTableName(tableName).build().buildException();
         case NO_OP:
         case COLUMN_ALREADY_EXISTS:
@@ -3716,7 +3735,7 @@ public class MetaDataClient {
                 metaPropertiesEvaluated.getAppendOnlySchema(),
                 metaPropertiesEvaluated.getImmutableStorageScheme(),
                 metaPropertiesEvaluated.getUseStatsForParallelization(),
-                metaPropertiesEvaluated.getPhoenixTTL(),
+                metaPropertiesEvaluated.getTTL(),
                 metaPropertiesEvaluated.isChangeDetectionEnabled(),
                 metaPropertiesEvaluated.getPhysicalTableName(),
                 metaPropertiesEvaluated.getSchemaVersion(),
@@ -3725,10 +3744,10 @@ public class MetaDataClient {
     }
 
     private  long incrementTableSeqNum(PTable table, PTableType expectedType, 
int columnCountDelta, Boolean isTransactional,
-                                       Long updateCacheFrequency, Long 
phoenixTTL, String physicalTableName,
+                                       Long updateCacheFrequency, Integer ttl, 
String physicalTableName,
                                        String schemaVersion, 
QualifierEncodingScheme columnEncodedBytes) throws SQLException {
         return incrementTableSeqNum(table, expectedType, columnCountDelta, 
isTransactional, null,
-            updateCacheFrequency, null, null, null, null, -1L, null, null, 
null,phoenixTTL, false, physicalTableName,
+            updateCacheFrequency, null, null, null, null, -1L, null, null, 
null, ttl, false, physicalTableName,
                 schemaVersion, columnEncodedBytes, null);
     }
 
@@ -3737,7 +3756,7 @@ public class MetaDataClient {
             Long updateCacheFrequency, Boolean isImmutableRows, Boolean 
disableWAL,
             Boolean isMultiTenant, Boolean storeNulls, Long guidePostWidth, 
Boolean appendOnlySchema,
             ImmutableStorageScheme immutableStorageScheme, Boolean 
useStatsForParallelization,
-            Long phoenixTTL, Boolean isChangeDetectionEnabled, String 
physicalTableName, String schemaVersion,
+            Integer ttl, Boolean isChangeDetectionEnabled, String 
physicalTableName, String schemaVersion,
                                       QualifierEncodingScheme 
columnEncodedBytes, String streamingTopicName)
             throws SQLException {
         String schemaName = table.getSchemaName().getString();
@@ -3794,8 +3813,8 @@ public class MetaDataClient {
         if (useStatsForParallelization != null) {
             mutateBooleanProperty(connection, tenantId, schemaName, tableName, 
USE_STATS_FOR_PARALLELIZATION, useStatsForParallelization);
         }
-        if (phoenixTTL != null) {
-            mutateLongProperty(connection, tenantId, schemaName, tableName, 
PHOENIX_TTL, phoenixTTL);
+        if (ttl != null) {
+            mutateIntegerProperty(connection, tenantId, schemaName, tableName, 
TTL, ttl);
         }
         if (isChangeDetectionEnabled != null) {
             mutateBooleanProperty(connection, tenantId, schemaName, tableName, 
CHANGE_DETECTION_ENABLED, isChangeDetectionEnabled);
@@ -3864,6 +3883,29 @@ public class MetaDataClient {
             tableBoolUpsert.execute();
         }
     }
+
+    private static void mutateIntegerProperty(Connection connection, String 
tenantId,
+            String schemaName, String tableName, String propertyName, Integer 
propertyValue)
+            throws SQLException {
+        String updatePropertySql = "UPSERT INTO " + SYSTEM_CATALOG_SCHEMA + 
".\""
+                + SYSTEM_CATALOG_TABLE + "\"( "
+                + TENANT_ID + ","
+                + TABLE_SCHEM + ","
+                + TABLE_NAME + ","
+                + propertyName
+                + ") VALUES (?, ?, ?, ?)";
+        try (PreparedStatement tableBoolUpsert = 
connection.prepareStatement(updatePropertySql)) {
+            tableBoolUpsert.setString(1, tenantId);
+            tableBoolUpsert.setString(2, schemaName);
+            tableBoolUpsert.setString(3, tableName);
+            if (propertyValue == null) {
+                tableBoolUpsert.setNull(4, Types.INTEGER);
+            } else {
+                tableBoolUpsert.setInt(4, propertyValue);
+            }
+            tableBoolUpsert.execute();
+        }
+    }
     
     private static void mutateByteProperty(Connection connection, String 
tenantId, String schemaName, String tableName,
             String propertyName, Byte propertyValue) throws SQLException {
@@ -4184,12 +4226,12 @@ public class MetaDataClient {
 
                 if (!table.getIndexes().isEmpty() &&
                         (numPkColumnsAdded>0 || metaProperties.getNonTxToTx() 
||
-                                
metaPropertiesEvaluated.getUpdateCacheFrequency() != null || 
metaPropertiesEvaluated.getPhoenixTTL() != null)) {
+                                
metaPropertiesEvaluated.getUpdateCacheFrequency() != null || 
metaPropertiesEvaluated.getTTL() != null)) {
                     for (PTable index : table.getIndexes()) {
                         incrementTableSeqNum(index, index.getType(), 
numPkColumnsAdded,
                                 metaProperties.getNonTxToTx() ? Boolean.TRUE : 
null,
                                 
metaPropertiesEvaluated.getUpdateCacheFrequency(),
-                                metaPropertiesEvaluated.getPhoenixTTL(),
+                                metaPropertiesEvaluated.getTTL(),
                                 metaPropertiesEvaluated.getPhysicalTableName(),
                                 metaPropertiesEvaluated.getSchemaVersion(),
                                 metaProperties.getColumnEncodedBytesProp());
@@ -4203,7 +4245,7 @@ public class MetaDataClient {
                         incrementTableSeqNum(index, index.getType(), 
columnDefs.size(),
                                 Boolean.FALSE,
                                 
metaPropertiesEvaluated.getUpdateCacheFrequency(),
-                                metaPropertiesEvaluated.getPhoenixTTL(),
+                                metaPropertiesEvaluated.getTTL(),
                                 metaPropertiesEvaluated.getPhysicalTableName(),
                                 metaPropertiesEvaluated.getSchemaVersion(),
                                 
metaPropertiesEvaluated.getColumnEncodedBytes());
@@ -5307,7 +5349,7 @@ public class MetaDataClient {
                     } else if 
(propName.equalsIgnoreCase(USE_STATS_FOR_PARALLELIZATION)) {
                         
metaProperties.setUseStatsForParallelizationProp((Boolean)value);
                     } else if (propName.equalsIgnoreCase(TTL)) {
-                        metaProperties.setPhoenixTTL((Long)value);
+                        metaProperties.setTTL((Integer) value);
                     } else if 
(propName.equalsIgnoreCase(CHANGE_DETECTION_ENABLED)) {
                         metaProperties.setChangeDetectionEnabled((Boolean) 
value);
                     } else if (propName.equalsIgnoreCase(PHYSICAL_TABLE_NAME)) 
{
@@ -5480,21 +5522,21 @@ public class MetaDataClient {
             }
         }
 
-        if (metaProperties.getPhoenixTTL() != null) {
+        if (metaProperties.getTTL() != null) {
             if (table.getType() == PTableType.INDEX) {
                 throw new SQLExceptionInfo.Builder(
                         
SQLExceptionCode.CANNOT_SET_OR_ALTER_PROPERTY_FOR_INDEX)
                         .build()
                         .buildException();
             }
-            if (table.getType() != PTableType.TABLE && table.getType() != 
SYSTEM) {
+            if (table.getType() != PTableType.TABLE) {
                 throw new SQLExceptionInfo.Builder(
                         SQLExceptionCode.PHOENIX_TTL_SUPPORTED_FOR_TABLES_ONLY)
                         .build()
                         .buildException();
             }
-            if (metaProperties.getPhoenixTTL().longValue() != 
table.getPhoenixTTL()) {
-                
metaPropertiesEvaluated.setPhoenixTTL(metaProperties.getPhoenixTTL());
+            if (metaProperties.getTTL() != table.getTTL()) {
+                metaPropertiesEvaluated.setTTL(metaProperties.getTTL());
                 changingPhoenixTableProperty = true;
             }
         }
@@ -5548,7 +5590,7 @@ public class MetaDataClient {
         private ImmutableStorageScheme immutableStorageSchemeProp = null;
         private Boolean useStatsForParallelizationProp = null;
         private boolean nonTxToTx = false;
-        private Long phoenixTTL = null;
+        private Integer ttl = null;
         private Boolean isChangeDetectionEnabled = null;
         private String physicalTableName = null;
         private String schemaVersion = null;
@@ -5668,9 +5710,13 @@ public class MetaDataClient {
             this.nonTxToTx = nonTxToTx;
         }
 
-        public Long getPhoenixTTL() { return phoenixTTL; }
+        public Integer getTTL() {
+            return ttl;
+        }
 
-        public void setPhoenixTTL(Long phoenixTTL) { this.phoenixTTL = 
phoenixTTL; }
+        public void setTTL(Integer ttl) {
+            this.ttl = ttl;
+        }
 
         public Boolean isChangeDetectionEnabled() {
             return isChangeDetectionEnabled;
@@ -5716,7 +5762,7 @@ public class MetaDataClient {
         private Boolean useStatsForParallelization = null;
         private Boolean isTransactional = null;
         private TransactionFactory.Provider transactionProvider = null;
-        private Long phoenixTTL = null;
+        private Integer ttl = null;
         private Boolean isChangeDetectionEnabled = null;
         private String physicalTableName = null;
         private String schemaVersion = null;
@@ -5817,9 +5863,9 @@ public class MetaDataClient {
             this.transactionProvider = transactionProvider;
         }
 
-        public Long getPhoenixTTL() { return phoenixTTL; }
+        public Integer getTTL() { return ttl; }
 
-        public void setPhoenixTTL(Long phoenixTTL) { this.phoenixTTL = 
phoenixTTL; }
+        public void setTTL(Integer ttl) { this.ttl = ttl; }
 
         public Boolean isChangeDetectionEnabled() {
             return isChangeDetectionEnabled;
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
index 1798d444b6..b721238931 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
@@ -917,19 +917,27 @@ public interface PTable extends PMetaDataEntity {
     Map<String, String> getPropertyValues();
     Map<String, String> getDefaultPropertyValues();
 
+    /**
+     * @return The TTL duration associated with the entity when Phoenix level 
TTL is enabled.
+     */
+    int getTTL();
+
     /**
      * @return The PHOENIX_TTL duration associated with the entity.
      */
+    @Deprecated
     long getPhoenixTTL();
 
     /**
      * @return The PHOENIX_TTL high water mark timestamp associated with the 
entity.
      */
+    @Deprecated
     long getPhoenixTTLHighWaterMark();
 
     /**
      * @return If the view has overridden the TTL set at the parent entity 
level.
      */
+    @Deprecated
     boolean hasViewModifiedPhoenixTTL();
 
     /**
@@ -962,6 +970,12 @@ public interface PTable extends PMetaDataEntity {
      */
     String getStreamingTopicName();
 
+    /**
+     * @return Prefixed KeyRange generated by the expression representing the view statement. In
+     * other words, there is a one-to-one mapping between a view and its prefixed KeyRange.
+     */
+    byte[] getRowKeyPrefix();
+
     /**
      * Class to help track encoded column qualifier counters per column family.
      */
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
index c8e6001e2b..e4c733e780 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java
@@ -30,6 +30,7 @@ import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_COLUMN_FAM
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.IMMUTABLE_STORAGE_SCHEME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.INDEX_STATE;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MULTI_TENANT;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TTL_NOT_DEFINED;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PHYSICAL_TABLE_NAME;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SALT_BUCKETS;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TRANSACTIONAL;
@@ -48,7 +49,7 @@ import static 
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_USE_STATS_FO
 import static org.apache.phoenix.schema.SaltingUtil.SALTING_COLUMN;
 import static org.apache.phoenix.schema.TableProperty.DEFAULT_COLUMN_FAMILY;
 import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
-import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PHOENIX_TTL_NOT_DEFINED;
+import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PHOENIX_TTL_NOT_DEFINED_DEPRECATED;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.MIN_PHOENIX_TTL_HWM;
 
 import java.io.IOException;
@@ -68,6 +69,7 @@ import java.util.Map.Entry;
 
 import javax.annotation.Nonnull;
 
+import org.apache.phoenix.schema.types.PVarbinary;
 import 
org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HConstants;
@@ -203,6 +205,7 @@ public class PTableImpl implements PTable {
     private final QualifierEncodingScheme qualifierEncodingScheme;
     private final EncodedCQCounter encodedCQCounter;
     private final Boolean useStatsForParallelization;
+    private final int ttl;
     private final long phoenixTTL;
     private final long phoenixTTLHighWaterMark;
     private final BitSet viewModifiedPropSet;
@@ -212,6 +215,7 @@ public class PTableImpl implements PTable {
     private String schemaVersion;
     private String externalSchemaId;
     private String streamingTopicName;
+    private byte[] rowKeyPrefix;
 
     public static class Builder {
         private PTableKey key;
@@ -277,6 +281,8 @@ public class PTableImpl implements PTable {
         private String schemaVersion;
         private String externalSchemaId;
         private String streamingTopicName;
+        private int ttl;
+        private byte[] rowKeyPrefix;
 
         // Used to denote which properties a view has explicitly modified
         private BitSet viewModifiedPropSet = new BitSet(3);
@@ -611,17 +617,25 @@ public class PTableImpl implements PTable {
             return this;
         }
 
+        @Deprecated
         public Builder setPhoenixTTL(long phoenixTTL) {
-            propertyValues.put(TTL, String.valueOf(phoenixTTL));
             this.phoenixTTL = phoenixTTL;
             return this;
         }
 
+        public Builder setTTL(int ttl) {
+            propertyValues.put(TTL, String.valueOf(ttl));
+            this.ttl = ttl;
+            return this;
+        }
+
+        @Deprecated
         public Builder setPhoenixTTLHighWaterMark(long 
phoenixTTLHighWaterMark) {
             this.phoenixTTLHighWaterMark = phoenixTTLHighWaterMark;
             return this;
         }
 
+        @Deprecated
         public Builder setViewModifiedPhoenixTTL(boolean modified) {
             this.viewModifiedPropSet.set(VIEW_MODIFIED_PHOENIX_TTL_BIT_SET_POS,
                     modified);
@@ -689,14 +703,21 @@ public class PTableImpl implements PTable {
                 this.externalSchemaId = externalSchemaId;
             }
             return this;
-         }
+        }
 
-         public Builder setStreamingTopicName(String streamingTopicName) {
+        public Builder setStreamingTopicName(String streamingTopicName) {
             if (streamingTopicName != null) {
                 this.streamingTopicName = streamingTopicName;
             }
             return this;
-         }
+        }
+
+        public Builder setRowKeyPrefix(byte[] rowKeyPrefix) {
+            if (rowKeyPrefix != null) {
+                this.rowKeyPrefix = rowKeyPrefix;
+            }
+            return this;
+        }
 
         /**
          * Populate derivable attributes of the PTable
@@ -979,6 +1000,7 @@ public class PTableImpl implements PTable {
         this.qualifierEncodingScheme = builder.qualifierEncodingScheme;
         this.encodedCQCounter = builder.encodedCQCounter;
         this.useStatsForParallelization = builder.useStatsForParallelization;
+        this.ttl = builder.ttl;
         this.phoenixTTL = builder.phoenixTTL;
         this.phoenixTTLHighWaterMark = builder.phoenixTTLHighWaterMark;
         this.viewModifiedPropSet = builder.viewModifiedPropSet;
@@ -988,6 +1010,7 @@ public class PTableImpl implements PTable {
         this.schemaVersion = builder.schemaVersion;
         this.externalSchemaId = builder.externalSchemaId;
         this.streamingTopicName = builder.streamingTopicName;
+        this.rowKeyPrefix = builder.rowKeyPrefix;
     }
 
     // When cloning table, ignore the salt column as it will be added back in 
the constructor
@@ -1067,7 +1090,9 @@ public class PTableImpl implements PTable {
                 .setIsChangeDetectionEnabled(table.isChangeDetectionEnabled())
                 .setSchemaVersion(table.getSchemaVersion())
                 .setExternalSchemaId(table.getExternalSchemaId())
-                .setStreamingTopicName(table.getStreamingTopicName());
+                .setStreamingTopicName(table.getStreamingTopicName())
+                .setTTL(table.getTTL())
+                .setRowKeyPrefix(table.getRowKeyPrefix());
     }
 
     @Override
@@ -1957,7 +1982,7 @@ public class PTableImpl implements PTable {
         if (table.hasUseStatsForParallelization()) {
             useStatsForParallelization = table.getUseStatsForParallelization();
         }
-        long phoenixTTL = PHOENIX_TTL_NOT_DEFINED;
+        long phoenixTTL = PHOENIX_TTL_NOT_DEFINED_DEPRECATED;
         if (table.hasPhoenixTTL()) {
             phoenixTTL = table.getPhoenixTTL();
         }
@@ -2001,6 +2026,17 @@ public class PTableImpl implements PTable {
             streamingTopicName =
                 (String) 
PVarchar.INSTANCE.toObject(table.getStreamingTopicName().toByteArray());
         }
+
+        Integer ttl = TTL_NOT_DEFINED;
+        if (table.hasTtl()) {
+            ttl = table.getTtl();
+        }
+
+        byte[] rowKeyPrefix = null;
+        if (table.hasRowKeyPrefix()) {
+            rowKeyPrefix = 
PVarbinary.INSTANCE.toBytes(table.getRowKeyPrefix());
+        }
+
         try {
             return new PTableImpl.Builder()
                     .setType(tableType)
@@ -2058,6 +2094,8 @@ public class PTableImpl implements PTable {
                     .setSchemaVersion(schemaVersion)
                     .setExternalSchemaId(externalSchemaId)
                     .setStreamingTopicName(streamingTopicName)
+                    .setTTL(ttl)
+                    .setRowKeyPrefix(rowKeyPrefix)
                     .build();
         } catch (SQLException e) {
             throw new RuntimeException(e); // Impossible
@@ -2197,6 +2235,10 @@ public class PTableImpl implements PTable {
         if (table.getStreamingTopicName() != null) {
             
builder.setStreamingTopicName(ByteStringer.wrap(PVarchar.INSTANCE.toBytes(table.getStreamingTopicName())));
         }
+        builder.setTtl(table.getTTL());
+        if (table.getRowKeyPrefix() != null) {
+            
builder.setRowKeyPrefix(ByteStringer.wrap(table.getRowKeyPrefix()));
+        }
         return builder.build();
     }
 
@@ -2293,11 +2335,17 @@ public class PTableImpl implements PTable {
     }
 
     @Override
+    public int getTTL() {
+        return ttl;
+    }
+    @Override
+    @Deprecated
     public long getPhoenixTTL() {
         return phoenixTTL;
     }
 
     @Override
+    @Deprecated
     public long getPhoenixTTLHighWaterMark() {
         return phoenixTTLHighWaterMark;
     }
@@ -2310,6 +2358,7 @@ public class PTableImpl implements PTable {
         return 
viewModifiedPropSet.get(VIEW_MODIFIED_USE_STATS_FOR_PARALLELIZATION_BIT_SET_POS);
     }
 
+    @Deprecated
     @Override public boolean hasViewModifiedPhoenixTTL() {
         return viewModifiedPropSet.get(VIEW_MODIFIED_PHOENIX_TTL_BIT_SET_POS);
     }
@@ -2339,6 +2388,11 @@ public class PTableImpl implements PTable {
         return streamingTopicName;
     }
 
+    @Override
+    public byte[] getRowKeyPrefix() {
+        return rowKeyPrefix;
+    }
+
     private static final class KVColumnFamilyQualifier {
         @Nonnull
         private final String colFamilyName;
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java
index 34e5f0a8e3..910d6889d7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java
@@ -24,7 +24,7 @@ import static 
org.apache.phoenix.exception.SQLExceptionCode.DEFAULT_COLUMN_FAMIL
 import static 
org.apache.phoenix.exception.SQLExceptionCode.SALT_ONLY_ON_CREATE_TABLE;
 import static 
org.apache.phoenix.exception.SQLExceptionCode.VIEW_WITH_PROPERTIES;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.DEFAULT_COLUMN_FAMILY_NAME;
-import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PHOENIX_TTL_NOT_DEFINED;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TTL_NOT_DEFINED;
 
 import java.sql.SQLException;
 import java.util.Map;
@@ -262,21 +262,21 @@ public enum TableProperty {
             if (value instanceof String) {
                 String strValue = (String) value;
                 if ("FOREVER".equalsIgnoreCase(strValue)) {
-                    return HConstants.LATEST_TIMESTAMP;
+                    return HConstants.FOREVER;
                 } else if ("NONE".equalsIgnoreCase(strValue)) {
-                    return PHOENIX_TTL_NOT_DEFINED;
+                    return TTL_NOT_DEFINED;
                 }
             } else if (value != null) {
-                //Not converting to seconds for better understanding at 
compaction and masking
-                //stage.As HBase Descriptor level gives this value in seconds.
-                return ((Number) value).longValue();
+                //Not converting to milli-seconds for better understanding at 
compaction and masking
+                //stage. As HBase Descriptor level gives this value in seconds.
+                return ((Number) value).intValue();
             }
             return value;
         }
 
         @Override
         public Object getPTableValue(PTable table) {
-            return table.getPhoenixTTL();
+            return table.getTTL();
         }
     },
 
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/tool/SchemaExtractionProcessor.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/tool/SchemaExtractionProcessor.java
index f487d51a0b..bdbe03d4af 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/tool/SchemaExtractionProcessor.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/tool/SchemaExtractionProcessor.java
@@ -54,7 +54,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.stream.Collectors;
 
-import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.PHOENIX_TTL_NOT_DEFINED;
+import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TTL_NOT_DEFINED;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TRANSACTION_PROVIDER;
 import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TTL;
 import static 
org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.UPDATE_CACHE_FREQUENCY;
@@ -404,7 +404,7 @@ public class SchemaExtractionProcessor implements 
SchemaProcessor {
                 if (!key.equalsIgnoreCase(TTL)) {
                     definedProps.put(key, value);
                 } else {
-                    if (isPhoenixTTLEnabled && Long.parseLong(value) != 
PHOENIX_TTL_NOT_DEFINED) {
+                    if (isPhoenixTTLEnabled && Integer.parseInt(value) != 
TTL_NOT_DEFINED) {
                         definedProps.put(key, value);
                     }
                 }
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/Transform.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/Transform.java
index 383f7d2275..8324bc35df 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/Transform.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/Transform.java
@@ -202,6 +202,8 @@ public class Transform {
                 .setSchemaVersion(table.getSchemaVersion())
                 .setIsChangeDetectionEnabled(table.isChangeDetectionEnabled())
                 .setStreamingTopicName(table.getStreamingTopicName())
+                .setTTL(table.getTTL())
+                .setRowKeyPrefix(table.getRowKeyPrefix())
                 // Transformables
                 .setImmutableStorageScheme(
                         (changedProps.getImmutableStorageSchemeProp() != null? 
changedProps.getImmutableStorageSchemeProp():table.getImmutableStorageScheme()))
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java 
b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
index 3e14479dc0..944f2ae630 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
@@ -995,26 +995,26 @@ public class ScanUtil {
                 ((isServerSideMaskingSet != null) && 
(Boolean.parseBoolean(isServerSideMaskingSet))));
     }
 
-    public static long getPhoenixTTL(Scan scan) {
-        byte[] phoenixTTL = 
scan.getAttribute(BaseScannerRegionObserver.PHOENIX_TTL);
+    public static int getTTL(Scan scan) {
+        byte[] phoenixTTL = scan.getAttribute(BaseScannerRegionObserver.TTL);
         if (phoenixTTL == null) {
             return DEFAULT_PHOENIX_TTL;
         }
-        return Bytes.toLong(phoenixTTL);
+        return Bytes.toInt(phoenixTTL);
     }
 
     public static boolean isMaskTTLExpiredRows(Scan scan) {
         return 
scan.getAttribute(BaseScannerRegionObserver.MASK_PHOENIX_TTL_EXPIRED) != null &&
                 
(Bytes.compareTo(scan.getAttribute(BaseScannerRegionObserver.MASK_PHOENIX_TTL_EXPIRED),
                         PDataType.TRUE_BYTES) == 0)
-                && scan.getAttribute(BaseScannerRegionObserver.PHOENIX_TTL) != 
null;
+                && scan.getAttribute(BaseScannerRegionObserver.TTL) != null;
     }
 
     public static boolean isDeleteTTLExpiredRows(Scan scan) {
         return 
scan.getAttribute(BaseScannerRegionObserver.DELETE_PHOENIX_TTL_EXPIRED) != null 
&& (
                 
Bytes.compareTo(scan.getAttribute(BaseScannerRegionObserver.DELETE_PHOENIX_TTL_EXPIRED),
                         PDataType.TRUE_BYTES) == 0)
-                && scan.getAttribute(BaseScannerRegionObserver.PHOENIX_TTL) != 
null;
+                && scan.getAttribute(BaseScannerRegionObserver.TTL) != null;
     }
 
     public static boolean isEmptyColumn(Cell cell, byte[] emptyCF, byte[] 
emptyCQ) {
@@ -1038,7 +1038,7 @@ public class ScanUtil {
 
     public static boolean isTTLExpired(Cell cell, Scan scan, long nowTS) {
         long ts = cell.getTimestamp();
-        long ttl = ScanUtil.getPhoenixTTL(scan);
+        int ttl = ScanUtil.getTTL(scan);
         return ts + ttl < nowTS;
     }
 
@@ -1250,7 +1250,7 @@ public class ScanUtil {
                 return;
             }
         }
-        if (dataTable.getPhoenixTTL() != 0) {
+        if (dataTable.getTTL() != 0) {
             byte[] emptyColumnFamilyName = 
SchemaUtil.getEmptyColumnFamily(table);
             byte[] emptyColumnName =
                     table.getEncodingScheme() == 
PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS ?
@@ -1260,8 +1260,8 @@ public class ScanUtil {
                     Bytes.toBytes(tableName));
             
scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_FAMILY_NAME, 
emptyColumnFamilyName);
             
scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER_NAME, 
emptyColumnName);
-            scan.setAttribute(BaseScannerRegionObserver.PHOENIX_TTL,
-                    Bytes.toBytes(Long.valueOf(dataTable.getPhoenixTTL())));
+            scan.setAttribute(BaseScannerRegionObserver.TTL,
+                    Bytes.toBytes(Integer.valueOf(dataTable.getTTL())));
             if (!ScanUtil.isDeleteTTLExpiredRows(scan)) {
                 
scan.setAttribute(BaseScannerRegionObserver.MASK_PHOENIX_TTL_EXPIRED, 
PDataType.TRUE_BYTES);
             }
diff --git a/phoenix-core/src/main/protobuf/PTable.proto 
b/phoenix-core/src/main/protobuf/PTable.proto
index 6f6a663a99..99d62d0bc5 100644
--- a/phoenix-core/src/main/protobuf/PTable.proto
+++ b/phoenix-core/src/main/protobuf/PTable.proto
@@ -117,6 +117,8 @@ message PTable {
   optional bytes externalSchemaId=50;
   optional PTable transformingNewTable=51;
   optional bytes streamingTopicName=52;
+  optional int32 ttl = 53;
+  optional bytes rowKeyPrefix = 54;
 }
 
 message EncodedCQCounter {
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/PhoenixMultiViewReaderTest.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/PhoenixMultiViewReaderTest.java
index e6d9b9a515..342a75d35c 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/PhoenixMultiViewReaderTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/PhoenixMultiViewReaderTest.java
@@ -69,14 +69,14 @@ public class PhoenixMultiViewReaderTest {
         viewInfoWritable = 
(ViewInfoTracker)phoenixMultiViewReader.getCurrentValue();
         assertEquals(tenantId, viewInfoWritable.getTenantId());
         assertEquals(viewName, viewInfoWritable.getViewName());
-        assertEquals(ttl, viewInfoWritable.getPhoenixTtl());
+        assertEquals(ttl, viewInfoWritable.getTTL());
         assertEquals(false, viewInfoWritable.isIndexRelation());
 
         assertTrue(phoenixMultiViewReader.nextKeyValue());
         viewInfoWritable = 
(ViewInfoTracker)phoenixMultiViewReader.getCurrentValue();
         assertEquals(tenantId, viewInfoWritable.getTenantId());
         assertEquals(viewName, viewInfoWritable.getViewName());
-        assertEquals(ttl, viewInfoWritable.getPhoenixTtl());
+        assertEquals(ttl, viewInfoWritable.getTTL());
         assertEquals(true, viewInfoWritable.isIndexRelation());
 
         assertFalse(phoenixMultiViewReader.nextKeyValue());
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/util/ScanUtilTest.java 
b/phoenix-core/src/test/java/org/apache/phoenix/util/ScanUtilTest.java
index 06fa4eef93..6f7bc3240b 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/ScanUtilTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/ScanUtilTest.java
@@ -536,7 +536,7 @@ public class ScanUtilTest {
 
                 long timestamp44 = 44L;
                 Scan testScan = new Scan();
-                testScan.setAttribute(BaseScannerRegionObserver.PHOENIX_TTL, 
Bytes.toBytes(1L));
+                testScan.setAttribute(BaseScannerRegionObserver.TTL, 
Bytes.toBytes(1));
                 // Test isTTLExpired
                 Assert.assertTrue(ScanUtil.isTTLExpired(cell42, testScan, 
timestamp44));
                 Assert.assertFalse(ScanUtil.isTTLExpired(cell43, testScan, 
timestamp44));
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java 
b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
index 690e86bcda..189c31bbc6 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
@@ -1338,7 +1338,7 @@ public class TestUtil {
         long tableTTL = -1;
         if (phoenixTTLEnabled) {
             tableTTL = conn.unwrap(PhoenixConnection.class).getTable(new 
PTableKey(null,
-                    tableName.getNameAsString())).getPhoenixTTL();
+                    tableName.getNameAsString())).getTTL();
         } else {
             tableTTL = getColumnDescriptor(conn, tableName).getTimeToLive();
         }

Reply via email to