This is an automated email from the ASF dual-hosted git repository.

shahrs87 pushed a commit to branch PHOENIX-6883-feature
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/PHOENIX-6883-feature by this push:
     new 0e471bfea0 PHOENIX-7025 : Create a new RPC to validate last ddl timestamp for read requests. (#1666)
0e471bfea0 is described below

commit 0e471bfea0d51c6e027dd06d3154df4d67fe769d
Author: palash <palashc...@gmail.com>
AuthorDate: Mon Oct 16 13:03:45 2023 -0700

    PHOENIX-7025 : Create a new RPC to validate last ddl timestamp for read requests. (#1666)
---
 .../apache/phoenix/cache/ServerMetadataCache.java  |   5 +
 .../coprocessor/PhoenixRegionServerEndpoint.java   |   5 +
 .../org/apache/phoenix/jdbc/PhoenixStatement.java  | 246 ++++++++--
 .../phoenix/query/ConnectionQueryServices.java     |   3 +
 .../phoenix/query/ConnectionQueryServicesImpl.java |  29 ++
 .../query/ConnectionlessQueryServicesImpl.java     |  10 +
 .../query/DelegateConnectionQueryServices.java     |  11 +
 .../org/apache/phoenix/query/QueryServices.java    |   3 +
 .../apache/phoenix/query/QueryServicesOptions.java |   1 +
 .../org/apache/phoenix/schema/MetaDataClient.java  |  43 +-
 .../phoenix/cache/ServerMetadataCacheTest.java     | 543 ++++++++++++++++++---
 11 files changed, 775 insertions(+), 124 deletions(-)

diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerMetadataCache.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerMetadataCache.java
index 055ab1424c..15ce11e145 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerMetadataCache.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerMetadataCache.java
@@ -179,4 +179,9 @@ public class ServerMetadataCache {
         LOGGER.info("Resetting ServerMetadataCache");
         INSTANCE = null;
     }
+
+    @VisibleForTesting
+    public static void setInstance(ServerMetadataCache cache) {
+        INSTANCE = cache;
+    }
 }
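
Note: the @VisibleForTesting setInstance hook above exists so tests can swap the
singleton cache. A minimal, hypothetical usage (the Mockito calls are illustrative,
not copied from this commit):

    // In a test: replace the singleton with a Mockito spy, then restore it.
    ServerMetadataCache real = ServerMetadataCache.getInstance(config);
    ServerMetadataCache spy = Mockito.spy(real);
    ServerMetadataCache.setInstance(spy);   // INSTANCE now points at the spy
    // ... exercise code that hits the cache, verify interactions on spy ...
    ServerMetadataCache.setInstance(real);  // restore for subsequent tests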
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java
index e3448bc718..a114bea095 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java
@@ -73,6 +73,11 @@ public class PhoenixRegionServerEndpoint
                 LOGGER.error(errorMsg,  t);
                 IOException ioe = ServerUtil.createIOException(errorMsg, t);
                 ProtobufUtil.setControllerException(controller, ioe);
+                // If an index was dropped and a client tries to query it, we will validate
+                // the table first and encounter stale metadata. If we don't break here, the
+                // coprocessor will run into a table-not-found error since it cannot validate
+                // the dropped index. This is fine for views too since we update the entire hierarchy.
+                break;
             }
         }
     }
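
Note: the break above short-circuits the endpoint's loop over the individual
LastDDLTimestampRequest entries of a ValidateLastDDLTimestampRequest. A rough sketch
of that control flow, with the validate() helper assumed from this diff rather than
copied from the full endpoint source:

    // Hypothetical outline of the endpoint's validation loop.
    for (RegionServerEndpointProtos.LastDDLTimestampRequest lastDDLTimestampRequest
            : request.getLastDDLTimestampRequestsList()) {
        try {
            validate(lastDDLTimestampRequest); // compare client vs. server timestamp
        } catch (Throwable t) {
            LOGGER.error(errorMsg,  t);
            IOException ioe = ServerUtil.createIOException(errorMsg, t);
            ProtobufUtil.setControllerException(controller, ioe);
            break; // stop validating; later entries may reference dropped objects
        }
    }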
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index d73f1cbc15..711b2771fb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -64,15 +64,20 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
-
+import java.util.concurrent.ThreadLocalRandom;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Consistency;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
+import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.call.CallRunner;
 import org.apache.phoenix.compile.BaseMutationPlan;
@@ -102,8 +107,11 @@ import org.apache.phoenix.compile.StatementPlan;
 import org.apache.phoenix.compile.TraceQueryPlan;
 import org.apache.phoenix.compile.UpsertCompiler;
 import org.apache.phoenix.coprocessor.MetaDataProtocol;
+import org.apache.phoenix.coprocessor.PhoenixRegionServerEndpoint;
+import org.apache.phoenix.coprocessor.generated.RegionServerEndpointProtos;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
+import org.apache.phoenix.exception.StaleMetadataCacheException;
 import org.apache.phoenix.exception.UpgradeRequiredException;
 import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
@@ -193,12 +201,16 @@ import org.apache.phoenix.schema.MetaDataEntityNotFoundException;
 import org.apache.phoenix.schema.PColumnImpl;
 import org.apache.phoenix.schema.PDatum;
 import org.apache.phoenix.schema.PIndexState;
+import org.apache.phoenix.schema.PName;
 import org.apache.phoenix.schema.PNameFactory;
+import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTable.IndexType;
+import org.apache.phoenix.schema.PTableKey;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.RowKeyValueAccessor;
 import org.apache.phoenix.schema.Sequence;
 import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.stats.StatisticsCollectionScope;
 import org.apache.phoenix.schema.tuple.MultiKeyValueTuple;
@@ -287,10 +299,12 @@ public class PhoenixStatement implements PhoenixMonitoredStatement, SQLCloseable
     private int queryTimeoutMillis;
     // Caching per Statement
     protected final Calendar localCalendar = Calendar.getInstance();
+    private boolean validateLastDdlTimestamp;
 
     public PhoenixStatement(PhoenixConnection connection) {
         this.connection = connection;
         this.queryTimeoutMillis = getDefaultQueryTimeoutMillis();
+        this.validateLastDdlTimestamp = getValidateLastDdlTimestampEnabled();
     }
 
     /**
@@ -302,6 +316,12 @@ public class PhoenixStatement implements PhoenixMonitoredStatement, SQLCloseable
             QueryServicesOptions.DEFAULT_THREAD_TIMEOUT_MS);
     }
 
+    private boolean getValidateLastDdlTimestampEnabled() {
+        return connection.getQueryServices().getProps()
+                .getBoolean(QueryServices.LAST_DDL_TIMESTAMP_VALIDATION_ENABLED,
+                        QueryServicesOptions.DEFAULT_LAST_DDL_TIMESTAMP_VALIDATION_ENABLED);
+    }
+
     protected List<PhoenixResultSet> getResultSets() {
         return resultSets;
     }
@@ -317,16 +337,123 @@ public class PhoenixStatement implements PhoenixMonitoredStatement, SQLCloseable
     
     protected PhoenixResultSet executeQuery(final CompilableStatement stmt, final QueryLogger queryLogger)
             throws SQLException {
-        return executeQuery(stmt, true, queryLogger, false);
+        return executeQuery(stmt, true, queryLogger, false, this.validateLastDdlTimestamp);
     }
 
     protected PhoenixResultSet executeQuery(final CompilableStatement stmt, final QueryLogger queryLogger, boolean noCommit)
             throws SQLException {
-        return executeQuery(stmt, true, queryLogger, noCommit);
+        return executeQuery(stmt, true, queryLogger, noCommit, this.validateLastDdlTimestamp);
+    }
+
+    private String getInfoString(TableRef tableRef) {
+        return String.format("Tenant: %s, Schema: %s, Table: %s",
+                this.connection.getTenantId(),
+                tableRef.getTable().getSchemaName(),
+                tableRef.getTable().getTableName());
+    }
+
+    private void setLastDDLTimestampRequestParameters(
+            RegionServerEndpointProtos.LastDDLTimestampRequest.Builder builder, PTable pTable) {
+        byte[] tenantIDBytes = this.connection.getTenantId() == null
+                ? HConstants.EMPTY_BYTE_ARRAY
+                : this.connection.getTenantId().getBytes();
+        byte[] schemaBytes = pTable.getSchemaName() == null
+                                ?   HConstants.EMPTY_BYTE_ARRAY
+                                : pTable.getSchemaName().getBytes();
+        builder.setTenantId(ByteStringer.wrap(tenantIDBytes));
+        builder.setSchemaName(ByteStringer.wrap(schemaBytes));
+        builder.setTableName(ByteStringer.wrap(pTable.getTableName().getBytes()));
+        builder.setLastDDLTimestamp(pTable.getLastDDLTimestamp());
+    }
+    /**
+     * Build a request for the validateLastDDLTimestamp RPC.
+     * @param tableRef
+     * @return ValidateLastDDLTimestampRequest for the table in tableRef
+     */
+    private RegionServerEndpointProtos.ValidateLastDDLTimestampRequest
+                getValidateDDLTimestampRequest(TableRef tableRef) throws TableNotFoundException {
+        RegionServerEndpointProtos.ValidateLastDDLTimestampRequest.Builder requestBuilder
+                = RegionServerEndpointProtos.ValidateLastDDLTimestampRequest.newBuilder();
+        RegionServerEndpointProtos.LastDDLTimestampRequest.Builder innerBuilder
+                = RegionServerEndpointProtos.LastDDLTimestampRequest.newBuilder();
+
+        //when querying an index, we need to validate its parent table in case the index was dropped
+        if (PTableType.INDEX.equals(tableRef.getTable().getType())) {
+            PTableKey key = new PTableKey(this.connection.getTenantId(),
+                                                tableRef.getTable().getParentName().getString());
+            PTable parentTable = this.connection.getTable(key);
+            setLastDDLTimestampRequestParameters(innerBuilder, parentTable);
+            requestBuilder.addLastDDLTimestampRequests(innerBuilder);
+        }
+
+        innerBuilder = RegionServerEndpointProtos.LastDDLTimestampRequest.newBuilder();
+        setLastDDLTimestampRequestParameters(innerBuilder, tableRef.getTable());
+        requestBuilder.addLastDDLTimestampRequests(innerBuilder);
+
+        //when querying a view, we need to validate last ddl timestamps for all its ancestors
+        if (PTableType.VIEW.equals(tableRef.getTable().getType())) {
+            PTable pTable = tableRef.getTable();
+            while (pTable.getParentName() != null) {
+                PTableKey key = new PTableKey(this.connection.getTenantId(),
+                                                pTable.getParentName().getString());
+                PTable parentTable = this.connection.getTable(key);
+                innerBuilder = RegionServerEndpointProtos.LastDDLTimestampRequest.newBuilder();
+                setLastDDLTimestampRequestParameters(innerBuilder, parentTable);
+                requestBuilder.addLastDDLTimestampRequests(innerBuilder);
+                pTable = parentTable;
+            }
+        }
+        return requestBuilder.build();
+    }
+
+    /**
+     * Verifies that table metadata in client cache is up-to-date with server.
+     * A random live region server is picked for invoking the RPC to validate LastDDLTimestamp.
+     * Retry once if there was an error performing the RPC, otherwise throw the Exception.
+     * @param tableRef
+     * @throws SQLException
+     */
+    private void validateLastDDLTimestamp(TableRef tableRef, boolean doRetry) throws SQLException {
+
+        String infoString = getInfoString(tableRef);
+        try (Admin admin = this.connection.getQueryServices().getAdmin()) {
+            // get all live region servers
+            List<ServerName> regionServers
+                    = this.connection.getQueryServices().getLiveRegionServers();
+            // pick one at random
+            ServerName regionServer
+                    = regionServers.get(ThreadLocalRandom.current().nextInt(regionServers.size()));
+
+            LOGGER.debug("Sending DDL timestamp validation request for {} to regionserver {}",
+                    infoString, regionServer);
+
+            // RPC
+            CoprocessorRpcChannel channel = admin.coprocessorService(regionServer);
+            PhoenixRegionServerEndpoint.BlockingInterface service
+                    = PhoenixRegionServerEndpoint.newBlockingStub(channel);
+            service.validateLastDDLTimestamp(null, getValidateDDLTimestampRequest(tableRef));
+        } catch (Exception e) {
+            SQLException parsedException = ServerUtil.parseServerException(e);
+            if (parsedException instanceof StaleMetadataCacheException) {
+                throw parsedException;
+            }
+            //retry once for any exceptions other than StaleMetadataCacheException
+            LOGGER.error("Error in validating DDL timestamp for {}", infoString, parsedException);
+            if (doRetry) {
+                // update the list of live region servers
+                this.connection.getQueryServices().refreshLiveRegionServers();
+                validateLastDDLTimestamp(tableRef, false);
+                return;
+            }
+            throw parsedException;
+        }
     }
 
     private PhoenixResultSet executeQuery(final CompilableStatement stmt,
-                                          final boolean doRetryOnMetaNotFoundError, final QueryLogger queryLogger, final boolean noCommit) throws SQLException {
+                                          final boolean doRetryOnMetaNotFoundError,
+                                          final QueryLogger queryLogger, final boolean noCommit,
+                                          boolean shouldValidateLastDdlTimestamp)
+            throws SQLException {
         GLOBAL_SELECT_SQL_COUNTER.increment();
 
         try {
@@ -335,6 +462,7 @@ public class PhoenixStatement implements PhoenixMonitoredStatement, SQLCloseable
                         @Override public PhoenixResultSet call() throws SQLException {
                             final long startTime = EnvironmentEdgeManager.currentTimeMillis();
                             boolean success = false;
+                            boolean updateMetrics = true;
                             boolean pointLookup = false;
                             String tableName = null;
                             PhoenixResultSet rs = null;
@@ -371,6 +499,14 @@ public class PhoenixStatement implements PhoenixMonitoredStatement, SQLCloseable
                                 plan =
                                         connection.getQueryServices().getOptimizer()
                                                 .optimize(PhoenixStatement.this, plan);
+                                setLastQueryPlan(plan);
+
+                                //verify metadata for the table/view/index in the query plan
+                                //plan.getTableRef can be null in some cases like EXPLAIN <query>
+                                if (shouldValidateLastDdlTimestamp && plan.getTableRef() != null) {
+                                    validateLastDDLTimestamp(plan.getTableRef(), true);
+                                }
+
                                 // this will create its own trace internally, so we don't wrap this
                                 // whole thing in tracing
                                 ResultIterator resultIterator = plan.iterator();
@@ -394,7 +530,6 @@ public class PhoenixStatement implements PhoenixMonitoredStatement, SQLCloseable
                                         newResultSet(resultIterator, plan.getProjector(),
                                                 plan.getContext());
                                 resultSets.add(rs);
-                                setLastQueryPlan(plan);
                                 setLastResultSet(rs);
                                 setLastUpdateCount(NO_UPDATE);
                                 setLastUpdateOperation(stmt.getOperation());
@@ -416,13 +551,38 @@ public class PhoenixStatement implements PhoenixMonitoredStatement, SQLCloseable
                                             .updateCache(connection.getTenantId(),
                                                     e.getSchemaName(), e.getTableName(), true)
                                             .wasUpdated()) {
+                                        updateMetrics = false;
                                         //TODO we can log retry count and error for debugging in LOG table
-                                        return executeQuery(stmt, false, queryLogger, noCommit);
+                                        return executeQuery(stmt, false, queryLogger, noCommit,
+                                                                shouldValidateLastDdlTimestamp);
                                     }
                                 }
                                 throw e;
-                            } catch (RuntimeException e) {
-
+                            } catch (StaleMetadataCacheException e) {
+                                updateMetrics = false;
+                                PTable pTable = lastQueryPlan.getTableRef().getTable();
+                                LOGGER.debug("Force updating client metadata cache for {}",
+                                        getInfoString(getLastQueryPlan().getTableRef()));
+                                String schemaN = pTable.getSchemaName().toString();
+                                String tableN = pTable.getTableName().toString();
+                                PName tenantId = connection.getTenantId();
+
+                                // if the index metadata was stale, we will update the client cache
+                                // for the parent table, which will also add the new index metadata
+                                PTableType tableType = pTable.getType();
+                                if (tableType == PTableType.INDEX) {
+                                    schemaN = pTable.getParentSchemaName().toString();
+                                    tableN = pTable.getParentTableName().toString();
+                                }
+                                // force update client metadata cache for the table/view
+                                // this also updates the cache for all ancestors in case of a view
+                                new MetaDataClient(connection)
+                                        .updateCache(tenantId, schemaN, tableN, true);
+                                // skip last ddl timestamp validation in the retry
+                                return executeQuery(stmt, doRetryOnMetaNotFoundError, queryLogger,
+                                                        noCommit, false);
+                            }
+                            catch (RuntimeException e) {
                                 // FIXME: Expression.evaluate does not throw SQLException
                                 // so this will unwrap throws from that.
                                 if (e.getCause() instanceof SQLException) {
@@ -430,41 +590,43 @@ public class PhoenixStatement implements PhoenixMonitoredStatement, SQLCloseable
                                 }
                                 throw e;
                             } finally {
-                                // Regardless of whether the query was successfully handled or not,
-                                // update the time spent so far. If needed, we can separate out the
-                                // success times and failure times.
-                                GLOBAL_QUERY_TIME.update(EnvironmentEdgeManager.currentTimeMillis()
-                                        - startTime);
-                                long
-                                        executeQueryTimeSpent =
-                                        EnvironmentEdgeManager.currentTimeMillis() - startTime;
-                                if (tableName != null) {
-
-                                    TableMetricsManager
-                                            .updateMetricsMethod(tableName, SELECT_SQL_COUNTER, 1);
-                                    TableMetricsManager
-                                            .updateMetricsMethod(tableName, SELECT_SQL_QUERY_TIME,
-                                                    executeQueryTimeSpent);
-                                    if (success) {
-                                        TableMetricsManager.updateMetricsMethod(tableName,
-                                                SELECT_SUCCESS_SQL_COUNTER, 1);
-                                        TableMetricsManager.updateMetricsMethod(tableName,
-                                                pointLookup ?
-                                                        SELECT_POINTLOOKUP_SUCCESS_SQL_COUNTER :
-                                                        SELECT_SCAN_SUCCESS_SQL_COUNTER, 1);
-                                    } else {
-                                        TableMetricsManager.updateMetricsMethod(tableName,
-                                                SELECT_FAILED_SQL_COUNTER, 1);
-                                        TableMetricsManager.updateMetricsMethod(tableName,
-                                                SELECT_AGGREGATE_FAILURE_SQL_COUNTER, 1);
-                                        TableMetricsManager.updateMetricsMethod(tableName,
-                                                pointLookup ?
-                                                        SELECT_POINTLOOKUP_FAILED_SQL_COUNTER :
-                                                        SELECT_SCAN_FAILED_SQL_COUNTER, 1);
+                                if (updateMetrics) {
+                                if (updateMetrics) {
+                                    // Regardless of whether the query was successfully handled or not,
+                                    // update the time spent so far. If needed, we can separate out the
+                                    // success times and failure times.
+                                    GLOBAL_QUERY_TIME.update(EnvironmentEdgeManager.currentTimeMillis()
+                                            - startTime);
+                                    long
+                                            executeQueryTimeSpent =
+                                            EnvironmentEdgeManager.currentTimeMillis() - startTime;
+                                    if (tableName != null) {
+
+                                        TableMetricsManager
+                                                .updateMetricsMethod(tableName, SELECT_SQL_COUNTER, 1);
+                                        TableMetricsManager
+                                                .updateMetricsMethod(tableName, SELECT_SQL_QUERY_TIME,
+                                                        executeQueryTimeSpent);
+                                        if (success) {
+                                            TableMetricsManager.updateMetricsMethod(tableName,
+                                                    SELECT_SUCCESS_SQL_COUNTER, 1);
+                                            TableMetricsManager.updateMetricsMethod(tableName,
+                                                    pointLookup ?
+                                                            SELECT_POINTLOOKUP_SUCCESS_SQL_COUNTER :
+                                                            SELECT_SCAN_SUCCESS_SQL_COUNTER, 1);
+                                        } else {
+                                            TableMetricsManager.updateMetricsMethod(tableName,
+                                                    SELECT_FAILED_SQL_COUNTER, 1);
+                                            TableMetricsManager.updateMetricsMethod(tableName,
+                                                    SELECT_AGGREGATE_FAILURE_SQL_COUNTER, 1);
+                                            TableMetricsManager.updateMetricsMethod(tableName,
+                                                    pointLookup ?
+                                                            SELECT_POINTLOOKUP_FAILED_SQL_COUNTER :
+                                                            SELECT_SCAN_FAILED_SQL_COUNTER, 1);
+                                        }
+                                    }
+                                    if (rs != null) {
+                                        rs.setQueryTime(executeQueryTimeSpent);
                                     }
-                                }
-                                if (rs != null) {
-                                    rs.setQueryTime(executeQueryTimeSpent);
                                 }
                             }
                             return rs;
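
Note: taken together, the PhoenixStatement changes implement this read path: build a
timestamp request for the table plus its index parent or view ancestors, send it to one
random live region server, and react to the outcome. A condensed, hypothetical sketch
of that flow (helper names like compile/optimize/executePlan are illustrative, not the
literal methods above):

    // Pseudocode-style summary, assuming the helpers named in this diff.
    QueryPlan plan = optimize(compile(stmt));
    try {
        if (shouldValidateLastDdlTimestamp && plan.getTableRef() != null) {
            // RPC to a random live region server; retried once on RPC failure
            // after refreshing the live-server list
            validateLastDDLTimestamp(plan.getTableRef(), true);
        }
        return executePlan(plan);
    } catch (StaleMetadataCacheException e) {
        // server has a newer lastDDLTimestamp: force-refresh the client cache
        // (parent table for indexes; the whole hierarchy for views) ...
        new MetaDataClient(connection).updateCache(tenantId, schemaN, tableN, true);
        // ... and re-run the query with validation skipped on the retry
        return executeQuery(stmt, doRetryOnMetaNotFoundError, queryLogger, noCommit, false);
    }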
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
index dd62ed125e..942e81d3e8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java
@@ -25,6 +25,7 @@ import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Mutation;
@@ -137,6 +138,8 @@ public interface ConnectionQueryServices extends QueryServices, MetaDataMutated
 
     public int getLowestClusterHBaseVersion();
     public Admin getAdmin() throws SQLException;
+    void refreshLiveRegionServers() throws SQLException;
+    List<ServerName> getLiveRegionServers();
 
     void clearTableRegionCache(TableName name) throws SQLException;
 
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
index 551be97e1c..b49432aed0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java
@@ -135,6 +135,7 @@ import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.KeepDeletedCells;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.NamespaceNotFoundException;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableExistsException;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
@@ -394,6 +395,10 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
     private ServerSideRPCControllerFactory serverSideRPCControllerFactory;
     private boolean localIndexUpgradeRequired;
 
+    // writes guarded by "liveRegionServersLock"
+    private volatile List<ServerName> liveRegionServers;
+    private final Object liveRegionServersLock = new Object();
+
     private static interface FeatureSupported {
         boolean isSupported(ConnectionQueryServices services);
     }
@@ -3473,6 +3478,13 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
                             LOGGER.info("An instance of ConnectionQueryServices was created.");
                             openConnection();
                             hConnectionEstablished = true;
+                            boolean lastDDLTimestampValidationEnabled
+                                = getProps().getBoolean(
+                                    QueryServices.LAST_DDL_TIMESTAMP_VALIDATION_ENABLED,
+                                    QueryServicesOptions.DEFAULT_LAST_DDL_TIMESTAMP_VALIDATION_ENABLED);
+                            if (lastDDLTimestampValidationEnabled) {
+                                refreshLiveRegionServers();
+                            }
                             String skipSystemExistenceCheck =
                                 props.getProperty(SKIP_SYSTEM_TABLES_EXISTENCE_CHECK);
                             if (skipSystemExistenceCheck != null &&
@@ -5183,6 +5195,23 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement
         }
     }
 
+    @Override
+    public void refreshLiveRegionServers() throws SQLException {
+        synchronized (liveRegionServersLock) {
+            try (Admin admin = getAdmin()) {
+                this.liveRegionServers = new ArrayList<>(admin.getRegionServers(true));
+            } catch (IOException e) {
+                throw ServerUtil.parseServerException(e);
+            }
+        }
+        LOGGER.info("Refreshed list of live region servers.");
+    }
+
+    @Override
+    public List<ServerName> getLiveRegionServers() {
+        return this.liveRegionServers;
+    }
+
     @Override
     public Admin getAdmin() throws SQLException {
         try {
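
Note on the liveRegionServers design above: writes go through refreshLiveRegionServers(),
which rebuilds the list while holding liveRegionServersLock, and getLiveRegionServers()
reads the volatile field without locking, so readers always see a fully-constructed
snapshot. A self-contained sketch of the same publication pattern in plain Java
(illustrative, not Phoenix code):

    import java.util.ArrayList;
    import java.util.List;

    class LiveServerList {
        private final Object lock = new Object();
        private volatile List<String> servers = new ArrayList<>(); // safely published

        void refresh(List<String> latest) {
            synchronized (lock) {                   // serialize writers
                servers = new ArrayList<>(latest);  // build new list, publish atomically
            }
        }

        List<String> snapshot() {
            return servers;                         // lock-free read of the last snapshot
        }
    }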
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
index bd66df3dac..32defae1e0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java
@@ -471,6 +471,16 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple
         return Integer.MAX_VALUE; // Allow everything for connectionless
     }
 
+    @Override
+    public void refreshLiveRegionServers() throws SQLException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public List<ServerName> getLiveRegionServers() {
+        throw new UnsupportedOperationException();
+    }
+
     @Override
     public Admin getAdmin() throws SQLException {
         throw new UnsupportedOperationException();
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
index 4f86efc8aa..0957fbf0d8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java
@@ -25,6 +25,7 @@ import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Mutation;
@@ -179,6 +180,16 @@ public class DelegateConnectionQueryServices extends DelegateQueryServices imple
         return getDelegate().getLowestClusterHBaseVersion();
     }
 
+    @Override
+    public void refreshLiveRegionServers() throws SQLException {
+        getDelegate().refreshLiveRegionServers();
+    }
+
+    @Override
+    public List<ServerName> getLiveRegionServers() {
+        return getDelegate().getLiveRegionServers();
+    }
+
     @Override
     public Admin getAdmin() throws SQLException {
         return getDelegate().getAdmin();
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index f980fe041e..752c0057d0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -313,6 +313,9 @@ public interface QueryServices extends SQLCloseable {
     //Update Cache Frequency default config attribute
     public static final String DEFAULT_UPDATE_CACHE_FREQUENCY_ATRRIB  = "phoenix.default.update.cache.frequency";
 
+    // whether to validate last ddl timestamps during client operations
+    public static final String LAST_DDL_TIMESTAMP_VALIDATION_ENABLED = "phoenix.ddl.timestamp.validation.enabled";
+
     // Whether to enable cost-based-decision in the query optimizer
     public static final String COST_BASED_OPTIMIZER_ENABLED = "phoenix.costbased.optimizer.enabled";
     public static final String SMALL_SCAN_THRESHOLD_ATTRIB = "phoenix.query.smallScanThreshold";
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index adef425d82..6582d8af22 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -352,6 +352,7 @@ public class QueryServicesOptions {
     //default update cache frequency
     public static final long DEFAULT_UPDATE_CACHE_FREQUENCY = 0;
     public static final int DEFAULT_SMALL_SCAN_THRESHOLD = 100;
+    public static final boolean DEFAULT_LAST_DDL_TIMESTAMP_VALIDATION_ENABLED = false;
 
     // default system task handling interval in milliseconds
     public static final long DEFAULT_TASK_HANDLING_INTERVAL_MS = 60*1000; // 1 min
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index 4fbee3cc24..16f391fbe8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -680,7 +680,7 @@ public class MetaDataClient {
                         result.setTable(table);
                     }
                     if (result.getTable()!=null) {
-                        addTableToCache(result);
+                        addTableToCache(result, alwaysHitServer);
                     }
                     return result;
                 }
@@ -694,7 +694,7 @@ public class MetaDataClient {
                     // Otherwise, a tenant would be required to create a VIEW first
                     // which is not really necessary unless you want to filter or add
                     // columns
-                    addTableToCache(result);
+                    addTableToCache(result, alwaysHitServer);
                     return result;
                 } else {
                     // if (result.getMutationCode() == MutationCode.NEWER_TABLE_FOUND) {
@@ -710,7 +710,8 @@ public class MetaDataClient {
                             // In this case, we update the parent table which may in turn pull
                             // in indexes to add to this table.
                             long resolvedTime = TransactionUtil.getResolvedTime(connection, result);
-                            if (addColumnsAndIndexesFromAncestors(result, resolvedTimestamp, true)) {
+                            if (addColumnsAndIndexesFromAncestors(result, resolvedTimestamp,
+                                    true, false)) {
                                 connection.addTable(result.getTable(), resolvedTime);
                             } else {
                                 // if we aren't adding the table, we still need to update the
@@ -886,11 +887,15 @@ public class MetaDataClient {
      * @param resolvedTimestamp timestamp at which child table was resolved
      * @param alwaysAddAncestorColumnsAndIndexes flag that determines whether we should recalculate
      *        all inherited columns and indexes that can be used in the view and
+     * @param alwaysHitServerForAncestors flag that determines whether we should fetch latest
+     *        metadata for ancestors from the server
      * @return true if the PTable contained by result was modified and false otherwise
      * @throws SQLException if the physical table cannot be found
      */
     private boolean addColumnsAndIndexesFromAncestors(MetaDataMutationResult result, Long resolvedTimestamp,
-                                                      boolean alwaysAddAncestorColumnsAndIndexes) throws SQLException {
+                                                      boolean alwaysAddAncestorColumnsAndIndexes,
+                                                      boolean alwaysHitServerForAncestors)
+            throws SQLException {
         PTable table = result.getTable();
         boolean hasIndexId = table.getViewIndexId() != null;
         // only need to inherit columns and indexes for view indexes and views
@@ -902,7 +907,7 @@ public class MetaDataClient {
                 String parentSchemaName = SchemaUtil.getSchemaNameFromFullName(parentName);
                 tableName = SchemaUtil.getTableNameFromFullName(parentName);
                 MetaDataMutationResult parentResult = updateCache(connection.getTenantId(), parentSchemaName, tableName,
-                        false, resolvedTimestamp);
+                        alwaysHitServerForAncestors, resolvedTimestamp);
                 PTable parentTable = parentResult.getTable();
                 if (parentResult.getMutationCode() == MutationCode.TABLE_NOT_FOUND || parentTable == null) {
                     // Try once more with different tenant id (connection can be global but view could be tenant
@@ -2818,6 +2823,7 @@ public class MetaDataClient {
                         .setColumns(columns.values())
                         .setPhoenixTTL(PHOENIX_TTL_NOT_DEFINED)
                         .setPhoenixTTLHighWaterMark(MIN_PHOENIX_TTL_HWM)
+                        .setLastDDLTimestamp(0L)
                         .build();
                 connection.addTable(table, MetaDataProtocol.MIN_TABLE_TIMESTAMP);
             }
@@ -3221,7 +3227,7 @@ public class MetaDataClient {
                         .setStreamingTopicName(streamingTopicName)
                         .build();
                 result = new MetaDataMutationResult(code, result.getMutationTime(), table, true);
-                addTableToCache(result);
+                addTableToCache(result, false);
                 return table;
             } catch (Throwable e) {
                 TableMetricsManager.updateMetricsForSystemCatalogTableMethod(tableNameNode.toString(),
@@ -3311,7 +3317,7 @@ public class MetaDataClient {
         switch(code) {
             case TABLE_ALREADY_EXISTS:
                 if (result.getTable() != null) {
-                    addTableToCache(result);
+                    addTableToCache(result, false);
                 }
                 if (!statement.ifNotExists()) {
                     throw new TableAlreadyExistsException(schemaName, tableName, result.getTable());
@@ -3328,7 +3334,7 @@ public class MetaDataClient {
             case UNALLOWED_TABLE_MUTATION:
                 throwsSQLExceptionUtil("CANNOT_MUTATE_TABLE",schemaName,tableName);
             case CONCURRENT_TABLE_MUTATION:
-                addTableToCache(result);
+                addTableToCache(result, false);
                 throw new ConcurrentTableMutationException(schemaName, tableName);
             case AUTO_PARTITION_SEQUENCE_NOT_FOUND:
                 throw new SQLExceptionInfo.Builder(SQLExceptionCode.AUTO_PARTITION_SEQUENCE_UNDEFINED)
@@ -3630,7 +3636,7 @@ public class MetaDataClient {
         case COLUMN_NOT_FOUND:
             break;
         case CONCURRENT_TABLE_MUTATION:
-            addTableToCache(result);
+            addTableToCache(result, false);
             if (LOGGER.isDebugEnabled()) {
                 LOGGER.debug(LogUtil.addCustomAnnotations("CONCURRENT_TABLE_MUTATION for table " + SchemaUtil.getTableName(schemaName, tableName), connection));
             }
@@ -4266,7 +4272,7 @@ public class MetaDataClient {
                 try {
                     MutationCode code = processMutationResult(schemaName, 
tableName, result);
                     if (code == MutationCode.COLUMN_ALREADY_EXISTS) {
-                        addTableToCache(result);
+                        addTableToCache(result, false);
                         if (!ifNotExists) {
                             throw new ColumnAlreadyExistsException(schemaName, tableName, SchemaUtil.findExistingColumn(result.getTable(), columns));
                         }
@@ -4278,7 +4284,7 @@ public class MetaDataClient {
                     String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
                     long resolvedTimeStamp = TransactionUtil.getResolvedTime(connection, result);
                     if (table.getIndexes().isEmpty() || (numPkColumnsAdded==0 && ! metaProperties.getNonTxToTx())) {
-                        addTableToCache(result, resolvedTimeStamp);
+                        addTableToCache(result, false, resolvedTimeStamp);
                         table = result.getTable();
                     } else  {
                         // remove the table from the cache, it will be fetched from the server the
@@ -4700,7 +4706,7 @@ public class MetaDataClient {
                 try {
                     MutationCode code = processMutationResult(schemaName, 
tableName, result);
                     if (code == MutationCode.COLUMN_NOT_FOUND) {
-                        addTableToCache(result);
+                        addTableToCache(result, false);
                         if (!statement.ifExists()) {
                             throw new ColumnNotFoundException(schemaName, tableName, Bytes.toString(result.getFamilyName()), Bytes.toString(result.getColumnName()));
                         }
@@ -4946,7 +4952,7 @@ public class MetaDataClient {
 
                 if (code == MutationCode.TABLE_ALREADY_EXISTS) {
                     if (result.getTable() != null) { // To accommodate connection-less update of index state
-                        addTableToCache(result);
+                        addTableToCache(result, false);
                         // Set so that we get the table below with the potentially modified rowKeyOrderOptimizable flag set
                         indexRef.setTable(result.getTable());
                         if (newIndexState == PIndexState.BUILDING && isAsync) {
@@ -5066,12 +5072,15 @@ public class MetaDataClient {
         }
     }
 
-    private void addTableToCache(MetaDataMutationResult result) throws SQLException {
-        addTableToCache(result, TransactionUtil.getResolvedTime(connection, result));
+    private void addTableToCache(MetaDataMutationResult result, boolean alwaysHitServerForAncestors)
+            throws SQLException {
+        addTableToCache(result, alwaysHitServerForAncestors,
+                TransactionUtil.getResolvedTime(connection, result));
     }
 
-    private void addTableToCache(MetaDataMutationResult result, long timestamp) throws SQLException {
-        addColumnsAndIndexesFromAncestors(result, null, false);
+    private void addTableToCache(MetaDataMutationResult result, boolean alwaysHitServerForAncestors,
+                                 long timestamp) throws SQLException {
+        addColumnsAndIndexesFromAncestors(result, null, false, alwaysHitServerForAncestors);
         PTable table = result.getTable();
         connection.addTable(table, timestamp);
     }
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/cache/ServerMetadataCacheTest.java b/phoenix-core/src/test/java/org/apache/phoenix/cache/ServerMetadataCacheTest.java
index 2d6880d97d..2a618cdcce 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/cache/ServerMetadataCacheTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/cache/ServerMetadataCacheTest.java
@@ -19,9 +19,14 @@ package org.apache.phoenix.cache;
 
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.ConnectionProperty;
 import org.apache.phoenix.schema.PIndexState;
 import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.types.PVarchar;
 import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
@@ -30,6 +35,7 @@ import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.TestUtil;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -39,10 +45,12 @@ import org.slf4j.LoggerFactory;
 
 import java.sql.Connection;
 import java.sql.DriverManager;
+import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Random;
 
 import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
 import static org.junit.Assert.assertEquals;
@@ -57,11 +65,15 @@ import static org.mockito.Mockito.verify;
 
 @Category(ParallelStatsDisabledIT.class)
 public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
+
+    private final Random RANDOM = new Random(42);
+    private final long NEVER = (long) ConnectionProperty.UPDATE_CACHE_FREQUENCY.getValue("NEVER");
     private static final Logger LOGGER = LoggerFactory.getLogger(ServerMetadataCacheTest.class);
 
     @BeforeClass
     public static synchronized void doSetup() throws Exception {
         Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
+        props.put(QueryServices.LAST_DDL_TIMESTAMP_VALIDATION_ENABLED, Boolean.toString(true));
         setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
     }
 
@@ -84,9 +96,8 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
                 PropertiesUtil.deepCopy(TEST_PROPERTIES)));
         try(Connection conn = spyCQS.connect(getUrl(), props)) {
             conn.setAutoCommit(false);
-            String ddl = getCreateTableStmt(tableNameStr);
             // Create a test table.
-            conn.createStatement().execute(ddl);
+            createTable(conn, tableNameStr, NEVER);
             pTable = PhoenixRuntime.getTableNoCache(conn,
                     tableNameStr);// --> First call to CQSI#getTable
             ServerMetadataCache cache = ServerMetadataCache.getInstance(config);
@@ -123,13 +134,12 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
                 PropertiesUtil.deepCopy(TEST_PROPERTIES)));
         try (Connection conn = spyCQS.connect(getUrl(), props)) {
             conn.setAutoCommit(false);
-            String ddl = getCreateTableStmt(tableNameStr);
             // Create a test table.
-            conn.createStatement().execute(ddl);
+            createTable(conn, tableNameStr, NEVER);
             // Create view on table.
-            String whereClause = " WHERE COL1 = 1000";
+            String whereClause = " WHERE v1 = 1000";
             String viewNameStr = generateUniqueName();
-            conn.createStatement().execute(getCreateViewStmt(viewNameStr, tableNameStr, whereClause));
+            createViewWhereClause(conn, tableNameStr, viewNameStr, whereClause);
             viewTable = PhoenixRuntime.getTableNoCache(conn, viewNameStr);  // --> First call to CQSI#getTable
             ServerMetadataCache cache = ServerMetadataCache.getInstance(config);
             // Override the connection to use in ServerMetadataCache
@@ -162,22 +172,20 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
         String tableNameStr = generateUniqueName();
         try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
             conn.setAutoCommit(false);
-            String ddl = getCreateTableStmt(tableNameStr);
             // Create a test table.
-            conn.createStatement().execute(ddl);
+            createTable(conn, tableNameStr, NEVER);
         }
         String tenantId = "T_" + generateUniqueName();
         Properties tenantProps = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         tenantProps.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
         PTable tenantViewTable;
         // Create view on table.
-        String whereClause = " WHERE COL1 = 1000";
+        String whereClause = " WHERE v1 = 1000";
         String tenantViewNameStr = generateUniqueName();
         ConnectionQueryServices spyCQS = Mockito.spy(driver.getConnectionQueryServices(getUrl(),
                 PropertiesUtil.deepCopy(TEST_PROPERTIES)));
         try (Connection conn = spyCQS.connect(getUrl(), tenantProps)) {
-            conn.createStatement().execute(getCreateViewStmt(tenantViewNameStr,
-                    tableNameStr, whereClause));
+            createViewWhereClause(conn, tableNameStr, tenantViewNameStr, whereClause);
             tenantViewTable = PhoenixRuntime.getTableNoCache(conn,
                     tenantViewNameStr);  // --> First call to CQSI#getTable
             ServerMetadataCache cache = ServerMetadataCache.getInstance(config);
@@ -214,9 +222,8 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
         PTable pTable;
         try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
             conn.setAutoCommit(false);
-            String ddl = getCreateTableStmt(tableNameStr);
             // Create a test table.
-            conn.createStatement().execute(ddl);
+            createTable(conn, tableNameStr, NEVER);
             pTable = PhoenixRuntime.getTableNoCache(conn, tableNameStr);
             ServerMetadataCache cache = ServerMetadataCache.getInstance(config);
             // Override the connection to use in ServerMetadataCache
@@ -245,9 +252,8 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
         PTable pTable;
         try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
             conn.setAutoCommit(false);
-            String ddl = getCreateTableStmt(fullTableName);
             // Create a test table.
-            conn.createStatement().execute(ddl);
+            createTable(conn, fullTableName, NEVER);
             pTable = PhoenixRuntime.getTableNoCache(conn, fullTableName);
             ServerMetadataCache cache = ServerMetadataCache.getInstance(config);
             // Override the connection to use in ServerMetadataCache
@@ -263,7 +269,7 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
         }
     }
 
-    /**
+  /**
      * Make sure we are invalidating the cache for view with tenant connection.
      * @throws Exception
      */
@@ -273,20 +279,18 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
         String tableNameStr = generateUniqueName();
         try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
             conn.setAutoCommit(false);
-            String ddl = getCreateTableStmt(tableNameStr);
             // Create a test table.
-            conn.createStatement().execute(ddl);
+            createTable(conn, tableNameStr, NEVER);
         }
         String tenantId = "T_" + generateUniqueName();
         Properties tenantProps = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         tenantProps.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
         PTable tenantViewTable;
         // Create view on table.
-        String whereClause = " WHERE COL1 = 1000";
+        String whereClause = " WHERE V1 = 1000";
         String tenantViewNameStr = generateUniqueName();
         try (Connection conn = DriverManager.getConnection(getUrl(), tenantProps)) {
-            conn.createStatement().execute(getCreateViewStmt(tenantViewNameStr,
-                    tableNameStr, whereClause));
+            createViewWhereClause(conn, tableNameStr, tenantViewNameStr, whereClause);
             tenantViewTable = PhoenixRuntime.getTableNoCache(conn, tenantViewNameStr);
             ServerMetadataCache cache = ServerMetadataCache.getInstance(config);
             // Override the connection to use in ServerMetadataCache
@@ -304,7 +308,6 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
         }
     }
 
-
     /**
      * Make sure we are invalidating the cache for table with no tenant connection, no schema name
      * and valid table name when we run alter statement.
@@ -319,9 +322,8 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
         ServerMetadataCache cache = ServerMetadataCache.getInstance(config);
         try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
             conn.setAutoCommit(false);
-            String ddl = getCreateTableStmt(tableNameStr);
             // Create a test table.
-            conn.createStatement().execute(ddl);
+            createTable(conn, tableNameStr, NEVER);
             pTable = PhoenixRuntime.getTableNoCache(conn, tableNameStr);
             long lastDDLTimestamp = pTable.getLastDDLTimestamp();
             assertEquals(lastDDLTimestamp,
@@ -343,7 +345,7 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
         }
     }
 
-    /**
+  /**
      * Make sure we are invalidating the cache for table with no tenant connection, no schema name
      * and valid table name when we run drop table statement.
      * @throws Exception
@@ -357,9 +359,8 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
         ServerMetadataCache cache = ServerMetadataCache.getInstance(config);
         try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
             conn.setAutoCommit(false);
-            String ddl = getCreateTableStmt(tableNameStr);
             // Create a test table.
-            conn.createStatement().execute(ddl);
+            createTable(conn, tableNameStr, NEVER);
             pTable = PhoenixRuntime.getTableNoCache(conn, tableNameStr);
             long lastDDLTimestamp = pTable.getLastDDLTimestamp();
             assertEquals(lastDDLTimestamp,
@@ -384,17 +385,17 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
     @Test
     public void testInvalidateCacheForBaseTableWithUpdateIndexStatement() throws Exception {
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        String url = QueryUtil.getConnectionUrl(props, config, "client");
         String tableNameStr = "TBL_" + generateUniqueName();
         String indexNameStr = "IND_" + generateUniqueName();
         byte[] indexNameBytes = Bytes.toBytes(indexNameStr);
         PTable indexTable;
         ServerMetadataCache cache = ServerMetadataCache.getInstance(config);
-        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+        try (Connection conn = DriverManager.getConnection(url, props)) {
             conn.setAutoCommit(false);
-            String ddl = getCreateTableStmt(tableNameStr);
             // Create a test table.
-            conn.createStatement().execute(ddl);
-            String indexDDLStmt = "CREATE INDEX " + indexNameStr + " ON " + tableNameStr + "(col1)";
+            createTable(conn, tableNameStr, NEVER);
+            String indexDDLStmt = "CREATE INDEX " + indexNameStr + " ON " + tableNameStr + "(v1)";
             conn.createStatement().execute(indexDDLStmt);
             TestUtil.waitForIndexState(conn, indexNameStr, PIndexState.ACTIVE);
             indexTable = PhoenixRuntime.getTableNoCache(conn, indexNameStr);
@@ -422,7 +423,6 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
         }
     }
 
-
     /**
      *  Test that we invalidate the cache for parent table and update the last ddl timestamp
      *  of the parent table while we add an index.
@@ -437,20 +437,14 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
         String indexName = generateUniqueName();
         byte[] indexNameBytes = Bytes.toBytes(indexName);
         ServerMetadataCache cache = ServerMetadataCache.getInstance(config);
-        String ddl =
-                "create table  " + tableName + " ( k integer PRIMARY KEY," + " v1 integer,"
-                        + " v2 integer)";
-        String createIndexDDL = "create index  " + indexName + " on " + tableName + " (v1)";
-        String dropIndexDDL = "DROP INDEX " + indexName + " ON " + tableName;
-        try (Connection conn = DriverManager.getConnection(getUrl());
-             Statement stmt = conn.createStatement()) {
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
             conn.setAutoCommit(true);
-            stmt.execute(ddl);
+            createTable(conn, tableName, NEVER);
             long tableLastDDLTimestampBeforeIndexCreation = getLastDDLTimestamp(tableName);
             // Populate the cache
             assertNotNull(cache.getLastDDLTimestampForTable(null, null, tableNameBytes));
             Thread.sleep(1);
-            stmt.execute(createIndexDDL);
+            createIndex(conn, tableName, indexName, "v1");
             // Make sure that we have invalidated the last ddl timestamp for parent table
             // on all regionservers after we create an index.
             assertNull(cache.getLastDDLTimestampForTableFromCacheOnly(null, null, tableNameBytes));
@@ -463,7 +457,7 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
             assertNotNull(indexLastDDLTimestampAfterCreation);
             // Adding a sleep for 1 ms so that we get new last ddl timestamp.
             Thread.sleep(1);
-            stmt.execute(dropIndexDDL);
+            dropIndex(conn, tableName, indexName);
             // Make sure that we invalidate the cache on regionserver for base table and an index
             // after we dropped an index.
             assertNull(cache.getLastDDLTimestampForTableFromCacheOnly(null, null, tableNameBytes));
@@ -495,18 +489,13 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
         ServerMetadataCache cache = ServerMetadataCache.getInstance(config);
         try(Connection conn = DriverManager.getConnection(getUrl());
             Statement stmt = conn.createStatement()) {
-            String whereClause = " WHERE COL1 < 1000";
-            String tableDDLStmt = getCreateTableStmt(tableName);
-            String viewDDLStmt = getCreateViewStmt(globalViewName, tableName, whereClause);
-            String viewIdxDDLStmt = getCreateViewIndexStmt(globalViewIndexName, globalViewName,
-                    "COL1");
-            String dropIndexDDL = "DROP INDEX " + globalViewIndexName + " ON " + globalViewName;
-            stmt.execute(tableDDLStmt);
-            stmt.execute(viewDDLStmt);
+            String whereClause = " WHERE v1 < 1000";
+            createTable(conn, tableName, NEVER);
+            createViewWhereClause(conn, tableName, globalViewName, whereClause);
             // Populate the cache
             assertNotNull(cache.getLastDDLTimestampForTable(null, null, globalViewNameBytes));
             long viewLastDDLTimestampBeforeIndexCreation = getLastDDLTimestamp(globalViewName);
-            stmt.execute(viewIdxDDLStmt);
+            createIndex(conn, globalViewName, globalViewIndexName, "v1");
 
             // Make sure that we have invalidated the last ddl timestamp for parent global view
             // on all regionserver after we create a view index.
@@ -520,7 +509,7 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
             assertNotNull(indexLastDDLTimestampAfterCreation);
             // Adding a sleep for 1 ms so that we get new last ddl timestamp.
             Thread.sleep(1);
-            stmt.execute(dropIndexDDL);
+            dropIndex(conn, globalViewName, globalViewIndexName);
             // Make sure that we invalidate the cache on regionservers for view and its index after
             // we drop a view index.
             assertNull(cache.getLastDDLTimestampForTableFromCacheOnly(null, null,
@@ -536,7 +525,396 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
         }
     }
 
-    public long getLastDDLTimestamp(String tableName) throws SQLException {
+    /**
+     * Client-1 creates a table, upserts data and alters the table.
+     * Client-2 queries the table before and after the alter.
+     * Check that queries work successfully in both cases and verify the number of addTable invocations.
+     */
+    @Test
+    public void testSelectQueryWithOldDDLTimestamp() throws SQLException {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        String url1 = QueryUtil.getConnectionUrl(props, config, "client1");
+        String url2 = QueryUtil.getConnectionUrl(props, config, "client2");
+        String tableName = generateUniqueName();
+        ConnectionQueryServices spyCqs1 = Mockito.spy(driver.getConnectionQueryServices(url1, props));
+        ConnectionQueryServices spyCqs2 = Mockito.spy(driver.getConnectionQueryServices(url2, props));
+        int expectedNumCacheUpdates;
+
+        try (Connection conn1 = spyCqs1.connect(url1, props);
+             Connection conn2 = spyCqs2.connect(url2, props)) {
+
+            // create table with UCF=never and upsert data using client-1
+            createTable(conn1, tableName, NEVER);
+            upsert(conn1, tableName);
+
+            // select query from client-2 works to populate client side metadata cache
+            // there should be 1 update to the client cache
+            query(conn2, tableName);
+            expectedNumCacheUpdates = 1;
+            Mockito.verify(spyCqs2, Mockito.times(expectedNumCacheUpdates))
+                    .addTable(any(PTable.class), anyLong());
+
+            // add column using client-1 to update last ddl timestamp
+            alterTableAddColumn(conn1, tableName, "newCol1");
+
+            // reset the spy CQSI object
+            Mockito.reset(spyCqs2);
+
+            // select query from client-2 with old ddl timestamp works
+            // there should be one update to the client cache
+            query(conn2, tableName);
+            expectedNumCacheUpdates = 1;
+            Mockito.verify(spyCqs2, Mockito.times(expectedNumCacheUpdates))
+                    .addTable(any(PTable.class), anyLong());
+
+            // select query from client-2 with latest ddl timestamp works
+            // there should be no more updates to client cache
+            query(conn2, tableName);
+            Mockito.verify(spyCqs2, Mockito.times(expectedNumCacheUpdates))
+                    .addTable(any(PTable.class), anyLong());
+        }
+    }
+
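
To see the same staleness flow outside the test harness, here is a minimal standalone sketch; the JDBC URL and table name are hypothetical, and it assumes a running Phoenix cluster with the Phoenix driver on the classpath:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;

    public class StaleMetadataSketch {
        public static void main(String[] args) throws Exception {
            String url = "jdbc:phoenix:localhost:2181"; // hypothetical quorum
            try (Connection client1 = DriverManager.getConnection(url);
                 Connection client2 = DriverManager.getConnection(url)) {
                // With NEVER, a client never refreshes its cached PTable on its
                // own; it relies on last-ddl-timestamp validation instead.
                client1.createStatement().execute(
                        "CREATE TABLE T1 (k INTEGER NOT NULL PRIMARY KEY, v1 INTEGER) "
                                + "UPDATE_CACHE_FREQUENCY=NEVER");
                // client-2 caches T1's metadata on first use
                ResultSet rs = client2.createStatement().executeQuery("SELECT COUNT(*) FROM T1");
                rs.next();
                // client-1 bumps T1's last ddl timestamp
                client1.createStatement().execute("ALTER TABLE T1 ADD v2 INTEGER");
                // client-2's next query carries the old timestamp; the server
                // flags the staleness and client-2 refreshes its cache once
                rs = client2.createStatement().executeQuery("SELECT COUNT(*) FROM T1");
                rs.next();
            }
        }
    }
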
+    /**
+     * Test DDL timestamp validation retry logic in case of any exception
+     * from Server other than StaleMetadataCacheException.
+     */
+    @Test
+    public void testSelectQueryServerSideExceptionInValidation() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        String url1 = QueryUtil.getConnectionUrl(props, config, "client1");
+        String url2 = QueryUtil.getConnectionUrl(props, config, "client2");
+        String tableName = generateUniqueName();
+        ConnectionQueryServices spyCqs1 = Mockito.spy(driver.getConnectionQueryServices(url1, props));
+        ConnectionQueryServices spyCqs2 = Mockito.spy(driver.getConnectionQueryServices(url2, props));
+        ServerMetadataCache cache = null;
+
+        try (Connection conn1 = spyCqs1.connect(url1, props);
+             Connection conn2 = spyCqs2.connect(url2, props)) {
+
+            // create table and upsert using client-1
+            createTable(conn1, tableName, NEVER);
+            upsert(conn1, tableName);
+
+            // Instrument ServerMetadataCache to throw a SQLException once
+            cache = ServerMetadataCache.getInstance(config);
+            ServerMetadataCache spyCache = Mockito.spy(cache);
+            Mockito.doThrow(new SQLException("FAIL")).doCallRealMethod().when(spyCache)
+                    .getLastDDLTimestampForTable(any(), any(), eq(Bytes.toBytes(tableName)));
+            ServerMetadataCache.setInstance(spyCache);
+
+            // query using client-2 should succeed
+            query(conn2, tableName);
+
+            // verify live region servers were refreshed
+            Mockito.verify(spyCqs2, Mockito.times(1)).refreshLiveRegionServers();
+        }
+    }
+
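
The doThrow(...).doCallRealMethod() stubbing above fails only the first call and then delegates to the real implementation, which is what lets the retry succeed. The pattern in isolation (class and method names hypothetical; Mockito assumed on the classpath):

    import org.mockito.Mockito;

    class FlakyOnceSketch {
        static class Cache {
            long lookup(String key) { return 42L; }
        }

        public static void main(String[] args) {
            Cache spy = Mockito.spy(new Cache());
            // First invocation throws; every later one runs the real method.
            Mockito.doThrow(new RuntimeException("FAIL"))
                    .doCallRealMethod()
                    .when(spy).lookup("k");
            try {
                spy.lookup("k");
            } catch (RuntimeException expected) {
                // first call fails, as stubbed
            }
            System.out.println(spy.lookup("k")); // prints 42: real method on retry
        }
    }
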
+    /**
+     * Test that a select query works when ddl timestamp validation with an old timestamp encounters an exception.
+     * Verify that the list of live region servers was refreshed when ddl timestamp validation is retried.
+     * Verify that the client cache was updated after encountering StaleMetadataCacheException.
+     */
+    @Test
+    public void testSelectQueryWithOldDDLTimestampWithExceptionRetry() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        String url1 = QueryUtil.getConnectionUrl(props, config, "client1");
+        String url2 = QueryUtil.getConnectionUrl(props, config, "client2");
+        String tableName = generateUniqueName();
+        ConnectionQueryServices spyCqs1 = Mockito.spy(driver.getConnectionQueryServices(url1, props));
+        ConnectionQueryServices spyCqs2 = Mockito.spy(driver.getConnectionQueryServices(url2, props));
+        int expectedNumCacheUpdates;
+        ServerMetadataCache cache = null;
+
+        try (Connection conn1 = spyCqs1.connect(url1, props);
+             Connection conn2 = spyCqs2.connect(url2, props)) {
+
+            // create table and upsert using client-1
+            createTable(conn1, tableName, NEVER);
+            upsert(conn1, tableName);
+
+            // query using client-2 to populate cache
+            query(conn2, tableName);
+            expectedNumCacheUpdates = 1;
+            Mockito.verify(spyCqs2, Mockito.times(expectedNumCacheUpdates))
+                    .addTable(any(PTable.class), anyLong());
+
+            // add column using client-1 to update last ddl timestamp
+            alterTableAddColumn(conn1, tableName, "newCol1");
+
+            // reset the spy CQSI object
+            Mockito.reset(spyCqs2);
+
+            // Instrument ServerMetadataCache to throw a SQLException once
+            cache = ServerMetadataCache.getInstance(config);
+            ServerMetadataCache spyCache = Mockito.spy(cache);
+            Mockito.doThrow(new SQLException("FAIL")).doCallRealMethod().when(spyCache)
+                    .getLastDDLTimestampForTable(any(), any(), eq(Bytes.toBytes(tableName)));
+            ServerMetadataCache.setInstance(spyCache);
+
+            // query using client-2 should succeed, one cache update
+            query(conn2, tableName);
+            expectedNumCacheUpdates = 1;
+            Mockito.verify(spyCqs2, Mockito.times(expectedNumCacheUpdates))
+                    .addTable(any(PTable.class), anyLong());
+
+            // verify live region servers were refreshed
+            Mockito.verify(spyCqs2, Mockito.times(1)).refreshLiveRegionServers();
+        }
+    }
+
+    /**
+     * Test that a select query fails when DDL timestamp validation throws SQLException twice.
+     */
+    @Test
+    public void testSelectQueryFails() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        String url1 = QueryUtil.getConnectionUrl(props, config, "client1");
+        String url2 = QueryUtil.getConnectionUrl(props, config, "client2");
+        String tableName = generateUniqueName();
+        ConnectionQueryServices spyCqs1 = Mockito.spy(driver.getConnectionQueryServices(url1, props));
+        ConnectionQueryServices spyCqs2 = Mockito.spy(driver.getConnectionQueryServices(url2, props));
+        ServerMetadataCache cache = null;
+
+        try (Connection conn1 = spyCqs1.connect(url1, props);
+             Connection conn2 = spyCqs2.connect(url2, props)) {
+
+            // create table and upsert using client-1
+            createTable(conn1, tableName, NEVER);
+            upsert(conn1, tableName);
+
+            // Instrument ServerMetadataCache to throw a SQLException twice
+            cache = ServerMetadataCache.getInstance(config);
+            ServerMetadataCache spyCache = Mockito.spy(cache);
+            SQLException e = new SQLException("FAIL");
+            Mockito.doThrow(e).when(spyCache)
+                    .getLastDDLTimestampForTable(any(), any(), eq(Bytes.toBytes(tableName)));
+            ServerMetadataCache.setInstance(spyCache);
+
+            // query using client-2 should fail
+            query(conn2, tableName);
+            Assert.fail("Query should have thrown Exception");
+        }
+        catch (Exception e) {
+            Assert.assertTrue("SQLException was not thrown when last ddl 
timestamp validation encountered errors twice.", e instanceof SQLException);
+        }
+    }
+
+
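
The try/fail/catch idiom above is the classic JUnit 4 shape; on JUnit 4.13+ the same check can be written with assertThrows, which also hands back the exception for further inspection. A standalone sketch of the idiom (names hypothetical, not the committed code):

    import static org.junit.Assert.assertThrows;

    class AssertThrowsSketch {
        static void failingValidation() {
            throw new IllegalStateException("FAIL");
        }

        public static void main(String[] args) {
            // assertThrows fails unless the given exception type is thrown
            IllegalStateException e = assertThrows(IllegalStateException.class,
                    AssertThrowsSketch::failingValidation);
            System.out.println(e.getMessage()); // "FAIL"
        }
    }
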
+    /**
+     * Client-1 creates a table, 2 levels of views on it, and alters the first level view.
+     * Client-2 queries the second level view; verify that there were 3 cache updates in client-2,
+     * one each for the two views and base table.
+     */
+    @Test
+    public void testSelectQueryOnView() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        String url1 = QueryUtil.getConnectionUrl(props, config, "client1");
+        String url2 = QueryUtil.getConnectionUrl(props, config, "client2");
+        String tableName = generateUniqueName();
+        ConnectionQueryServices spyCqs1 = Mockito.spy(driver.getConnectionQueryServices(url1, props));
+        ConnectionQueryServices spyCqs2 = Mockito.spy(driver.getConnectionQueryServices(url2, props));
+        int expectedNumCacheUpdates;
+
+        try (Connection conn1 = spyCqs1.connect(url1, props);
+             Connection conn2 = spyCqs2.connect(url2, props)) {
+
+            // create table using client-1
+            createTable(conn1, tableName, NEVER);
+            upsert(conn1, tableName);
+
+            // create 2 levels of views using client-1
+            String view1 = generateUniqueName();
+            String view2 = generateUniqueName();
+            createView(conn1, tableName, view1);
+            createView(conn1, view1, view2);
+
+            // query second level view using client-2
+            query(conn2, view2);
+            expectedNumCacheUpdates = 3; // table, view1, view2
+            Mockito.verify(spyCqs2, Mockito.times(expectedNumCacheUpdates))
+                    .addTable(any(PTable.class), anyLong());
+
+            // alter first level view using client-1 to update its last ddl timestamp
+            alterViewAddColumn(conn1, view1, "foo");
+
+            // reset the spy CQSI object
+            Mockito.reset(spyCqs2);
+
+            // query second level view
+            query(conn2, view2);
+
+            // verify there was a getTable RPC for the view and all its ancestors
+            Mockito.verify(spyCqs2, Mockito.times(1)).getTable(eq(null),
+                    any(byte[].class), eq(PVarchar.INSTANCE.toBytes(tableName)),
+                    anyLong(), anyLong());
+            Mockito.verify(spyCqs2, Mockito.times(1)).getTable(eq(null),
+                    any(byte[].class), eq(PVarchar.INSTANCE.toBytes(view1)),
+                    anyLong(), anyLong());
+            Mockito.verify(spyCqs2, Mockito.times(1)).getTable(eq(null),
+                    any(byte[].class), eq(PVarchar.INSTANCE.toBytes(view2)),
+                    anyLong(), anyLong());
+
+            // verify that the view and all its ancestors were updated in the client cache
+            expectedNumCacheUpdates = 3; // table, view1, view2
+            Mockito.verify(spyCqs2, Mockito.times(expectedNumCacheUpdates))
+                    .addTable(any(PTable.class), anyLong());
+        }
+    }
+
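
The hierarchy this test builds, and the reason for the 3 addTable calls, can be sketched directly; the URL, table, and view names here are hypothetical:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;

    class ViewHierarchySketch {
        public static void main(String[] args) throws Exception {
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost:2181");
                 Statement stmt = conn.createStatement()) {
                stmt.execute("CREATE TABLE BASE_T (k INTEGER NOT NULL PRIMARY KEY, v1 INTEGER)");
                stmt.execute("CREATE VIEW V1 AS SELECT * FROM BASE_T");
                stmt.execute("CREATE VIEW V2 AS SELECT * FROM V1");
                // Altering V1 bumps its last ddl timestamp. A client that then
                // queries V2 must revalidate the whole lineage (V2, V1, BASE_T),
                // which is why the test expects 3 addTable invocations.
                stmt.execute("ALTER VIEW V1 ADD foo INTEGER");
            }
        }
    }
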
+    /**
+     * Verify that queries on system tables work, since last ddl timestamps are validated for them as well.
+     */
+    @Test
+    public void testSelectQueryOnSystemTables() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        String url = QueryUtil.getConnectionUrl(props, config, "client");
+        ConnectionQueryServices cqs = driver.getConnectionQueryServices(url, props);
+
+        try (Connection conn = cqs.connect(url, props)) {
+            query(conn, PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME);
+            query(conn, PhoenixDatabaseMetaData.SYSTEM_TASK_NAME);
+            query(conn, PhoenixDatabaseMetaData.SYSTEM_CHILD_LINK_NAME);
+            query(conn, PhoenixDatabaseMetaData.SYSTEM_LOG_NAME);
+        }
+    }
+
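
The PhoenixDatabaseMetaData constants above are plain full table names (SYSTEM_CATALOG_NAME, for instance, should resolve to "SYSTEM.CATALOG"), so each call boils down to an ordinary count query; a sketch, assuming conn is a live Phoenix connection:

    import java.sql.Connection;
    import java.sql.ResultSet;

    class SystemTableSketch {
        // roughly equivalent to query(conn, PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME)
        static void countCatalog(Connection conn) throws Exception {
            ResultSet rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM SYSTEM.CATALOG");
            rs.next();
        }
    }
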
+    /**
+     * Test query on index with stale last ddl timestamp.
+     * Client-1 creates a table and an index on it. Client-2 queries the table to populate its cache.
+     * Client-1 alters a property on the index. Client-2 queries the table again.
+     * Verify that the second query works and the index metadata was updated in the client cache.
+     */
+    @Test
+    public void testSelectQueryAfterAlterIndex() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        String url1 = QueryUtil.getConnectionUrl(props, config, "client1");
+        String url2 = QueryUtil.getConnectionUrl(props, config, "client2");
+        String tableName = generateUniqueName();
+        String indexName = generateUniqueName();
+        ConnectionQueryServices spyCqs1 = Mockito.spy(driver.getConnectionQueryServices(url1, props));
+        ConnectionQueryServices spyCqs2 = Mockito.spy(driver.getConnectionQueryServices(url2, props));
+
+        try (Connection conn1 = spyCqs1.connect(url1, props);
+             Connection conn2 = spyCqs2.connect(url2, props)) {
+
+            //client-1 creates a table and an index on it
+            createTable(conn1, tableName, NEVER);
+            createIndex(conn1, tableName, indexName, "v1");
+            TestUtil.waitForIndexState(conn1, indexName, PIndexState.ACTIVE);
+
+            //client-2 populates its cache, 1 getTable and 1 addTable call for the table
+            query(conn2, tableName);
+            Mockito.verify(spyCqs2, Mockito.times(1)).getTable(eq(null),
+                    any(byte[].class), eq(PVarchar.INSTANCE.toBytes(tableName)),
+                    anyLong(), anyLong());
+            Mockito.verify(spyCqs2, Mockito.times(1))
+                    .addTable(any(PTable.class), anyLong());
+
+            //client-1 updates index property
+            alterIndexChangeStateToRebuild(conn1, tableName, indexName);
+
+            //client-2's query using the index should work
+            PhoenixStatement stmt = conn2.createStatement().unwrap(PhoenixStatement.class);
+            stmt.executeQuery("SELECT k FROM " + tableName + " WHERE v1=1");
+            Assert.assertEquals("Query on secondary key should have used index.", indexName, stmt.getQueryPlan().getTableRef().getTable().getTableName().toString());
+
+            //verify client-2 cache was updated with the index's base table metadata
+            //this would have also updated the index metadata in its cache
+            Mockito.verify(spyCqs2, Mockito.times(2)).getTable(eq(null),
+                    any(byte[].class), eq(PVarchar.INSTANCE.toBytes(tableName)),
+                    anyLong(), anyLong());
+            Mockito.verify(spyCqs2, Mockito.times(2))
+                    .addTable(any(PTable.class), anyLong());
+
+            //client-2 queries again with latest metadata
+            //verify no more getTable/addTable calls
+            queryWithIndex(conn2, tableName);
+            Mockito.verify(spyCqs2, Mockito.times(2)).getTable(eq(null),
+                    any(byte[].class), eq(PVarchar.INSTANCE.toBytes(tableName)),
+                    anyLong(), anyLong());
+            Mockito.verify(spyCqs2, Mockito.times(2))
+                    .addTable(any(PTable.class), anyLong());
+        }
+    }
+
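
The unwrap-and-inspect trick used above generalizes into a small helper; a sketch built only from calls that appear in this patch, assuming conn is a Phoenix connection:

    import java.sql.Connection;
    import org.apache.phoenix.jdbc.PhoenixStatement;

    class PlanSketch {
        // Returns the name of the physical table the optimizer chose
        // (the index name if an index serves the query, else the data table).
        static String planTable(Connection conn, String sql) throws Exception {
            PhoenixStatement stmt = conn.createStatement().unwrap(PhoenixStatement.class);
            stmt.executeQuery(sql);
            return stmt.getQueryPlan().getTableRef().getTable().getTableName().toString();
        }
    }
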
+    /**
+     * Test that a client can learn about a newly created index.
+     * Client-1 creates a table, client-2 queries the table to populate its cache.
+     * Client-1 creates an index on the table. Client-2 queries the table using the index.
+     * Verify that client-2 uses the index for the query.
+     */
+    @Test
+    public void testSelectQueryAddIndex() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        String url1 = QueryUtil.getConnectionUrl(props, config, "client1");
+        String url2 = QueryUtil.getConnectionUrl(props, config, "client2");
+        String tableName = generateUniqueName();
+        String indexName = generateUniqueName();
+        ConnectionQueryServices spyCqs1 = Mockito.spy(driver.getConnectionQueryServices(url1, props));
+        ConnectionQueryServices spyCqs2 = Mockito.spy(driver.getConnectionQueryServices(url2, props));
+
+        try (Connection conn1 = spyCqs1.connect(url1, props);
+             Connection conn2 = spyCqs2.connect(url2, props)) {
+
+            //client-1 creates table
+            createTable(conn1, tableName, NEVER);
+
+            //client-2 populates its cache
+            query(conn2, tableName);
+
+            //client-1 creates an index on the table
+            createIndex(conn1, tableName, indexName, "v1");
+            TestUtil.waitForIndexState(conn1, indexName, PIndexState.ACTIVE);
+
+            //client-2 query should be able to use this index
+            PhoenixStatement stmt = conn2.createStatement().unwrap(PhoenixStatement.class);
+            ResultSet rs = stmt.executeQuery("SELECT k FROM " + tableName + " WHERE v1=1");
+            Assert.assertEquals("Query on secondary key should have used index.", indexName, stmt.getQueryPlan().getContext().getCurrentTable().getTable().getName().getString());
+        }
+    }
+
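
An alternative to unwrapping the statement is to read the EXPLAIN output, which works from any JDBC client; a sketch (the exact plan text varies across Phoenix versions):

    import java.sql.Connection;
    import java.sql.ResultSet;

    class ExplainSketch {
        static String explain(Connection conn, String sql) throws Exception {
            // Each row of the result is one line of the query plan; the chosen
            // index, if any, appears in the scan line.
            ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + sql);
            StringBuilder plan = new StringBuilder();
            while (rs.next()) {
                plan.append(rs.getString(1)).append('\n');
            }
            return plan.toString();
        }
    }
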
+    /**
+     * Test that a client can learn about a dropped index.
+     * Client-1 creates a table and an index, client-2 queries the table to populate its cache.
+     * Client-1 drops the index. Client-2 queries the table with index hint.
+     * Verify that client-2 uses the data table for the query.
+     */
+    @Test
+    public void testSelectQueryDropIndex() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        String url1 = QueryUtil.getConnectionUrl(props, config, "client1");
+        String url2 = QueryUtil.getConnectionUrl(props, config, "client2");
+        String tableName = generateUniqueName();
+        String indexName = generateUniqueName();
+        ConnectionQueryServices spyCqs1 = Mockito.spy(driver.getConnectionQueryServices(url1, props));
+        ConnectionQueryServices spyCqs2 = Mockito.spy(driver.getConnectionQueryServices(url2, props));
+
+        try (Connection conn1 = spyCqs1.connect(url1, props);
+             Connection conn2 = spyCqs2.connect(url2, props)) {
+
+            //client-1 creates table and index on it
+            createTable(conn1, tableName, NEVER);
+            createIndex(conn1, tableName, indexName, "v1");
+
+            //client-2 populates its cache
+            query(conn2, tableName);
+
+            //client-1 drops the index
+            dropIndex(conn1, tableName, indexName);
+
+            //client-2 queries should use the data table and not run into a table-not-found error even when the index hint is given
+            PhoenixStatement stmt = conn2.createStatement().unwrap(PhoenixStatement.class);
+            ResultSet rs = stmt.executeQuery("SELECT /*+ INDEX(" + tableName + " " + indexName + ") */ * FROM " + tableName + " WHERE v1=1");
+            Assert.assertEquals("Query should have used data table since index was dropped", tableName, stmt.getQueryPlan().getContext().getCurrentTable().getTable().getName().getString());
+        }
+    }
+
+
+    //Helper methods
+
+    private long getLastDDLTimestamp(String tableName) throws SQLException {
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         // Need to use different connection than what is used for creating table or indexes.
         String url = QueryUtil.getConnectionUrl(props, config, "client1");
@@ -546,22 +924,57 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT {
         }
     }
 
+    private void createTable(Connection conn, String tableName, long updateCacheFrequency) throws SQLException {
+        conn.createStatement().execute("CREATE TABLE " + tableName
+                + "(k INTEGER NOT NULL PRIMARY KEY, v1 INTEGER, v2 INTEGER)"
+                + (updateCacheFrequency == 0 ? "" : "UPDATE_CACHE_FREQUENCY="+updateCacheFrequency));
+    }
+
+    private void createView(Connection conn, String parentName, String viewName) throws SQLException {
+        conn.createStatement().execute("CREATE VIEW " + viewName + " AS SELECT * FROM " + parentName);
+    }
+
+    private void createViewWhereClause(Connection conn, String parentName, String viewName, String whereClause) throws SQLException {
+        conn.createStatement().execute("CREATE VIEW " + viewName +
+                " AS SELECT * FROM "+ parentName + whereClause);
+    }
+
+    private void createIndex(Connection conn, String tableName, String indexName, String col) throws SQLException {
+        conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + tableName + "(" + col + ")");
+    }
+
+    private void upsert(Connection conn, String tableName) throws SQLException {
+        conn.createStatement().execute("UPSERT INTO " + tableName +
+                " (k, v1, v2) VALUES ("+  RANDOM.nextInt() +", " + 
RANDOM.nextInt() + ", " + RANDOM.nextInt() +")");
+        conn.commit();
+    }
+
+    private void query(Connection conn, String tableName) throws SQLException {
+        ResultSet rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM " + tableName);
+        rs.next();
+    }
+
+    private void queryWithIndex(Connection conn, String tableName) throws SQLException {
+        ResultSet rs = conn.createStatement().executeQuery("SELECT k FROM " + tableName + " WHERE v1=1");
+        rs.next();
+    }
+
+    private void alterTableAddColumn(Connection conn, String tableName, String columnName) throws SQLException {
+        conn.createStatement().execute("ALTER TABLE " + tableName + " ADD IF NOT EXISTS "
+                + columnName + " INTEGER");
+    }
 
-    private String getCreateTableStmt(String tableName) {
-        return   "CREATE TABLE " + tableName +
-                "  (a_string varchar not null, col1 integer" +
-                "  CONSTRAINT pk PRIMARY KEY (a_string)) ";
+    private void alterViewAddColumn(Connection conn, String viewName, String columnName) throws SQLException {
+        conn.createStatement().execute("ALTER VIEW " + viewName + " ADD IF NOT EXISTS "
+                + columnName + " INTEGER");
     }
 
-    private String getCreateViewStmt(String viewName, String fullTableName, String whereClause) {
-        String viewStmt =  "CREATE VIEW " + viewName +
-                " AS SELECT * FROM "+ fullTableName + whereClause;
-        return  viewStmt;
+    private void alterIndexChangeStateToRebuild(Connection conn, String tableName, String indexName) throws SQLException, InterruptedException {
+        conn.createStatement().execute("ALTER INDEX " + indexName + " ON " + tableName + " REBUILD");
+        TestUtil.waitForIndexState(conn, indexName, PIndexState.ACTIVE);
     }
 
-    private String getCreateViewIndexStmt(String indexName, String viewName, String indexColumn) {
-        String viewIndexName =
-                "CREATE INDEX " + indexName + " ON " + viewName + "(" + indexColumn + ")";
-        return viewIndexName;
+    private void dropIndex(Connection conn, String tableName, String indexName) throws SQLException {
+        conn.createStatement().execute("DROP INDEX " + indexName + " ON " + tableName);
     }
 }
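
Taken together, these helpers keep new scenarios short. A hypothetical additional test composed from them (not part of this patch) might read:

    @Test
    public void testUpsertThenCount() throws Exception {
        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
        String url = QueryUtil.getConnectionUrl(props, config, "client");
        String tableName = generateUniqueName();
        try (Connection conn = DriverManager.getConnection(url, props)) {
            createTable(conn, tableName, NEVER);  // UPDATE_CACHE_FREQUENCY=NEVER
            upsert(conn, tableName);              // one random row, committed
            query(conn, tableName);               // SELECT COUNT(*) smoke check
        }
    }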