This is an automated email from the ASF dual-hosted git repository. shahrs87 pushed a commit to branch PHOENIX-6883-feature in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/PHOENIX-6883-feature by this push: new 0e471bfea0 PHOENIX-7025 : Create a new RPC to validate last ddl timestamp for read requests. (#1666) 0e471bfea0 is described below commit 0e471bfea0d51c6e027dd06d3154df4d67fe769d Author: palash <palashc...@gmail.com> AuthorDate: Mon Oct 16 13:03:45 2023 -0700 PHOENIX-7025 : Create a new RPC to validate last ddl timestamp for read requests. (#1666) --- .../apache/phoenix/cache/ServerMetadataCache.java | 5 + .../coprocessor/PhoenixRegionServerEndpoint.java | 5 + .../org/apache/phoenix/jdbc/PhoenixStatement.java | 246 ++++++++-- .../phoenix/query/ConnectionQueryServices.java | 3 + .../phoenix/query/ConnectionQueryServicesImpl.java | 29 ++ .../query/ConnectionlessQueryServicesImpl.java | 10 + .../query/DelegateConnectionQueryServices.java | 11 + .../org/apache/phoenix/query/QueryServices.java | 3 + .../apache/phoenix/query/QueryServicesOptions.java | 1 + .../org/apache/phoenix/schema/MetaDataClient.java | 43 +- .../phoenix/cache/ServerMetadataCacheTest.java | 543 ++++++++++++++++++--- 11 files changed, 775 insertions(+), 124 deletions(-) diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerMetadataCache.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerMetadataCache.java index 055ab1424c..15ce11e145 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerMetadataCache.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/ServerMetadataCache.java @@ -179,4 +179,9 @@ public class ServerMetadataCache { LOGGER.info("Resetting ServerMetadataCache"); INSTANCE = null; } + + @VisibleForTesting + public static void setInstance(ServerMetadataCache cache) { + INSTANCE = cache; + } } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java index e3448bc718..a114bea095 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/PhoenixRegionServerEndpoint.java @@ -73,6 +73,11 @@ public class PhoenixRegionServerEndpoint LOGGER.error(errorMsg, t); IOException ioe = ServerUtil.createIOException(errorMsg, t); ProtobufUtil.setControllerException(controller, ioe); + //If an index was dropped and a client tries to query it, we will validate table + //first and encounter stale metadata, if we don't break the coproc will run into + //table not found error since it will not be able to validate the dropped index. + //this should be fine for views too since we will update the entire hierarchy. + break; } } } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java index d73f1cbc15..711b2771fb 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java @@ -64,15 +64,20 @@ import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; - +import java.util.concurrent.ThreadLocalRandom; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Consistency; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; +import org.apache.hadoop.hbase.util.ByteStringer; import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.call.CallRunner; import org.apache.phoenix.compile.BaseMutationPlan; @@ -102,8 +107,11 @@ import org.apache.phoenix.compile.StatementPlan; import org.apache.phoenix.compile.TraceQueryPlan; import org.apache.phoenix.compile.UpsertCompiler; import org.apache.phoenix.coprocessor.MetaDataProtocol; +import org.apache.phoenix.coprocessor.PhoenixRegionServerEndpoint; +import org.apache.phoenix.coprocessor.generated.RegionServerEndpointProtos; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; +import org.apache.phoenix.exception.StaleMetadataCacheException; import org.apache.phoenix.exception.UpgradeRequiredException; import org.apache.phoenix.execute.MutationState; import org.apache.phoenix.execute.visitor.QueryPlanVisitor; @@ -193,12 +201,16 @@ import org.apache.phoenix.schema.MetaDataEntityNotFoundException; import org.apache.phoenix.schema.PColumnImpl; import org.apache.phoenix.schema.PDatum; import org.apache.phoenix.schema.PIndexState; +import org.apache.phoenix.schema.PName; import org.apache.phoenix.schema.PNameFactory; +import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTable.IndexType; +import org.apache.phoenix.schema.PTableKey; import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.RowKeyValueAccessor; import org.apache.phoenix.schema.Sequence; import org.apache.phoenix.schema.SortOrder; +import org.apache.phoenix.schema.TableNotFoundException; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.stats.StatisticsCollectionScope; import org.apache.phoenix.schema.tuple.MultiKeyValueTuple; @@ -287,10 +299,12 @@ public class PhoenixStatement implements PhoenixMonitoredStatement, SQLCloseable private int queryTimeoutMillis; // Caching per Statement protected final Calendar localCalendar = Calendar.getInstance(); + private boolean validateLastDdlTimestamp; public PhoenixStatement(PhoenixConnection connection) { this.connection = connection; this.queryTimeoutMillis = getDefaultQueryTimeoutMillis(); + this.validateLastDdlTimestamp = getValidateLastDdlTimestampEnabled(); } /** @@ -302,6 +316,12 @@ public class PhoenixStatement implements PhoenixMonitoredStatement, SQLCloseable QueryServicesOptions.DEFAULT_THREAD_TIMEOUT_MS); } + private boolean getValidateLastDdlTimestampEnabled() { + return connection.getQueryServices().getProps() + .getBoolean(QueryServices.LAST_DDL_TIMESTAMP_VALIDATION_ENABLED, + QueryServicesOptions.DEFAULT_LAST_DDL_TIMESTAMP_VALIDATION_ENABLED); + } + protected List<PhoenixResultSet> getResultSets() { return resultSets; } @@ -317,16 +337,123 @@ public class PhoenixStatement implements PhoenixMonitoredStatement, SQLCloseable protected PhoenixResultSet executeQuery(final CompilableStatement stmt, final QueryLogger queryLogger) throws SQLException { - return executeQuery(stmt, true, queryLogger, false); + return executeQuery(stmt, true, queryLogger, false, this.validateLastDdlTimestamp); } protected PhoenixResultSet executeQuery(final CompilableStatement stmt, final QueryLogger queryLogger, boolean noCommit) throws SQLException { - return executeQuery(stmt, true, queryLogger, noCommit); + return executeQuery(stmt, true, queryLogger, noCommit, this.validateLastDdlTimestamp); + } + + private String getInfoString(TableRef tableRef) { + return String.format("Tenant: %s, Schema: %s, Table: %s", + this.connection.getTenantId(), + tableRef.getTable().getSchemaName(), + tableRef.getTable().getTableName()); + } + + private void setLastDDLTimestampRequestParameters( + RegionServerEndpointProtos.LastDDLTimestampRequest.Builder builder, PTable pTable) { + byte[] tenantIDBytes = this.connection.getTenantId() == null + ? HConstants.EMPTY_BYTE_ARRAY + : this.connection.getTenantId().getBytes(); + byte[] schemaBytes = pTable.getSchemaName() == null + ? HConstants.EMPTY_BYTE_ARRAY + : pTable.getSchemaName().getBytes(); + builder.setTenantId(ByteStringer.wrap(tenantIDBytes)); + builder.setSchemaName(ByteStringer.wrap(schemaBytes)); + builder.setTableName(ByteStringer.wrap(pTable.getTableName().getBytes())); + builder.setLastDDLTimestamp(pTable.getLastDDLTimestamp()); + } + /** + * Build a request for the validateLastDDLTimestamp RPC. + * @param tableRef + * @return ValidateLastDDLTimestampRequest for the table in tableRef + */ + private RegionServerEndpointProtos.ValidateLastDDLTimestampRequest + getValidateDDLTimestampRequest(TableRef tableRef) throws TableNotFoundException { + RegionServerEndpointProtos.ValidateLastDDLTimestampRequest.Builder requestBuilder + = RegionServerEndpointProtos.ValidateLastDDLTimestampRequest.newBuilder(); + RegionServerEndpointProtos.LastDDLTimestampRequest.Builder innerBuilder + = RegionServerEndpointProtos.LastDDLTimestampRequest.newBuilder(); + + //when querying an index, we need to validate its parent table in case the index was dropped + if (PTableType.INDEX.equals(tableRef.getTable().getType())) { + PTableKey key = new PTableKey(this.connection.getTenantId(), + tableRef.getTable().getParentName().getString()); + PTable parentTable = this.connection.getTable(key); + setLastDDLTimestampRequestParameters(innerBuilder, parentTable); + requestBuilder.addLastDDLTimestampRequests(innerBuilder); + } + + innerBuilder = RegionServerEndpointProtos.LastDDLTimestampRequest.newBuilder(); + setLastDDLTimestampRequestParameters(innerBuilder, tableRef.getTable()); + requestBuilder.addLastDDLTimestampRequests(innerBuilder); + + //when querying a view, we need to validate last ddl timestamps for all its ancestors + if (PTableType.VIEW.equals(tableRef.getTable().getType())) { + PTable pTable = tableRef.getTable(); + while (pTable.getParentName() != null) { + PTableKey key = new PTableKey(this.connection.getTenantId(), + pTable.getParentName().getString()); + PTable parentTable = this.connection.getTable(key); + innerBuilder = RegionServerEndpointProtos.LastDDLTimestampRequest.newBuilder(); + setLastDDLTimestampRequestParameters(innerBuilder, parentTable); + requestBuilder.addLastDDLTimestampRequests(innerBuilder); + pTable = parentTable; + } + } + return requestBuilder.build(); + } + + /** + * Verifies that table metadata in client cache is up-to-date with server. + * A random live region server is picked for invoking the RPC to validate LastDDLTimestamp. + * Retry once if there was an error performing the RPC, otherwise throw the Exception. + * @param tableRef + * @throws SQLException + */ + private void validateLastDDLTimestamp(TableRef tableRef, boolean doRetry) throws SQLException { + + String infoString = getInfoString(tableRef); + try (Admin admin = this.connection.getQueryServices().getAdmin()) { + // get all live region servers + List<ServerName> regionServers + = this.connection.getQueryServices().getLiveRegionServers(); + // pick one at random + ServerName regionServer + = regionServers.get(ThreadLocalRandom.current().nextInt(regionServers.size())); + + LOGGER.debug("Sending DDL timestamp validation request for {} to regionserver {}", + infoString, regionServer); + + // RPC + CoprocessorRpcChannel channel = admin.coprocessorService(regionServer); + PhoenixRegionServerEndpoint.BlockingInterface service + = PhoenixRegionServerEndpoint.newBlockingStub(channel); + service.validateLastDDLTimestamp(null, getValidateDDLTimestampRequest(tableRef)); + } catch (Exception e) { + SQLException parsedException = ServerUtil.parseServerException(e); + if (parsedException instanceof StaleMetadataCacheException) { + throw parsedException; + } + //retry once for any exceptions other than StaleMetadataCacheException + LOGGER.error("Error in validating DDL timestamp for {}", infoString, parsedException); + if (doRetry) { + // update the list of live region servers + this.connection.getQueryServices().refreshLiveRegionServers(); + validateLastDDLTimestamp(tableRef, false); + return; + } + throw parsedException; + } } private PhoenixResultSet executeQuery(final CompilableStatement stmt, - final boolean doRetryOnMetaNotFoundError, final QueryLogger queryLogger, final boolean noCommit) throws SQLException { + final boolean doRetryOnMetaNotFoundError, + final QueryLogger queryLogger, final boolean noCommit, + boolean shouldValidateLastDdlTimestamp) + throws SQLException { GLOBAL_SELECT_SQL_COUNTER.increment(); try { @@ -335,6 +462,7 @@ public class PhoenixStatement implements PhoenixMonitoredStatement, SQLCloseable @Override public PhoenixResultSet call() throws SQLException { final long startTime = EnvironmentEdgeManager.currentTimeMillis(); boolean success = false; + boolean updateMetrics = true; boolean pointLookup = false; String tableName = null; PhoenixResultSet rs = null; @@ -371,6 +499,14 @@ public class PhoenixStatement implements PhoenixMonitoredStatement, SQLCloseable plan = connection.getQueryServices().getOptimizer() .optimize(PhoenixStatement.this, plan); + setLastQueryPlan(plan); + + //verify metadata for the table/view/index in the query plan + //plan.getTableRef can be null in some cases like EXPLAIN <query> + if (shouldValidateLastDdlTimestamp && plan.getTableRef() != null) { + validateLastDDLTimestamp(plan.getTableRef(), true); + } + // this will create its own trace internally, so we don't wrap this // whole thing in tracing ResultIterator resultIterator = plan.iterator(); @@ -394,7 +530,6 @@ public class PhoenixStatement implements PhoenixMonitoredStatement, SQLCloseable newResultSet(resultIterator, plan.getProjector(), plan.getContext()); resultSets.add(rs); - setLastQueryPlan(plan); setLastResultSet(rs); setLastUpdateCount(NO_UPDATE); setLastUpdateOperation(stmt.getOperation()); @@ -416,13 +551,38 @@ public class PhoenixStatement implements PhoenixMonitoredStatement, SQLCloseable .updateCache(connection.getTenantId(), e.getSchemaName(), e.getTableName(), true) .wasUpdated()) { + updateMetrics = false; //TODO we can log retry count and error for debugging in LOG table - return executeQuery(stmt, false, queryLogger, noCommit); + return executeQuery(stmt, false, queryLogger, noCommit, + shouldValidateLastDdlTimestamp); } } throw e; - } catch (RuntimeException e) { - + } catch (StaleMetadataCacheException e) { + updateMetrics = false; + PTable pTable = lastQueryPlan.getTableRef().getTable(); + LOGGER.debug("Force updating client metadata cache for {}", + getInfoString(getLastQueryPlan().getTableRef())); + String schemaN = pTable.getSchemaName().toString(); + String tableN = pTable.getTableName().toString(); + PName tenantId = connection.getTenantId(); + + // if the index metadata was stale, we will update the client cache + // for the parent table, which will also add the new index metadata + PTableType tableType =pTable.getType(); + if (tableType == PTableType.INDEX) { + schemaN = pTable.getParentSchemaName().toString(); + tableN = pTable.getParentTableName().toString(); + } + // force update client metadata cache for the table/view + // this also updates the cache for all ancestors in case of a view + new MetaDataClient(connection) + .updateCache(tenantId, schemaN, tableN, true); + // skip last ddl timestamp validation in the retry + return executeQuery(stmt, doRetryOnMetaNotFoundError, queryLogger, + noCommit, false); + } + catch (RuntimeException e) { // FIXME: Expression.evaluate does not throw SQLException // so this will unwrap throws from that. if (e.getCause() instanceof SQLException) { @@ -430,41 +590,43 @@ public class PhoenixStatement implements PhoenixMonitoredStatement, SQLCloseable } throw e; } finally { - // Regardless of whether the query was successfully handled or not, - // update the time spent so far. If needed, we can separate out the - // success times and failure times. - GLOBAL_QUERY_TIME.update(EnvironmentEdgeManager.currentTimeMillis() - - startTime); - long - executeQueryTimeSpent = - EnvironmentEdgeManager.currentTimeMillis() - startTime; - if (tableName != null) { - - TableMetricsManager - .updateMetricsMethod(tableName, SELECT_SQL_COUNTER, 1); - TableMetricsManager - .updateMetricsMethod(tableName, SELECT_SQL_QUERY_TIME, - executeQueryTimeSpent); - if (success) { - TableMetricsManager.updateMetricsMethod(tableName, - SELECT_SUCCESS_SQL_COUNTER, 1); - TableMetricsManager.updateMetricsMethod(tableName, - pointLookup ? - SELECT_POINTLOOKUP_SUCCESS_SQL_COUNTER : - SELECT_SCAN_SUCCESS_SQL_COUNTER, 1); - } else { - TableMetricsManager.updateMetricsMethod(tableName, - SELECT_FAILED_SQL_COUNTER, 1); - TableMetricsManager.updateMetricsMethod(tableName, - SELECT_AGGREGATE_FAILURE_SQL_COUNTER, 1); - TableMetricsManager.updateMetricsMethod(tableName, - pointLookup ? - SELECT_POINTLOOKUP_FAILED_SQL_COUNTER : - SELECT_SCAN_FAILED_SQL_COUNTER, 1); + if (updateMetrics) { + // Regardless of whether the query was successfully handled or not, + // update the time spent so far. If needed, we can separate out the + // success times and failure times. + GLOBAL_QUERY_TIME.update(EnvironmentEdgeManager.currentTimeMillis() + - startTime); + long + executeQueryTimeSpent = + EnvironmentEdgeManager.currentTimeMillis() - startTime; + if (tableName != null) { + + TableMetricsManager + .updateMetricsMethod(tableName, SELECT_SQL_COUNTER, 1); + TableMetricsManager + .updateMetricsMethod(tableName, SELECT_SQL_QUERY_TIME, + executeQueryTimeSpent); + if (success) { + TableMetricsManager.updateMetricsMethod(tableName, + SELECT_SUCCESS_SQL_COUNTER, 1); + TableMetricsManager.updateMetricsMethod(tableName, + pointLookup ? + SELECT_POINTLOOKUP_SUCCESS_SQL_COUNTER : + SELECT_SCAN_SUCCESS_SQL_COUNTER, 1); + } else { + TableMetricsManager.updateMetricsMethod(tableName, + SELECT_FAILED_SQL_COUNTER, 1); + TableMetricsManager.updateMetricsMethod(tableName, + SELECT_AGGREGATE_FAILURE_SQL_COUNTER, 1); + TableMetricsManager.updateMetricsMethod(tableName, + pointLookup ? + SELECT_POINTLOOKUP_FAILED_SQL_COUNTER : + SELECT_SCAN_FAILED_SQL_COUNTER, 1); + } + } + if (rs != null) { + rs.setQueryTime(executeQueryTimeSpent); } - } - if (rs != null) { - rs.setQueryTime(executeQueryTimeSpent); } } return rs; diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java index dd62ed125e..942e81d3e8 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java @@ -25,6 +25,7 @@ import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Mutation; @@ -137,6 +138,8 @@ public interface ConnectionQueryServices extends QueryServices, MetaDataMutated public int getLowestClusterHBaseVersion(); public Admin getAdmin() throws SQLException; + void refreshLiveRegionServers() throws SQLException; + List<ServerName> getLiveRegionServers(); void clearTableRegionCache(TableName name) throws SQLException; diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java index 551be97e1c..b49432aed0 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java @@ -135,6 +135,7 @@ import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.KeepDeletedCells; import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.NamespaceNotFoundException; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableExistsException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; @@ -394,6 +395,10 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement private ServerSideRPCControllerFactory serverSideRPCControllerFactory; private boolean localIndexUpgradeRequired; + // writes guarded by "liveRegionServersLock" + private volatile List<ServerName> liveRegionServers; + private final Object liveRegionServersLock = new Object(); + private static interface FeatureSupported { boolean isSupported(ConnectionQueryServices services); } @@ -3473,6 +3478,13 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement LOGGER.info("An instance of ConnectionQueryServices was created."); openConnection(); hConnectionEstablished = true; + boolean lastDDLTimestampValidationEnabled + = getProps().getBoolean( + QueryServices.LAST_DDL_TIMESTAMP_VALIDATION_ENABLED, + QueryServicesOptions.DEFAULT_LAST_DDL_TIMESTAMP_VALIDATION_ENABLED); + if (lastDDLTimestampValidationEnabled) { + refreshLiveRegionServers(); + } String skipSystemExistenceCheck = props.getProperty(SKIP_SYSTEM_TABLES_EXISTENCE_CHECK); if (skipSystemExistenceCheck != null && @@ -5183,6 +5195,23 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } } + @Override + public void refreshLiveRegionServers() throws SQLException { + synchronized (liveRegionServersLock) { + try (Admin admin = getAdmin()) { + this.liveRegionServers = new ArrayList<>(admin.getRegionServers(true)); + } catch (IOException e) { + throw ServerUtil.parseServerException(e); + } + } + LOGGER.info("Refreshed list of live region servers."); + } + + @Override + public List<ServerName> getLiveRegionServers() { + return this.liveRegionServers; + } + @Override public Admin getAdmin() throws SQLException { try { diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java index bd66df3dac..32defae1e0 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java @@ -471,6 +471,16 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple return Integer.MAX_VALUE; // Allow everything for connectionless } + @Override + public void refreshLiveRegionServers() throws SQLException { + throw new UnsupportedOperationException(); + } + + @Override + public List<ServerName> getLiveRegionServers() { + throw new UnsupportedOperationException(); + } + @Override public Admin getAdmin() throws SQLException { throw new UnsupportedOperationException(); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java index 4f86efc8aa..0957fbf0d8 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java @@ -25,6 +25,7 @@ import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Mutation; @@ -179,6 +180,16 @@ public class DelegateConnectionQueryServices extends DelegateQueryServices imple return getDelegate().getLowestClusterHBaseVersion(); } + @Override + public void refreshLiveRegionServers() throws SQLException { + getDelegate().refreshLiveRegionServers(); + } + + @Override + public List<ServerName> getLiveRegionServers() { + return getDelegate().getLiveRegionServers(); + } + @Override public Admin getAdmin() throws SQLException { return getDelegate().getAdmin(); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java index f980fe041e..752c0057d0 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -313,6 +313,9 @@ public interface QueryServices extends SQLCloseable { //Update Cache Frequency default config attribute public static final String DEFAULT_UPDATE_CACHE_FREQUENCY_ATRRIB = "phoenix.default.update.cache.frequency"; + // whether to validate last ddl timestamps during client operations + public static final String LAST_DDL_TIMESTAMP_VALIDATION_ENABLED = "phoenix.ddl.timestamp.validation.enabled"; + // Whether to enable cost-based-decision in the query optimizer public static final String COST_BASED_OPTIMIZER_ENABLED = "phoenix.costbased.optimizer.enabled"; public static final String SMALL_SCAN_THRESHOLD_ATTRIB = "phoenix.query.smallScanThreshold"; diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index adef425d82..6582d8af22 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -352,6 +352,7 @@ public class QueryServicesOptions { //default update cache frequency public static final long DEFAULT_UPDATE_CACHE_FREQUENCY = 0; public static final int DEFAULT_SMALL_SCAN_THRESHOLD = 100; + public static final boolean DEFAULT_LAST_DDL_TIMESTAMP_VALIDATION_ENABLED = false; // default system task handling interval in milliseconds public static final long DEFAULT_TASK_HANDLING_INTERVAL_MS = 60*1000; // 1 min diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java index 4fbee3cc24..16f391fbe8 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java @@ -680,7 +680,7 @@ public class MetaDataClient { result.setTable(table); } if (result.getTable()!=null) { - addTableToCache(result); + addTableToCache(result, alwaysHitServer); } return result; } @@ -694,7 +694,7 @@ public class MetaDataClient { // Otherwise, a tenant would be required to create a VIEW first // which is not really necessary unless you want to filter or add // columns - addTableToCache(result); + addTableToCache(result, alwaysHitServer); return result; } else { // if (result.getMutationCode() == MutationCode.NEWER_TABLE_FOUND) { @@ -710,7 +710,8 @@ public class MetaDataClient { // In this case, we update the parent table which may in turn pull // in indexes to add to this table. long resolvedTime = TransactionUtil.getResolvedTime(connection, result); - if (addColumnsAndIndexesFromAncestors(result, resolvedTimestamp, true)) { + if (addColumnsAndIndexesFromAncestors(result, resolvedTimestamp, + true, false)) { connection.addTable(result.getTable(), resolvedTime); } else { // if we aren't adding the table, we still need to update the @@ -886,11 +887,15 @@ public class MetaDataClient { * @param resolvedTimestamp timestamp at which child table was resolved * @param alwaysAddAncestorColumnsAndIndexes flag that determines whether we should recalculate * all inherited columns and indexes that can be used in the view and + * @param alwaysHitServerForAncestors flag that determines whether we should fetch latest + * metadata for ancestors from the server * @return true if the PTable contained by result was modified and false otherwise * @throws SQLException if the physical table cannot be found */ private boolean addColumnsAndIndexesFromAncestors(MetaDataMutationResult result, Long resolvedTimestamp, - boolean alwaysAddAncestorColumnsAndIndexes) throws SQLException { + boolean alwaysAddAncestorColumnsAndIndexes, + boolean alwaysHitServerForAncestors) + throws SQLException { PTable table = result.getTable(); boolean hasIndexId = table.getViewIndexId() != null; // only need to inherit columns and indexes for view indexes and views @@ -902,7 +907,7 @@ public class MetaDataClient { String parentSchemaName = SchemaUtil.getSchemaNameFromFullName(parentName); tableName = SchemaUtil.getTableNameFromFullName(parentName); MetaDataMutationResult parentResult = updateCache(connection.getTenantId(), parentSchemaName, tableName, - false, resolvedTimestamp); + alwaysHitServerForAncestors, resolvedTimestamp); PTable parentTable = parentResult.getTable(); if (parentResult.getMutationCode() == MutationCode.TABLE_NOT_FOUND || parentTable == null) { // Try once more with different tenant id (connection can be global but view could be tenant @@ -2818,6 +2823,7 @@ public class MetaDataClient { .setColumns(columns.values()) .setPhoenixTTL(PHOENIX_TTL_NOT_DEFINED) .setPhoenixTTLHighWaterMark(MIN_PHOENIX_TTL_HWM) + .setLastDDLTimestamp(0L) .build(); connection.addTable(table, MetaDataProtocol.MIN_TABLE_TIMESTAMP); } @@ -3221,7 +3227,7 @@ public class MetaDataClient { .setStreamingTopicName(streamingTopicName) .build(); result = new MetaDataMutationResult(code, result.getMutationTime(), table, true); - addTableToCache(result); + addTableToCache(result, false); return table; } catch (Throwable e) { TableMetricsManager.updateMetricsForSystemCatalogTableMethod(tableNameNode.toString(), @@ -3311,7 +3317,7 @@ public class MetaDataClient { switch(code) { case TABLE_ALREADY_EXISTS: if (result.getTable() != null) { - addTableToCache(result); + addTableToCache(result, false); } if (!statement.ifNotExists()) { throw new TableAlreadyExistsException(schemaName, tableName, result.getTable()); @@ -3328,7 +3334,7 @@ public class MetaDataClient { case UNALLOWED_TABLE_MUTATION: throwsSQLExceptionUtil("CANNOT_MUTATE_TABLE",schemaName,tableName); case CONCURRENT_TABLE_MUTATION: - addTableToCache(result); + addTableToCache(result, false); throw new ConcurrentTableMutationException(schemaName, tableName); case AUTO_PARTITION_SEQUENCE_NOT_FOUND: throw new SQLExceptionInfo.Builder(SQLExceptionCode.AUTO_PARTITION_SEQUENCE_UNDEFINED) @@ -3630,7 +3636,7 @@ public class MetaDataClient { case COLUMN_NOT_FOUND: break; case CONCURRENT_TABLE_MUTATION: - addTableToCache(result); + addTableToCache(result, false); if (LOGGER.isDebugEnabled()) { LOGGER.debug(LogUtil.addCustomAnnotations("CONCURRENT_TABLE_MUTATION for table " + SchemaUtil.getTableName(schemaName, tableName), connection)); } @@ -4266,7 +4272,7 @@ public class MetaDataClient { try { MutationCode code = processMutationResult(schemaName, tableName, result); if (code == MutationCode.COLUMN_ALREADY_EXISTS) { - addTableToCache(result); + addTableToCache(result, false); if (!ifNotExists) { throw new ColumnAlreadyExistsException(schemaName, tableName, SchemaUtil.findExistingColumn(result.getTable(), columns)); } @@ -4278,7 +4284,7 @@ public class MetaDataClient { String fullTableName = SchemaUtil.getTableName(schemaName, tableName); long resolvedTimeStamp = TransactionUtil.getResolvedTime(connection, result); if (table.getIndexes().isEmpty() || (numPkColumnsAdded==0 && ! metaProperties.getNonTxToTx())) { - addTableToCache(result, resolvedTimeStamp); + addTableToCache(result, false, resolvedTimeStamp); table = result.getTable(); } else { // remove the table from the cache, it will be fetched from the server the @@ -4700,7 +4706,7 @@ public class MetaDataClient { try { MutationCode code = processMutationResult(schemaName, tableName, result); if (code == MutationCode.COLUMN_NOT_FOUND) { - addTableToCache(result); + addTableToCache(result, false); if (!statement.ifExists()) { throw new ColumnNotFoundException(schemaName, tableName, Bytes.toString(result.getFamilyName()), Bytes.toString(result.getColumnName())); } @@ -4946,7 +4952,7 @@ public class MetaDataClient { if (code == MutationCode.TABLE_ALREADY_EXISTS) { if (result.getTable() != null) { // To accommodate connection-less update of index state - addTableToCache(result); + addTableToCache(result, false); // Set so that we get the table below with the potentially modified rowKeyOrderOptimizable flag set indexRef.setTable(result.getTable()); if (newIndexState == PIndexState.BUILDING && isAsync) { @@ -5066,12 +5072,15 @@ public class MetaDataClient { } } - private void addTableToCache(MetaDataMutationResult result) throws SQLException { - addTableToCache(result, TransactionUtil.getResolvedTime(connection, result)); + private void addTableToCache(MetaDataMutationResult result, boolean alwaysHitServerForAncestors) + throws SQLException { + addTableToCache(result, alwaysHitServerForAncestors, + TransactionUtil.getResolvedTime(connection, result)); } - private void addTableToCache(MetaDataMutationResult result, long timestamp) throws SQLException { - addColumnsAndIndexesFromAncestors(result, null, false); + private void addTableToCache(MetaDataMutationResult result, boolean alwaysHitServerForAncestors, + long timestamp) throws SQLException { + addColumnsAndIndexesFromAncestors(result, null, false, alwaysHitServerForAncestors); PTable table = result.getTable(); connection.addTable(table, timestamp); } diff --git a/phoenix-core/src/test/java/org/apache/phoenix/cache/ServerMetadataCacheTest.java b/phoenix-core/src/test/java/org/apache/phoenix/cache/ServerMetadataCacheTest.java index 2d6880d97d..2a618cdcce 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/cache/ServerMetadataCacheTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/cache/ServerMetadataCacheTest.java @@ -19,9 +19,14 @@ package org.apache.phoenix.cache; import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.end2end.ParallelStatsDisabledIT; +import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; +import org.apache.phoenix.jdbc.PhoenixStatement; import org.apache.phoenix.query.ConnectionQueryServices; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.schema.ConnectionProperty; import org.apache.phoenix.schema.PIndexState; import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.types.PVarchar; import org.apache.phoenix.thirdparty.com.google.common.collect.Maps; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.PropertiesUtil; @@ -30,6 +35,7 @@ import org.apache.phoenix.util.ReadOnlyProps; import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.TestUtil; import org.junit.After; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -39,10 +45,12 @@ import org.slf4j.LoggerFactory; import java.sql.Connection; import java.sql.DriverManager; +import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import java.util.Map; import java.util.Properties; +import java.util.Random; import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; import static org.junit.Assert.assertEquals; @@ -57,11 +65,15 @@ import static org.mockito.Mockito.verify; @Category(ParallelStatsDisabledIT.class) public class ServerMetadataCacheTest extends ParallelStatsDisabledIT { + + private final Random RANDOM = new Random(42); + private final long NEVER = (long) ConnectionProperty.UPDATE_CACHE_FREQUENCY.getValue("NEVER"); private static final Logger LOGGER = LoggerFactory.getLogger(ServerMetadataCacheTest.class); @BeforeClass public static synchronized void doSetup() throws Exception { Map<String, String> props = Maps.newHashMapWithExpectedSize(1); + props.put(QueryServices.LAST_DDL_TIMESTAMP_VALIDATION_ENABLED, Boolean.toString(true)); setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); } @@ -84,9 +96,8 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT { PropertiesUtil.deepCopy(TEST_PROPERTIES))); try(Connection conn = spyCQS.connect(getUrl(), props)) { conn.setAutoCommit(false); - String ddl = getCreateTableStmt(tableNameStr); // Create a test table. - conn.createStatement().execute(ddl); + createTable(conn, tableNameStr, NEVER); pTable = PhoenixRuntime.getTableNoCache(conn, tableNameStr);// --> First call to CQSI#getTable ServerMetadataCache cache = ServerMetadataCache.getInstance(config); @@ -123,13 +134,12 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT { PropertiesUtil.deepCopy(TEST_PROPERTIES))); try (Connection conn = spyCQS.connect(getUrl(), props)) { conn.setAutoCommit(false); - String ddl = getCreateTableStmt(tableNameStr); // Create a test table. - conn.createStatement().execute(ddl); + createTable(conn, tableNameStr, NEVER); // Create view on table. - String whereClause = " WHERE COL1 = 1000"; + String whereClause = " WHERE v1 = 1000"; String viewNameStr = generateUniqueName(); - conn.createStatement().execute(getCreateViewStmt(viewNameStr, tableNameStr, whereClause)); + createViewWhereClause(conn, tableNameStr, viewNameStr, whereClause); viewTable = PhoenixRuntime.getTableNoCache(conn, viewNameStr); // --> First call to CQSI#getTable ServerMetadataCache cache = ServerMetadataCache.getInstance(config); // Override the connection to use in ServerMetadataCache @@ -162,22 +172,20 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT { String tableNameStr = generateUniqueName(); try (Connection conn = DriverManager.getConnection(getUrl(), props)) { conn.setAutoCommit(false); - String ddl = getCreateTableStmt(tableNameStr); // Create a test table. - conn.createStatement().execute(ddl); + createTable(conn, tableNameStr, NEVER); } String tenantId = "T_" + generateUniqueName(); Properties tenantProps = PropertiesUtil.deepCopy(TEST_PROPERTIES); tenantProps.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId); PTable tenantViewTable; // Create view on table. - String whereClause = " WHERE COL1 = 1000"; + String whereClause = " WHERE v1 = 1000"; String tenantViewNameStr = generateUniqueName(); ConnectionQueryServices spyCQS = Mockito.spy(driver.getConnectionQueryServices(getUrl(), PropertiesUtil.deepCopy(TEST_PROPERTIES))); try (Connection conn = spyCQS.connect(getUrl(), tenantProps)) { - conn.createStatement().execute(getCreateViewStmt(tenantViewNameStr, - tableNameStr, whereClause)); + createViewWhereClause(conn, tableNameStr, tenantViewNameStr, whereClause); tenantViewTable = PhoenixRuntime.getTableNoCache(conn, tenantViewNameStr); // --> First call to CQSI#getTable ServerMetadataCache cache = ServerMetadataCache.getInstance(config); @@ -214,9 +222,8 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT { PTable pTable; try (Connection conn = DriverManager.getConnection(getUrl(), props)) { conn.setAutoCommit(false); - String ddl = getCreateTableStmt(tableNameStr); // Create a test table. - conn.createStatement().execute(ddl); + createTable(conn, tableNameStr, NEVER); pTable = PhoenixRuntime.getTableNoCache(conn, tableNameStr); ServerMetadataCache cache = ServerMetadataCache.getInstance(config); // Override the connection to use in ServerMetadataCache @@ -245,9 +252,8 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT { PTable pTable; try (Connection conn = DriverManager.getConnection(getUrl(), props)) { conn.setAutoCommit(false); - String ddl = getCreateTableStmt(fullTableName); // Create a test table. - conn.createStatement().execute(ddl); + createTable(conn, fullTableName, NEVER); pTable = PhoenixRuntime.getTableNoCache(conn, fullTableName); ServerMetadataCache cache = ServerMetadataCache.getInstance(config); // Override the connection to use in ServerMetadataCache @@ -263,7 +269,7 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT { } } - /** + /** * Make sure we are invalidating the cache for view with tenant connection. * @throws Exception */ @@ -273,20 +279,18 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT { String tableNameStr = generateUniqueName(); try (Connection conn = DriverManager.getConnection(getUrl(), props)) { conn.setAutoCommit(false); - String ddl = getCreateTableStmt(tableNameStr); // Create a test table. - conn.createStatement().execute(ddl); + createTable(conn, tableNameStr, NEVER); } String tenantId = "T_" + generateUniqueName(); Properties tenantProps = PropertiesUtil.deepCopy(TEST_PROPERTIES); tenantProps.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId); PTable tenantViewTable; // Create view on table. - String whereClause = " WHERE COL1 = 1000"; + String whereClause = " WHERE V1 = 1000"; String tenantViewNameStr = generateUniqueName(); try (Connection conn = DriverManager.getConnection(getUrl(), tenantProps)) { - conn.createStatement().execute(getCreateViewStmt(tenantViewNameStr, - tableNameStr, whereClause)); + createViewWhereClause(conn, tableNameStr, tenantViewNameStr, whereClause); tenantViewTable = PhoenixRuntime.getTableNoCache(conn, tenantViewNameStr); ServerMetadataCache cache = ServerMetadataCache.getInstance(config); // Override the connection to use in ServerMetadataCache @@ -304,7 +308,6 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT { } } - /** * Make sure we are invalidating the cache for table with no tenant connection, no schema name * and valid table name when we run alter statement. @@ -319,9 +322,8 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT { ServerMetadataCache cache = ServerMetadataCache.getInstance(config); try (Connection conn = DriverManager.getConnection(getUrl(), props)) { conn.setAutoCommit(false); - String ddl = getCreateTableStmt(tableNameStr); // Create a test table. - conn.createStatement().execute(ddl); + createTable(conn, tableNameStr, NEVER); pTable = PhoenixRuntime.getTableNoCache(conn, tableNameStr); long lastDDLTimestamp = pTable.getLastDDLTimestamp(); assertEquals(lastDDLTimestamp, @@ -343,7 +345,7 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT { } } - /** + /** * Make sure we are invalidating the cache for table with no tenant connection, no schema name * and valid table name when we run drop table statement. * @throws Exception @@ -357,9 +359,8 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT { ServerMetadataCache cache = ServerMetadataCache.getInstance(config); try (Connection conn = DriverManager.getConnection(getUrl(), props)) { conn.setAutoCommit(false); - String ddl = getCreateTableStmt(tableNameStr); // Create a test table. - conn.createStatement().execute(ddl); + createTable(conn, tableNameStr, NEVER); pTable = PhoenixRuntime.getTableNoCache(conn, tableNameStr); long lastDDLTimestamp = pTable.getLastDDLTimestamp(); assertEquals(lastDDLTimestamp, @@ -384,17 +385,17 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT { @Test public void testInvalidateCacheForBaseTableWithUpdateIndexStatement() throws Exception { Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + String url = QueryUtil.getConnectionUrl(props, config, "client"); String tableNameStr = "TBL_" + generateUniqueName(); String indexNameStr = "IND_" + generateUniqueName(); byte[] indexNameBytes = Bytes.toBytes(indexNameStr); PTable indexTable; ServerMetadataCache cache = ServerMetadataCache.getInstance(config); - try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + try (Connection conn = DriverManager.getConnection(url, props)) { conn.setAutoCommit(false); - String ddl = getCreateTableStmt(tableNameStr); // Create a test table. - conn.createStatement().execute(ddl); - String indexDDLStmt = "CREATE INDEX " + indexNameStr + " ON " + tableNameStr + "(col1)"; + createTable(conn, tableNameStr, NEVER); + String indexDDLStmt = "CREATE INDEX " + indexNameStr + " ON " + tableNameStr + "(v1)"; conn.createStatement().execute(indexDDLStmt); TestUtil.waitForIndexState(conn, indexNameStr, PIndexState.ACTIVE); indexTable = PhoenixRuntime.getTableNoCache(conn, indexNameStr); @@ -422,7 +423,6 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT { } } - /** * Test that we invalidate the cache for parent table and update the last ddl timestamp * of the parent table while we add an index. @@ -437,20 +437,14 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT { String indexName = generateUniqueName(); byte[] indexNameBytes = Bytes.toBytes(indexName); ServerMetadataCache cache = ServerMetadataCache.getInstance(config); - String ddl = - "create table " + tableName + " ( k integer PRIMARY KEY," + " v1 integer," - + " v2 integer)"; - String createIndexDDL = "create index " + indexName + " on " + tableName + " (v1)"; - String dropIndexDDL = "DROP INDEX " + indexName + " ON " + tableName; - try (Connection conn = DriverManager.getConnection(getUrl()); - Statement stmt = conn.createStatement()) { + try (Connection conn = DriverManager.getConnection(getUrl())) { conn.setAutoCommit(true); - stmt.execute(ddl); + createTable(conn, tableName, NEVER); long tableLastDDLTimestampBeforeIndexCreation = getLastDDLTimestamp(tableName); // Populate the cache assertNotNull(cache.getLastDDLTimestampForTable(null, null, tableNameBytes)); Thread.sleep(1); - stmt.execute(createIndexDDL); + createIndex(conn, tableName, indexName, "v1"); // Make sure that we have invalidated the last ddl timestamp for parent table // on all regionservers after we create an index. assertNull(cache.getLastDDLTimestampForTableFromCacheOnly(null, null, tableNameBytes)); @@ -463,7 +457,7 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT { assertNotNull(indexLastDDLTimestampAfterCreation); // Adding a sleep for 1 ms so that we get new last ddl timestamp. Thread.sleep(1); - stmt.execute(dropIndexDDL); + dropIndex(conn, tableName, indexName); // Make sure that we invalidate the cache on regionserver for base table and an index // after we dropped an index. assertNull(cache.getLastDDLTimestampForTableFromCacheOnly(null, null, tableNameBytes)); @@ -495,18 +489,13 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT { ServerMetadataCache cache = ServerMetadataCache.getInstance(config); try(Connection conn = DriverManager.getConnection(getUrl()); Statement stmt = conn.createStatement()) { - String whereClause = " WHERE COL1 < 1000"; - String tableDDLStmt = getCreateTableStmt(tableName); - String viewDDLStmt = getCreateViewStmt(globalViewName, tableName, whereClause); - String viewIdxDDLStmt = getCreateViewIndexStmt(globalViewIndexName, globalViewName, - "COL1"); - String dropIndexDDL = "DROP INDEX " + globalViewIndexName + " ON " + globalViewName; - stmt.execute(tableDDLStmt); - stmt.execute(viewDDLStmt); + String whereClause = " WHERE v1 < 1000"; + createTable(conn, tableName, NEVER); + createViewWhereClause(conn, tableName, globalViewName, whereClause); // Populate the cache assertNotNull(cache.getLastDDLTimestampForTable(null, null, globalViewNameBytes)); long viewLastDDLTimestampBeforeIndexCreation = getLastDDLTimestamp(globalViewName); - stmt.execute(viewIdxDDLStmt); + createIndex(conn, globalViewName, globalViewIndexName, "v1"); // Make sure that we have invalidated the last ddl timestamp for parent global view // on all regionserver after we create a view index. @@ -520,7 +509,7 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT { assertNotNull(indexLastDDLTimestampAfterCreation); // Adding a sleep for 1 ms so that we get new last ddl timestamp. Thread.sleep(1); - stmt.execute(dropIndexDDL); + dropIndex(conn, globalViewName, globalViewIndexName); // Make sure that we invalidate the cache on regionservers for view and its index after // we drop a view index. assertNull(cache.getLastDDLTimestampForTableFromCacheOnly(null, null, @@ -536,7 +525,396 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT { } } - public long getLastDDLTimestamp(String tableName) throws SQLException { + /** + * Client-1 creates a table, upserts data and alters the table. + * Client-2 queries the table before and after the alter. + * Check queries work successfully in both cases and verify number of addTable invocations. + */ + @Test + public void testSelectQueryWithOldDDLTimestamp() throws SQLException { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + String url1 = QueryUtil.getConnectionUrl(props, config, "client1"); + String url2 = QueryUtil.getConnectionUrl(props, config, "client2"); + String tableName = generateUniqueName(); + ConnectionQueryServices spyCqs1 = Mockito.spy(driver.getConnectionQueryServices(url1, props)); + ConnectionQueryServices spyCqs2 = Mockito.spy(driver.getConnectionQueryServices(url2, props)); + int expectedNumCacheUpdates; + + try (Connection conn1 = spyCqs1.connect(url1, props); + Connection conn2 = spyCqs2.connect(url2, props)) { + + // create table with UCF=never and upsert data using client-1 + createTable(conn1, tableName, NEVER); + upsert(conn1, tableName); + + // select query from client-2 works to populate client side metadata cache + // there should be 1 update to the client cache + query(conn2, tableName); + expectedNumCacheUpdates = 1; + Mockito.verify(spyCqs2, Mockito.times(expectedNumCacheUpdates)) + .addTable(any(PTable.class), anyLong()); + + // add column using client-1 to update last ddl timestamp + alterTableAddColumn(conn1, tableName, "newCol1"); + + // reset the spy CQSI object + Mockito.reset(spyCqs2); + + // select query from client-2 with old ddl timestamp works + // there should be one update to the client cache + query(conn2, tableName); + expectedNumCacheUpdates = 1; + Mockito.verify(spyCqs2, Mockito.times(expectedNumCacheUpdates)) + .addTable(any(PTable.class), anyLong()); + + // select query from client-2 with latest ddl timestamp works + // there should be no more updates to client cache + query(conn2, tableName); + Mockito.verify(spyCqs2, Mockito.times(expectedNumCacheUpdates)) + .addTable(any(PTable.class), anyLong()); + } + } + + /** + * Test DDL timestamp validation retry logic in case of any exception + * from Server other than StaleMetadataCacheException. + */ + @Test + public void testSelectQueryServerSideExceptionInValidation() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + String url1 = QueryUtil.getConnectionUrl(props, config, "client1"); + String url2 = QueryUtil.getConnectionUrl(props, config, "client2"); + String tableName = generateUniqueName(); + ConnectionQueryServices spyCqs1 = Mockito.spy(driver.getConnectionQueryServices(url1, props)); + ConnectionQueryServices spyCqs2 = Mockito.spy(driver.getConnectionQueryServices(url2, props)); + ServerMetadataCache cache = null; + + try (Connection conn1 = spyCqs1.connect(url1, props); + Connection conn2 = spyCqs2.connect(url2, props)) { + + // create table and upsert using client-1 + createTable(conn1, tableName, NEVER); + upsert(conn1, tableName); + + // Instrument ServerMetadataCache to throw a SQLException once + cache = ServerMetadataCache.getInstance(config); + ServerMetadataCache spyCache = Mockito.spy(cache); + Mockito.doThrow(new SQLException("FAIL")).doCallRealMethod().when(spyCache) + .getLastDDLTimestampForTable(any(), any(), eq(Bytes.toBytes(tableName))); + ServerMetadataCache.setInstance(spyCache); + + // query using client-2 should succeed + query(conn2, tableName); + + // verify live region servers were refreshed + Mockito.verify(spyCqs2, Mockito.times(1)).refreshLiveRegionServers(); + } + } + + /** + * Test Select query works when ddl timestamp validation with old timestamp encounters an exception. + * Verify that the list of live region servers was refreshed when ddl timestamp validation is retried. + * Verify that the client cache was updated after encountering StaleMetadataCacheException. + */ + @Test + public void testSelectQueryWithOldDDLTimestampWithExceptionRetry() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + String url1 = QueryUtil.getConnectionUrl(props, config, "client1"); + String url2 = QueryUtil.getConnectionUrl(props, config, "client2"); + String tableName = generateUniqueName(); + ConnectionQueryServices spyCqs1 = Mockito.spy(driver.getConnectionQueryServices(url1, props)); + ConnectionQueryServices spyCqs2 = Mockito.spy(driver.getConnectionQueryServices(url2, props)); + int expectedNumCacheUpdates; + ServerMetadataCache cache = null; + + try (Connection conn1 = spyCqs1.connect(url1, props); + Connection conn2 = spyCqs2.connect(url2, props)) { + + // create table and upsert using client-1 + createTable(conn1, tableName, NEVER); + upsert(conn1, tableName); + + // query using client-2 to populate cache + query(conn2, tableName); + expectedNumCacheUpdates = 1; + Mockito.verify(spyCqs2, Mockito.times(expectedNumCacheUpdates)) + .addTable(any(PTable.class), anyLong()); + + // add column using client-1 to update last ddl timestamp + alterTableAddColumn(conn1, tableName, "newCol1"); + + // reset the spy CQSI object + Mockito.reset(spyCqs2); + + // Instrument ServerMetadataCache to throw a SQLException once + cache = ServerMetadataCache.getInstance(config); + ServerMetadataCache spyCache = Mockito.spy(cache); + Mockito.doThrow(new SQLException("FAIL")).doCallRealMethod().when(spyCache) + .getLastDDLTimestampForTable(any(), any(), eq(Bytes.toBytes(tableName))); + ServerMetadataCache.setInstance(spyCache); + + // query using client-2 should succeed, one cache update + query(conn2, tableName); + expectedNumCacheUpdates = 1; + Mockito.verify(spyCqs2, Mockito.times(expectedNumCacheUpdates)) + .addTable(any(PTable.class), anyLong()); + + // verify live region servers were refreshed + Mockito.verify(spyCqs2, Mockito.times(1)).refreshLiveRegionServers(); + } + } + + /** + * Test Select Query fails in case DDL timestamp validation throws SQLException twice. + */ + @Test + public void testSelectQueryFails() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + String url1 = QueryUtil.getConnectionUrl(props, config, "client1"); + String url2 = QueryUtil.getConnectionUrl(props, config, "client2"); + String tableName = generateUniqueName(); + ConnectionQueryServices spyCqs1 = Mockito.spy(driver.getConnectionQueryServices(url1, props)); + ConnectionQueryServices spyCqs2 = Mockito.spy(driver.getConnectionQueryServices(url2, props)); + ServerMetadataCache cache = null; + + try (Connection conn1 = spyCqs1.connect(url1, props); + Connection conn2 = spyCqs2.connect(url2, props)) { + + // create table and upsert using client-1 + createTable(conn1, tableName, NEVER); + upsert(conn1, tableName); + + // Instrument ServerMetadataCache to throw a SQLException twice + cache = ServerMetadataCache.getInstance(config); + ServerMetadataCache spyCache = Mockito.spy(cache); + SQLException e = new SQLException("FAIL"); + Mockito.doThrow(e).when(spyCache) + .getLastDDLTimestampForTable(any(), any(), eq(Bytes.toBytes(tableName))); + ServerMetadataCache.setInstance(spyCache); + + // query using client-2 should fail + query(conn2, tableName); + Assert.fail("Query should have thrown Exception"); + } + catch (Exception e) { + Assert.assertTrue("SQLException was not thrown when last ddl timestamp validation encountered errors twice.", e instanceof SQLException); + } + } + + + /** + * Client-1 creates a table, 2 level of views on it and alters the first level view. + * Client-2 queries the second level view, verify that there were 3 cache updates in client-2, + * one each for the two views and base table. + */ + @Test + public void testSelectQueryOnView() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + String url1 = QueryUtil.getConnectionUrl(props, config, "client1"); + String url2 = QueryUtil.getConnectionUrl(props, config, "client2"); + String tableName = generateUniqueName(); + ConnectionQueryServices spyCqs1 = Mockito.spy(driver.getConnectionQueryServices(url1, props)); + ConnectionQueryServices spyCqs2 = Mockito.spy(driver.getConnectionQueryServices(url2, props)); + int expectedNumCacheUpdates; + + try (Connection conn1 = spyCqs1.connect(url1, props); + Connection conn2 = spyCqs2.connect(url2, props)) { + + // create table using client-1 + createTable(conn1, tableName, NEVER); + upsert(conn1, tableName); + + // create 2 level of views using client-1 + String view1 = generateUniqueName(); + String view2 = generateUniqueName(); + createView(conn1, tableName, view1); + createView(conn1, view1, view2); + + // query second level view using client-2 + query(conn2, view2); + expectedNumCacheUpdates = 3; // table, view1, view2 + Mockito.verify(spyCqs2, Mockito.times(expectedNumCacheUpdates)) + .addTable(any(PTable.class), anyLong()); + + // alter first level view using client-1 to update its last ddl timestamp + alterViewAddColumn(conn1, view1, "foo"); + + // reset the spy CQSI object + Mockito.reset(spyCqs2); + + // query second level view + query(conn2, view2); + + // verify there was a getTable RPC for the view and all its ancestors + Mockito.verify(spyCqs2, Mockito.times(1)).getTable(eq(null), + any(byte[].class), eq(PVarchar.INSTANCE.toBytes(tableName)), + anyLong(), anyLong()); + Mockito.verify(spyCqs2, Mockito.times(1)).getTable(eq(null), + any(byte[].class), eq(PVarchar.INSTANCE.toBytes(view1)), + anyLong(), anyLong()); + Mockito.verify(spyCqs2, Mockito.times(1)).getTable(eq(null), + any(byte[].class), eq(PVarchar.INSTANCE.toBytes(view2)), + anyLong(), anyLong()); + + // verify that the view and all its ancestors were updated in the client cache + expectedNumCacheUpdates = 3; // table, view1, view2 + Mockito.verify(spyCqs2, Mockito.times(expectedNumCacheUpdates)) + .addTable(any(PTable.class), anyLong()); + } + } + + /** + * Verify queries on system tables work as we will validate last ddl timestamps for them also. + */ + @Test + public void testSelectQueryOnSystemTables() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + String url = QueryUtil.getConnectionUrl(props, config, "client"); + ConnectionQueryServices cqs = driver.getConnectionQueryServices(url, props); + + try (Connection conn = cqs.connect(url, props)) { + query(conn, PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME); + query(conn, PhoenixDatabaseMetaData.SYSTEM_TASK_NAME); + query(conn, PhoenixDatabaseMetaData.SYSTEM_CHILD_LINK_NAME); + query(conn, PhoenixDatabaseMetaData.SYSTEM_LOG_NAME); + } + } + + /** + * Test query on index with stale last ddl timestamp. + * Client-1 creates a table and an index on it. Client-2 queries table to populate its cache. + * Client-1 alters a property on the index. Client-2 queries the table again. + * Verify that the second query works and the index metadata was updated in the client cache. + */ + @Test + public void testSelectQueryAfterAlterIndex() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + String url1 = QueryUtil.getConnectionUrl(props, config, "client1"); + String url2 = QueryUtil.getConnectionUrl(props, config, "client2"); + String tableName = generateUniqueName(); + String indexName = generateUniqueName(); + ConnectionQueryServices spyCqs1 = Mockito.spy(driver.getConnectionQueryServices(url1, props)); + ConnectionQueryServices spyCqs2 = Mockito.spy(driver.getConnectionQueryServices(url2, props)); + + try (Connection conn1 = spyCqs1.connect(url1, props); + Connection conn2 = spyCqs2.connect(url2, props)) { + + //client-1 creates a table and an index on it + createTable(conn1, tableName, NEVER); + createIndex(conn1, tableName, indexName, "v1"); + TestUtil.waitForIndexState(conn1, indexName, PIndexState.ACTIVE); + + //client-2 populates its cache, 1 getTable and 1 addTable call for the table + query(conn2, tableName); + Mockito.verify(spyCqs2, Mockito.times(1)).getTable(eq(null), + any(byte[].class), eq(PVarchar.INSTANCE.toBytes(tableName)), + anyLong(), anyLong()); + Mockito.verify(spyCqs2, Mockito.times(1)) + .addTable(any(PTable.class), anyLong()); + + //client-1 updates index property + alterIndexChangeStateToRebuild(conn1, tableName, indexName); + + //client-2's query using the index should work + PhoenixStatement stmt = conn2.createStatement().unwrap(PhoenixStatement.class); + stmt.executeQuery("SELECT k FROM " + tableName + " WHERE v1=1"); + Assert.assertEquals("Query on secondary key should have used index.", indexName, stmt.getQueryPlan().getTableRef().getTable().getTableName().toString()); + + //verify client-2 cache was updated with the index's base table metadata + //this would have also updated the index metadata in its cache + Mockito.verify(spyCqs2, Mockito.times(2)).getTable(eq(null), + any(byte[].class), eq(PVarchar.INSTANCE.toBytes(tableName)), + anyLong(), anyLong()); + Mockito.verify(spyCqs2, Mockito.times(2)) + .addTable(any(PTable.class), anyLong()); + + //client-2 queries again with latest metadata + //verify no more getTable/addTable calls + queryWithIndex(conn2, tableName); + Mockito.verify(spyCqs2, Mockito.times(2)).getTable(eq(null), + any(byte[].class), eq(PVarchar.INSTANCE.toBytes(tableName)), + anyLong(), anyLong()); + Mockito.verify(spyCqs2, Mockito.times(2)) + .addTable(any(PTable.class), anyLong()); + } + } + + /** + * Test that a client can learn about a newly created index. + * Client-1 creates a table, client-2 queries the table to populate its cache. + * Client-1 creates an index on the table. Client-2 queries the table using the index. + * Verify that client-2 uses the index for the query. + */ + @Test + public void testSelectQueryAddIndex() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + String url1 = QueryUtil.getConnectionUrl(props, config, "client1"); + String url2 = QueryUtil.getConnectionUrl(props, config, "client2"); + String tableName = generateUniqueName(); + String indexName = generateUniqueName(); + ConnectionQueryServices spyCqs1 = Mockito.spy(driver.getConnectionQueryServices(url1, props)); + ConnectionQueryServices spyCqs2 = Mockito.spy(driver.getConnectionQueryServices(url2, props)); + + try (Connection conn1 = spyCqs1.connect(url1, props); + Connection conn2 = spyCqs2.connect(url2, props)) { + + //client-1 creates table + createTable(conn1, tableName, NEVER); + + //client-2 populates its cache + query(conn2, tableName); + + //client-1 creates an index on the table + createIndex(conn1, tableName, indexName, "v1"); + TestUtil.waitForIndexState(conn1, indexName, PIndexState.ACTIVE); + + //client-2 query should be able to use this index + PhoenixStatement stmt = conn2.createStatement().unwrap(PhoenixStatement.class); + ResultSet rs = stmt.executeQuery("SELECT k FROM " + tableName + " WHERE v1=1"); + Assert.assertEquals("Query on secondary key should have used index.", indexName, stmt.getQueryPlan().getContext().getCurrentTable().getTable().getName().getString()); + } + } + + /** + * Test that a client can learn about a dropped index. + * Client-1 creates a table and an index, client-2 queries the table to populate its cache. + * Client-1 drops the index. Client-2 queries the table with index hint. + * Verify that client-2 uses the data table for the query. + */ + @Test + public void testSelectQueryDropIndex() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + String url1 = QueryUtil.getConnectionUrl(props, config, "client1"); + String url2 = QueryUtil.getConnectionUrl(props, config, "client2"); + String tableName = generateUniqueName(); + String indexName = generateUniqueName(); + ConnectionQueryServices spyCqs1 = Mockito.spy(driver.getConnectionQueryServices(url1, props)); + ConnectionQueryServices spyCqs2 = Mockito.spy(driver.getConnectionQueryServices(url2, props)); + + try (Connection conn1 = spyCqs1.connect(url1, props); + Connection conn2 = spyCqs2.connect(url2, props)) { + + //client-1 creates table and index on it + createTable(conn1, tableName, NEVER); + createIndex(conn1, tableName, indexName, "v1"); + + //client-2 populates its cache + query(conn2, tableName); + + //client-1 drops the index + dropIndex(conn1, tableName, indexName); + + //client-2 queries should use data table and not run into table not found error even when index hint is given + PhoenixStatement stmt = conn2.createStatement().unwrap(PhoenixStatement.class); + ResultSet rs = stmt.executeQuery("SELECT /*+ INDEX(" + tableName + " " + indexName + ") */ * FROM " + tableName + " WHERE v1=1"); + Assert.assertEquals("Query should have used data table since index was dropped", tableName, stmt.getQueryPlan().getContext().getCurrentTable().getTable().getName().getString()); + } + } + + + //Helper methods + + private long getLastDDLTimestamp(String tableName) throws SQLException { Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); // Need to use different connection than what is used for creating table or indexes. String url = QueryUtil.getConnectionUrl(props, config, "client1"); @@ -546,22 +924,57 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT { } } + private void createTable(Connection conn, String tableName, long updateCacheFrequency) throws SQLException { + conn.createStatement().execute("CREATE TABLE " + tableName + + "(k INTEGER NOT NULL PRIMARY KEY, v1 INTEGER, v2 INTEGER)" + + (updateCacheFrequency == 0 ? "" : "UPDATE_CACHE_FREQUENCY="+updateCacheFrequency)); + } + + private void createView(Connection conn, String parentName, String viewName) throws SQLException { + conn.createStatement().execute("CREATE VIEW " + viewName + " AS SELECT * FROM " + parentName); + } + + private void createViewWhereClause(Connection conn, String parentName, String viewName, String whereClause) throws SQLException { + conn.createStatement().execute("CREATE VIEW " + viewName + + " AS SELECT * FROM "+ parentName + whereClause); + } + + private void createIndex(Connection conn, String tableName, String indexName, String col) throws SQLException { + conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + tableName + "(" + col + ")"); + } + + private void upsert(Connection conn, String tableName) throws SQLException { + conn.createStatement().execute("UPSERT INTO " + tableName + + " (k, v1, v2) VALUES ("+ RANDOM.nextInt() +", " + RANDOM.nextInt() + ", " + RANDOM.nextInt() +")"); + conn.commit(); + } + + private void query(Connection conn, String tableName) throws SQLException { + ResultSet rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM " + tableName); + rs.next(); + } + + private void queryWithIndex(Connection conn, String tableName) throws SQLException { + ResultSet rs = conn.createStatement().executeQuery("SELECT k FROM " + tableName + " WHERE v1=1"); + rs.next(); + } + + private void alterTableAddColumn(Connection conn, String tableName, String columnName) throws SQLException { + conn.createStatement().execute("ALTER TABLE " + tableName + " ADD IF NOT EXISTS " + + columnName + " INTEGER"); + } - private String getCreateTableStmt(String tableName) { - return "CREATE TABLE " + tableName + - " (a_string varchar not null, col1 integer" + - " CONSTRAINT pk PRIMARY KEY (a_string)) "; + private void alterViewAddColumn(Connection conn, String viewName, String columnName) throws SQLException { + conn.createStatement().execute("ALTER VIEW " + viewName + " ADD IF NOT EXISTS " + + columnName + " INTEGER"); } - private String getCreateViewStmt(String viewName, String fullTableName, String whereClause) { - String viewStmt = "CREATE VIEW " + viewName + - " AS SELECT * FROM "+ fullTableName + whereClause; - return viewStmt; + private void alterIndexChangeStateToRebuild(Connection conn, String tableName, String indexName) throws SQLException, InterruptedException { + conn.createStatement().execute("ALTER INDEX " + indexName + " ON " + tableName + " REBUILD"); + TestUtil.waitForIndexState(conn, indexName, PIndexState.ACTIVE); } - private String getCreateViewIndexStmt(String indexName, String viewName, String indexColumn) { - String viewIndexName = - "CREATE INDEX " + indexName + " ON " + viewName + "(" + indexColumn + ")"; - return viewIndexName; + private void dropIndex(Connection conn, String tableName, String indexName) throws SQLException { + conn.createStatement().execute("DROP INDEX " + indexName + " ON " + tableName); } }