This is an automated email from the ASF dual-hosted git repository. shahrs87 pushed a commit to branch PHOENIX-6883-feature in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/PHOENIX-6883-feature by this push: new 4499c2965a PHOENIX-7026 : Validate LAST_DDL_TIMESTAMP for write requests (#1726) 4499c2965a is described below commit 4499c2965af9e07491bfab104b94d8be697d0e35 Author: palash <palashc...@gmail.com> AuthorDate: Wed Nov 29 23:24:30 2023 +0530 PHOENIX-7026 : Validate LAST_DDL_TIMESTAMP for write requests (#1726) --- .../org/apache/phoenix/execute/MutationState.java | 66 +++- .../org/apache/phoenix/jdbc/PhoenixStatement.java | 135 +------ .../phoenix/monitoring/GlobalClientMetrics.java | 4 +- .../org/apache/phoenix/monitoring/MetricType.java | 3 + .../phoenix/util/ValidateLastDDLTimestampUtil.java | 211 ++++++++++ .../phoenix/cache/ServerMetadataCacheTest.java | 440 ++++++++++++++++++++- 6 files changed, 708 insertions(+), 151 deletions(-) diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java index 1cf23d7e0f..1612fe36a0 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java @@ -68,6 +68,7 @@ import org.apache.phoenix.coprocessor.BaseScannerRegionObserver; import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; +import org.apache.phoenix.exception.StaleMetadataCacheException; import org.apache.phoenix.hbase.index.IndexRegionObserver; import org.apache.phoenix.hbase.index.exception.IndexWriteException; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; @@ -122,6 +123,7 @@ import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.ServerUtil; import org.apache.phoenix.util.SizedUtil; import org.apache.phoenix.util.TransactionUtil; +import org.apache.phoenix.util.ValidateLastDDLTimestampUtil; import org.apache.phoenix.util.WALAnnotationUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -159,6 +161,7 @@ public class MutationState implements SQLCloseable { private long estimatedSize = 0; private int[] uncommittedStatementIndexes = EMPTY_STATEMENT_INDEX_ARRAY; private boolean isExternalTxContext = false; + private boolean validateLastDdlTimestamp; private Map<TableRef, List<MultiRowMutationState>> txMutations = Collections.emptyMap(); private PhoenixTransactionContext phoenixTransactionContext = PhoenixTransactionContext.NULL_CONTEXT; @@ -221,6 +224,8 @@ public class MutationState implements SQLCloseable { boolean isMetricsEnabled = connection.isRequestLevelMetricsEnabled(); this.mutationMetricQueue = isMetricsEnabled ? new MutationMetricQueue() : NoOpMutationMetricsQueue.NO_OP_MUTATION_METRICS_QUEUE; + this.validateLastDdlTimestamp = ValidateLastDDLTimestampUtil + .getValidateLastDdlTimestampEnabled(this.connection); if (subTask) { // this code path is only used while running child scans, we can't pass the txContext to child scans // as it is not thread safe, so we use the tx member variable @@ -946,26 +951,33 @@ public class MutationState implements SQLCloseable { .setTableName(table.getTableName().getString()).build().buildException(); } } long timestamp = result.getMutationTime(); - if (timestamp != QueryConstants.UNSET_TIMESTAMP) { - serverTimeStamp = timestamp; - if (result.wasUpdated()) { - List<PColumn> columns = Lists.newArrayListWithExpectedSize(table.getColumns().size()); - for (Map.Entry<ImmutableBytesPtr, RowMutationState> rowEntry : rowKeyToColumnMap.entrySet()) { - RowMutationState valueEntry = rowEntry.getValue(); - if (valueEntry != null) { - Map<PColumn, byte[]> colValues = valueEntry.getColumnValues(); - if (colValues != PRow.DELETE_MARKER) { - for (PColumn column : colValues.keySet()) { - if (!column.isDynamic()) columns.add(column); + serverTimeStamp = timestamp; + + /* when last_ddl_timestamp validation is enabled, + we don't know if this table's cache result was force updated + during the validation, so always validate columns */ + if ((timestamp != QueryConstants.UNSET_TIMESTAMP && result.wasUpdated()) + || this.validateLastDdlTimestamp) { + List<PColumn> columns + = Lists.newArrayListWithExpectedSize(table.getColumns().size()); + for (Map.Entry<ImmutableBytesPtr, RowMutationState> + rowEntry : rowKeyToColumnMap.entrySet()) { + RowMutationState valueEntry = rowEntry.getValue(); + if (valueEntry != null) { + Map<PColumn, byte[]> colValues = valueEntry.getColumnValues(); + if (colValues != PRow.DELETE_MARKER) { + for (PColumn column : colValues.keySet()) { + if (!column.isDynamic()) { + columns.add(column); } } } } - for (PColumn column : columns) { - if (column != null) { - resolvedTable.getColumnFamily(column.getFamilyName().getString()).getPColumnForColumnName( - column.getName().getString()); - } + } + for (PColumn column : columns) { + if (column != null) { + resolvedTable.getColumnFamily(column.getFamilyName().getString()) + .getPColumnForColumnName(column.getName().getString()); } } } @@ -1201,6 +1213,28 @@ public class MutationState implements SQLCloseable { commitBatches = createCommitBatches(tableRefIterator); } + //if enabled, validate last ddl timestamps for all tables in the mutationsMap + //for now, force update client cache for all tables if StaleMetadataCacheException is seen + if (this.validateLastDdlTimestamp) { + List<TableRef> tableRefs = new ArrayList<>(this.mutationsMap.keySet()); + try { + ValidateLastDDLTimestampUtil.validateLastDDLTimestamp( + connection, tableRefs, true, true); + } catch (StaleMetadataCacheException e) { + GlobalClientMetrics + .GLOBAL_CLIENT_STALE_METADATA_CACHE_EXCEPTION_COUNTER.increment(); + MetaDataClient mc = new MetaDataClient(connection); + PName tenantId = connection.getTenantId(); + LOGGER.debug("Force updating client metadata cache for {}", + ValidateLastDDLTimestampUtil.getInfoString(tenantId, tableRefs)); + for (TableRef tableRef : tableRefs) { + String schemaName = tableRef.getTable().getSchemaName().toString(); + String tableName = tableRef.getTable().getTableName().toString(); + mc.updateCache(tenantId, schemaName, tableName, true); + } + } + } + for (Map<TableRef, MultiRowMutationState> commitBatch : commitBatches) { long [] serverTimestamps = validateServerTimestamps ? validateAll(commitBatch) : null; sendBatch(commitBatch, serverTimestamps, sendAll); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java index d68e981d10..41981c41c2 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java @@ -64,20 +64,14 @@ import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; -import java.util.concurrent.ThreadLocalRandom; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Consistency; import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; -import org.apache.hadoop.hbase.util.ByteStringer; import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.call.CallRunner; import org.apache.phoenix.compile.BaseMutationPlan; @@ -107,8 +101,6 @@ import org.apache.phoenix.compile.StatementPlan; import org.apache.phoenix.compile.TraceQueryPlan; import org.apache.phoenix.compile.UpsertCompiler; import org.apache.phoenix.coprocessor.MetaDataProtocol; -import org.apache.phoenix.coprocessor.PhoenixRegionServerEndpoint; -import org.apache.phoenix.coprocessor.generated.RegionServerEndpointProtos; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.exception.StaleMetadataCacheException; @@ -128,6 +120,7 @@ import org.apache.phoenix.log.QueryLogInfo; import org.apache.phoenix.log.QueryLogger; import org.apache.phoenix.log.QueryLoggerUtil; import org.apache.phoenix.log.QueryStatus; +import org.apache.phoenix.monitoring.GlobalClientMetrics; import org.apache.phoenix.monitoring.TableMetricsManager; import org.apache.phoenix.optimize.Cost; import org.apache.phoenix.parse.AddColumnStatement; @@ -206,12 +199,10 @@ import org.apache.phoenix.schema.PName; import org.apache.phoenix.schema.PNameFactory; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTable.IndexType; -import org.apache.phoenix.schema.PTableKey; import org.apache.phoenix.schema.PTableType; import org.apache.phoenix.schema.RowKeyValueAccessor; import org.apache.phoenix.schema.Sequence; import org.apache.phoenix.schema.SortOrder; -import org.apache.phoenix.schema.TableNotFoundException; import org.apache.phoenix.schema.TableRef; import org.apache.phoenix.schema.stats.StatisticsCollectionScope; import org.apache.phoenix.schema.tuple.MultiKeyValueTuple; @@ -233,6 +224,7 @@ import org.apache.phoenix.util.SQLCloseable; import org.apache.phoenix.util.SQLCloseables; import org.apache.phoenix.util.ServerUtil; import org.apache.phoenix.util.ParseNodeUtil.RewriteResult; +import org.apache.phoenix.util.ValidateLastDDLTimestampUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -308,7 +300,8 @@ public class PhoenixStatement implements PhoenixMonitoredStatement, SQLCloseable public PhoenixStatement(PhoenixConnection connection) { this.connection = connection; this.queryTimeoutMillis = getDefaultQueryTimeoutMillis(); - this.validateLastDdlTimestamp = getValidateLastDdlTimestampEnabled(); + this.validateLastDdlTimestamp = ValidateLastDDLTimestampUtil + .getValidateLastDdlTimestampEnabled(this.connection); } /** @@ -320,12 +313,6 @@ public class PhoenixStatement implements PhoenixMonitoredStatement, SQLCloseable QueryServicesOptions.DEFAULT_THREAD_TIMEOUT_MS); } - private boolean getValidateLastDdlTimestampEnabled() { - return connection.getQueryServices().getProps() - .getBoolean(QueryServices.LAST_DDL_TIMESTAMP_VALIDATION_ENABLED, - QueryServicesOptions.DEFAULT_LAST_DDL_TIMESTAMP_VALIDATION_ENABLED); - } - protected List<PhoenixResultSet> getResultSets() { return resultSets; } @@ -349,109 +336,6 @@ public class PhoenixStatement implements PhoenixMonitoredStatement, SQLCloseable return executeQuery(stmt, true, queryLogger, noCommit, this.validateLastDdlTimestamp); } - private String getInfoString(TableRef tableRef) { - return String.format("Tenant: %s, Schema: %s, Table: %s", - this.connection.getTenantId(), - tableRef.getTable().getSchemaName(), - tableRef.getTable().getTableName()); - } - - private void setLastDDLTimestampRequestParameters( - RegionServerEndpointProtos.LastDDLTimestampRequest.Builder builder, PTable pTable) { - byte[] tenantIDBytes = this.connection.getTenantId() == null - ? HConstants.EMPTY_BYTE_ARRAY - : this.connection.getTenantId().getBytes(); - byte[] schemaBytes = pTable.getSchemaName() == null - ? HConstants.EMPTY_BYTE_ARRAY - : pTable.getSchemaName().getBytes(); - builder.setTenantId(ByteStringer.wrap(tenantIDBytes)); - builder.setSchemaName(ByteStringer.wrap(schemaBytes)); - builder.setTableName(ByteStringer.wrap(pTable.getTableName().getBytes())); - builder.setLastDDLTimestamp(pTable.getLastDDLTimestamp()); - } - /** - * Build a request for the validateLastDDLTimestamp RPC. - * @param tableRef - * @return ValidateLastDDLTimestampRequest for the table in tableRef - */ - private RegionServerEndpointProtos.ValidateLastDDLTimestampRequest - getValidateDDLTimestampRequest(TableRef tableRef) throws TableNotFoundException { - RegionServerEndpointProtos.ValidateLastDDLTimestampRequest.Builder requestBuilder - = RegionServerEndpointProtos.ValidateLastDDLTimestampRequest.newBuilder(); - RegionServerEndpointProtos.LastDDLTimestampRequest.Builder innerBuilder - = RegionServerEndpointProtos.LastDDLTimestampRequest.newBuilder(); - - //when querying an index, we need to validate its parent table in case the index was dropped - if (PTableType.INDEX.equals(tableRef.getTable().getType())) { - PTableKey key = new PTableKey(this.connection.getTenantId(), - tableRef.getTable().getParentName().getString()); - PTable parentTable = this.connection.getTable(key); - setLastDDLTimestampRequestParameters(innerBuilder, parentTable); - requestBuilder.addLastDDLTimestampRequests(innerBuilder); - } - - innerBuilder = RegionServerEndpointProtos.LastDDLTimestampRequest.newBuilder(); - setLastDDLTimestampRequestParameters(innerBuilder, tableRef.getTable()); - requestBuilder.addLastDDLTimestampRequests(innerBuilder); - - //when querying a view, we need to validate last ddl timestamps for all its ancestors - if (PTableType.VIEW.equals(tableRef.getTable().getType())) { - PTable pTable = tableRef.getTable(); - while (pTable.getParentName() != null) { - PTableKey key = new PTableKey(this.connection.getTenantId(), - pTable.getParentName().getString()); - PTable parentTable = this.connection.getTable(key); - innerBuilder = RegionServerEndpointProtos.LastDDLTimestampRequest.newBuilder(); - setLastDDLTimestampRequestParameters(innerBuilder, parentTable); - requestBuilder.addLastDDLTimestampRequests(innerBuilder); - pTable = parentTable; - } - } - return requestBuilder.build(); - } - - /** - * Verifies that table metadata in client cache is up-to-date with server. - * A random live region server is picked for invoking the RPC to validate LastDDLTimestamp. - * Retry once if there was an error performing the RPC, otherwise throw the Exception. - * @param tableRef - * @throws SQLException - */ - private void validateLastDDLTimestamp(TableRef tableRef, boolean doRetry) throws SQLException { - - String infoString = getInfoString(tableRef); - try (Admin admin = this.connection.getQueryServices().getAdmin()) { - // get all live region servers - List<ServerName> regionServers - = this.connection.getQueryServices().getLiveRegionServers(); - // pick one at random - ServerName regionServer - = regionServers.get(ThreadLocalRandom.current().nextInt(regionServers.size())); - - LOGGER.debug("Sending DDL timestamp validation request for {} to regionserver {}", - infoString, regionServer); - - // RPC - CoprocessorRpcChannel channel = admin.coprocessorService(regionServer); - PhoenixRegionServerEndpoint.BlockingInterface service - = PhoenixRegionServerEndpoint.newBlockingStub(channel); - service.validateLastDDLTimestamp(null, getValidateDDLTimestampRequest(tableRef)); - } catch (Exception e) { - SQLException parsedException = ServerUtil.parseServerException(e); - if (parsedException instanceof StaleMetadataCacheException) { - throw parsedException; - } - //retry once for any exceptions other than StaleMetadataCacheException - LOGGER.error("Error in validating DDL timestamp for {}", infoString, parsedException); - if (doRetry) { - // update the list of live region servers - this.connection.getQueryServices().refreshLiveRegionServers(); - validateLastDDLTimestamp(tableRef, false); - return; - } - throw parsedException; - } - } private PhoenixResultSet executeQuery(final CompilableStatement stmt, final boolean doRetryOnMetaNotFoundError, @@ -508,7 +392,8 @@ public class PhoenixStatement implements PhoenixMonitoredStatement, SQLCloseable //verify metadata for the table/view/index in the query plan //plan.getTableRef can be null in some cases like EXPLAIN <query> if (shouldValidateLastDdlTimestamp && plan.getTableRef() != null) { - validateLastDDLTimestamp(plan.getTableRef(), true); + ValidateLastDDLTimestampUtil.validateLastDDLTimestamp( + connection, Arrays.asList(plan.getTableRef()), false, true); } // this will create its own trace internally, so we don't wrap this @@ -564,13 +449,17 @@ public class PhoenixStatement implements PhoenixMonitoredStatement, SQLCloseable } throw e; } catch (StaleMetadataCacheException e) { + GlobalClientMetrics + .GLOBAL_CLIENT_STALE_METADATA_CACHE_EXCEPTION_COUNTER + .increment(); updateMetrics = false; PTable pTable = lastQueryPlan.getTableRef().getTable(); - LOGGER.debug("Force updating client metadata cache for {}", - getInfoString(getLastQueryPlan().getTableRef())); String schemaN = pTable.getSchemaName().toString(); String tableN = pTable.getTableName().toString(); PName tenantId = connection.getTenantId(); + LOGGER.debug("Force updating client metadata cache for {}", + ValidateLastDDLTimestampUtil.getInfoString(tenantId, + Arrays.asList(getLastQueryPlan().getTableRef()))); // if the index metadata was stale, we will update the client cache // for the parent table, which will also add the new index metadata diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java index 70529dc36c..8763a7c5ec 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/GlobalClientMetrics.java @@ -39,6 +39,7 @@ import static org.apache.phoenix.monitoring.MetricType.SCAN_BYTES; import static org.apache.phoenix.monitoring.MetricType.SELECT_SQL_COUNTER; import static org.apache.phoenix.monitoring.MetricType.SPOOL_FILE_COUNTER; import static org.apache.phoenix.monitoring.MetricType.SPOOL_FILE_SIZE; +import static org.apache.phoenix.monitoring.MetricType.STALE_METADATA_CACHE_EXCEPTION_COUNTER; import static org.apache.phoenix.monitoring.MetricType.TASK_END_TO_END_TIME; import static org.apache.phoenix.monitoring.MetricType.TASK_EXECUTED_COUNTER; import static org.apache.phoenix.monitoring.MetricType.TASK_EXECUTION_TIME; @@ -159,7 +160,8 @@ public enum GlobalClientMetrics { GLOBAL_HA_PARALLEL_CONNECTION_CREATED_COUNTER(HA_PARALLEL_CONNECTION_CREATED_COUNTER), GLOBAL_CLIENT_METADATA_CACHE_MISS_COUNTER(CLIENT_METADATA_CACHE_MISS_COUNTER), - GLOBAL_CLIENT_METADATA_CACHE_HIT_COUNTER(CLIENT_METADATA_CACHE_HIT_COUNTER); + GLOBAL_CLIENT_METADATA_CACHE_HIT_COUNTER(CLIENT_METADATA_CACHE_HIT_COUNTER), + GLOBAL_CLIENT_STALE_METADATA_CACHE_EXCEPTION_COUNTER(STALE_METADATA_CACHE_EXCEPTION_COUNTER); private static final Logger LOGGER = LoggerFactory.getLogger(GlobalClientMetrics.class); private static final boolean isGlobalMetricsEnabled = QueryServicesOptions.withDefaults().isGlobalMetricsEnabled(); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java index acdba9e551..b7b0c4b562 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/monitoring/MetricType.java @@ -150,6 +150,9 @@ public enum MetricType { CLIENT_METADATA_CACHE_MISS_COUNTER("cmcm", "Number of cache misses for the CQSI cache.", LogLevel.DEBUG, PLong.INSTANCE), CLIENT_METADATA_CACHE_HIT_COUNTER("cmch", "Number of cache hits for the CQSI cache.", LogLevel.DEBUG, PLong.INSTANCE), PAGED_ROWS_COUNTER("prc", "Number of dummy rows returned to client due to paging.", LogLevel.DEBUG, PLong.INSTANCE), + STALE_METADATA_CACHE_EXCEPTION_COUNTER("smce", + "Number of StaleMetadataCacheException encountered.", + LogLevel.DEBUG, PLong.INSTANCE), // hbase metrics COUNT_RPC_CALLS("rp", "Number of RPC calls",LogLevel.DEBUG, PLong.INSTANCE), diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ValidateLastDDLTimestampUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ValidateLastDDLTimestampUtil.java new file mode 100644 index 0000000000..47159fccca --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ValidateLastDDLTimestampUtil.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.util; + +import java.sql.SQLException; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; + +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; +import org.apache.hadoop.hbase.util.ByteStringer; +import org.apache.phoenix.coprocessor.PhoenixRegionServerEndpoint; +import org.apache.phoenix.coprocessor.generated.RegionServerEndpointProtos; +import org.apache.phoenix.exception.StaleMetadataCacheException; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.query.QueryServicesOptions; +import org.apache.phoenix.schema.PName; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.PTableKey; +import org.apache.phoenix.schema.PTableType; +import org.apache.phoenix.schema.TableNotFoundException; +import org.apache.phoenix.schema.TableRef; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Utility class for last ddl timestamp validation from the client. + */ +public class ValidateLastDDLTimestampUtil { + + private ValidateLastDDLTimestampUtil() {} + + private static final Logger LOGGER = LoggerFactory + .getLogger(ValidateLastDDLTimestampUtil.class); + + public static String getInfoString(PName tenantId, List<TableRef> tableRefs) { + StringBuilder sb = new StringBuilder(); + sb.append(String.format("Tenant: %s, ", tenantId)); + for (TableRef tableRef : tableRefs) { + sb.append(String.format("{Schema: %s, Table: %s},", + tableRef.getTable().getSchemaName(), + tableRef.getTable().getTableName())); + } + return sb.toString(); + } + + /** + * Get whether last ddl timestamp validation is enabled on the connection + * @param connection + * @return true if it is enabled, false otherwise + */ + public static boolean getValidateLastDdlTimestampEnabled(PhoenixConnection connection) { + return connection.getQueryServices().getProps() + .getBoolean(QueryServices.LAST_DDL_TIMESTAMP_VALIDATION_ENABLED, + QueryServicesOptions.DEFAULT_LAST_DDL_TIMESTAMP_VALIDATION_ENABLED); + } + + /** + * Verifies that table metadata for given tables is up-to-date in client cache with server. + * A random live region server is picked for invoking the RPC to validate LastDDLTimestamp. + * Retry once if there was an error performing the RPC, otherwise throw the Exception. + * @param tableRefs + * @param isWritePath + * @param doRetry + * @throws SQLException + */ + public static void validateLastDDLTimestamp( + PhoenixConnection conn, List<TableRef> tableRefs, boolean isWritePath, boolean doRetry) + throws SQLException { + + String infoString = getInfoString(conn.getTenantId(), tableRefs); + try (Admin admin = conn.getQueryServices().getAdmin()) { + // get all live region servers + List<ServerName> regionServers + = conn.getQueryServices().getLiveRegionServers(); + // pick one at random + ServerName regionServer + = regionServers.get(ThreadLocalRandom.current().nextInt(regionServers.size())); + + LOGGER.debug("Sending DDL timestamp validation request for {} to regionserver {}", + infoString, regionServer); + + // RPC + CoprocessorRpcChannel channel = admin.coprocessorService(regionServer); + PhoenixRegionServerEndpoint.BlockingInterface service + = PhoenixRegionServerEndpoint.newBlockingStub(channel); + RegionServerEndpointProtos.ValidateLastDDLTimestampRequest request + = getValidateDDLTimestampRequest(conn, tableRefs, isWritePath); + service.validateLastDDLTimestamp(null, request); + } catch (Exception e) { + SQLException parsedException = ServerUtil.parseServerException(e); + if (parsedException instanceof StaleMetadataCacheException) { + throw parsedException; + } + //retry once for any exceptions other than StaleMetadataCacheException + LOGGER.error("Error in validating DDL timestamp for {}", infoString, parsedException); + if (doRetry) { + // update the list of live region servers + conn.getQueryServices().refreshLiveRegionServers(); + validateLastDDLTimestamp(conn, tableRefs, isWritePath, false); + return; + } + throw parsedException; + } + } + + /** + * Build a request for the validateLastDDLTimestamp RPC for the given tables. + * 1. For a view, we need to add all its ancestors to the request + * in case something changed in the hierarchy. + * 2. For an index, we need to add its parent table to the request + * in case the index was dropped. + * 3. On the write path, we need to add all indexes of a table/view + * in case index state was changed. + * @param conn + * @param tableRefs + * @param isWritePath + * @return ValidateLastDDLTimestampRequest for the table in tableRef + */ + private static RegionServerEndpointProtos.ValidateLastDDLTimestampRequest + getValidateDDLTimestampRequest(PhoenixConnection conn, List<TableRef> tableRefs, + boolean isWritePath) throws TableNotFoundException { + + RegionServerEndpointProtos.ValidateLastDDLTimestampRequest.Builder requestBuilder + = RegionServerEndpointProtos.ValidateLastDDLTimestampRequest.newBuilder(); + RegionServerEndpointProtos.LastDDLTimestampRequest.Builder innerBuilder; + + for (TableRef tableRef : tableRefs) { + + //when querying an index, we need to validate its parent table + //in case the index was dropped + if (PTableType.INDEX.equals(tableRef.getTable().getType())) { + innerBuilder = RegionServerEndpointProtos.LastDDLTimestampRequest.newBuilder(); + PTableKey key = new PTableKey(conn.getTenantId(), + tableRef.getTable().getParentName().getString()); + PTable parentTable = conn.getTable(key); + setLastDDLTimestampRequestParameters(innerBuilder, conn.getTenantId(), parentTable); + requestBuilder.addLastDDLTimestampRequests(innerBuilder); + } + + // add the tableRef to the request + innerBuilder = RegionServerEndpointProtos.LastDDLTimestampRequest.newBuilder(); + setLastDDLTimestampRequestParameters( + innerBuilder, conn.getTenantId(), tableRef.getTable()); + requestBuilder.addLastDDLTimestampRequests(innerBuilder); + + //when querying a view, we need to validate last ddl timestamps for all its ancestors + if (PTableType.VIEW.equals(tableRef.getTable().getType())) { + PTable pTable = tableRef.getTable(); + while (pTable.getParentName() != null) { + PTableKey key = new PTableKey(conn.getTenantId(), + pTable.getParentName().getString()); + PTable parentTable = conn.getTable(key); + innerBuilder = RegionServerEndpointProtos.LastDDLTimestampRequest.newBuilder(); + setLastDDLTimestampRequestParameters( + innerBuilder, conn.getTenantId(), parentTable); + requestBuilder.addLastDDLTimestampRequests(innerBuilder); + pTable = parentTable; + } + } + + //on the write path, we need to validate all indexes of a table/view + //in case index state was changed + if (isWritePath) { + for (PTable idxPTable : tableRef.getTable().getIndexes()) { + innerBuilder = RegionServerEndpointProtos.LastDDLTimestampRequest.newBuilder(); + setLastDDLTimestampRequestParameters( + innerBuilder, conn.getTenantId(), idxPTable); + requestBuilder.addLastDDLTimestampRequests(innerBuilder); + } + } + } + return requestBuilder.build(); + } + + /** + * For the given PTable, set the attributes on the LastDDLTimestampRequest. + */ + private static void setLastDDLTimestampRequestParameters( + RegionServerEndpointProtos.LastDDLTimestampRequest.Builder builder, + PName tenantId, PTable pTable) { + byte[] tenantIDBytes = tenantId == null + ? HConstants.EMPTY_BYTE_ARRAY + : tenantId.getBytes(); + byte[] schemaBytes = pTable.getSchemaName() == null + ? HConstants.EMPTY_BYTE_ARRAY + : pTable.getSchemaName().getBytes(); + builder.setTenantId(ByteStringer.wrap(tenantIDBytes)); + builder.setSchemaName(ByteStringer.wrap(schemaBytes)); + builder.setTableName(ByteStringer.wrap(pTable.getTableName().getBytes())); + builder.setLastDDLTimestamp(pTable.getLastDDLTimestamp()); + } +} diff --git a/phoenix-core/src/test/java/org/apache/phoenix/cache/ServerMetadataCacheTest.java b/phoenix-core/src/test/java/org/apache/phoenix/cache/ServerMetadataCacheTest.java index 2a618cdcce..6ae19ad144 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/cache/ServerMetadataCacheTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/cache/ServerMetadataCacheTest.java @@ -21,11 +21,14 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.end2end.ParallelStatsDisabledIT; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.jdbc.PhoenixStatement; +import org.apache.phoenix.monitoring.GlobalClientMetrics; import org.apache.phoenix.query.ConnectionQueryServices; import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.schema.ColumnNotFoundException; import org.apache.phoenix.schema.ConnectionProperty; import org.apache.phoenix.schema.PIndexState; import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.TableNotFoundException; import org.apache.phoenix.schema.types.PVarchar; import org.apache.phoenix.thirdparty.com.google.common.collect.Maps; import org.apache.phoenix.util.PhoenixRuntime; @@ -36,6 +39,7 @@ import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.TestUtil; import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -77,6 +81,11 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT { setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); } + @Before + public void resetMetrics() { + GlobalClientMetrics.GLOBAL_CLIENT_STALE_METADATA_CACHE_EXCEPTION_COUNTER.getMetric().reset(); + } + @After public void resetMetadataCache() { ServerMetadataCache.resetCache(); @@ -545,7 +554,7 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT { // create table with UCF=never and upsert data using client-1 createTable(conn1, tableName, NEVER); - upsert(conn1, tableName); + upsert(conn1, tableName, true); // select query from client-2 works to populate client side metadata cache // there should be 1 update to the client cache @@ -562,16 +571,22 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT { // select query from client-2 with old ddl timestamp works // there should be one update to the client cache + //verify client got a StaleMetadataCacheException query(conn2, tableName); expectedNumCacheUpdates = 1; Mockito.verify(spyCqs2, Mockito.times(expectedNumCacheUpdates)) .addTable(any(PTable.class), anyLong()); + Assert.assertEquals("Client should have encountered a StaleMetadataCacheException", + 1, GlobalClientMetrics.GLOBAL_CLIENT_STALE_METADATA_CACHE_EXCEPTION_COUNTER.getMetric().getValue()); // select query from client-2 with latest ddl timestamp works // there should be no more updates to client cache + //verify client did not get another StaleMetadataCacheException query(conn2, tableName); Mockito.verify(spyCqs2, Mockito.times(expectedNumCacheUpdates)) .addTable(any(PTable.class), anyLong()); + Assert.assertEquals("Client should have encountered a StaleMetadataCacheException", + 1, GlobalClientMetrics.GLOBAL_CLIENT_STALE_METADATA_CACHE_EXCEPTION_COUNTER.getMetric().getValue()); } } @@ -594,7 +609,7 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT { // create table and upsert using client-1 createTable(conn1, tableName, NEVER); - upsert(conn1, tableName); + upsert(conn1, tableName, true); // Instrument ServerMetadataCache to throw a SQLException once cache = ServerMetadataCache.getInstance(config); @@ -632,7 +647,7 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT { // create table and upsert using client-1 createTable(conn1, tableName, NEVER); - upsert(conn1, tableName); + upsert(conn1, tableName, true); // query using client-2 to populate cache query(conn2, tableName); @@ -682,7 +697,7 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT { // create table and upsert using client-1 createTable(conn1, tableName, NEVER); - upsert(conn1, tableName); + upsert(conn1, tableName, true); // Instrument ServerMetadataCache to throw a SQLException twice cache = ServerMetadataCache.getInstance(config); @@ -722,7 +737,7 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT { // create table using client-1 createTable(conn1, tableName, NEVER); - upsert(conn1, tableName); + upsert(conn1, tableName, true); // create 2 level of views using client-1 String view1 = generateUniqueName(); @@ -813,7 +828,7 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT { .addTable(any(PTable.class), anyLong()); //client-1 updates index property - alterIndexChangeStateToRebuild(conn1, tableName, indexName); + alterIndexChangeState(conn1, tableName, indexName, " REBUILD"); //client-2's query using the index should work PhoenixStatement stmt = conn2.createStatement().unwrap(PhoenixStatement.class); @@ -911,6 +926,382 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT { } } + /** + * Test the case when a client upserts into multiple tables before calling commit. + * Verify that last ddl timestamp was validated for all involved tables only once. + */ + @Test + public void testUpsertMultipleTablesWithOldDDLTimestamp() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + String url1 = QueryUtil.getConnectionUrl(props, config, "client1"); + String url2 = QueryUtil.getConnectionUrl(props, config, "client2"); + String tableName1 = generateUniqueName(); + String tableName2 = generateUniqueName(); + ConnectionQueryServices spyCqs1 = Mockito.spy(driver.getConnectionQueryServices(url1, props)); + ConnectionQueryServices spyCqs2 = Mockito.spy(driver.getConnectionQueryServices(url2, props)); + + try (Connection conn1 = spyCqs1.connect(url1, props); + Connection conn2 = spyCqs2.connect(url2, props)) { + + //client-1 creates 2 tables + createTable(conn1, tableName1, NEVER); + createTable(conn1, tableName2, NEVER); + + //client-2 populates its cache, 1 getTable call for each table + query(conn2, tableName1); + query(conn2, tableName2); + + //client-1 alters one of the tables + alterTableAddColumn(conn1, tableName2, "col3"); + + //client-2 upserts multiple rows to both tables before calling commit + //verify the table metadata was fetched for each table + multiTableUpsert(conn2, tableName1, tableName2); + Mockito.verify(spyCqs2, Mockito.times(2)).getTable(eq(null), + any(byte[].class), eq(PVarchar.INSTANCE.toBytes(tableName1)), + anyLong(), anyLong()); + Mockito.verify(spyCqs2, Mockito.times(2)).getTable(eq(null), + any(byte[].class), eq(PVarchar.INSTANCE.toBytes(tableName2)), + anyLong(), anyLong()); + } + } + + /** + * Test upserts into a multi-level view hierarchy. + */ + @Test + public void testUpsertViewWithOldDDLTimestamp() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + String url1 = QueryUtil.getConnectionUrl(props, config, "client1"); + String url2 = QueryUtil.getConnectionUrl(props, config, "client2"); + String tableName = generateUniqueName(); + String viewName1 = generateUniqueName(); + String viewName2 = generateUniqueName(); + ConnectionQueryServices spyCqs1 = Mockito.spy(driver.getConnectionQueryServices(url1, props)); + ConnectionQueryServices spyCqs2 = Mockito.spy(driver.getConnectionQueryServices(url2, props)); + + try (Connection conn1 = spyCqs1.connect(url1, props); + Connection conn2 = spyCqs2.connect(url2, props)) { + + //client-1 creates a table and views + createTable(conn1, tableName, NEVER); + createView(conn1, tableName, viewName1); + createView(conn1, viewName1, viewName2); + + //client-2 populates its cache, 1 getTable RPC each for table, view1, view2 + query(conn2, viewName2); + Mockito.verify(spyCqs2, Mockito.times(1)).getTable(eq(null), + any(byte[].class), eq(PVarchar.INSTANCE.toBytes(tableName)), + anyLong(), anyLong()); + Mockito.verify(spyCqs2, Mockito.times(1)).getTable(eq(null), + any(byte[].class), eq(PVarchar.INSTANCE.toBytes(viewName1)), + anyLong(), anyLong()); + Mockito.verify(spyCqs2, Mockito.times(1)).getTable(eq(null), + any(byte[].class), eq(PVarchar.INSTANCE.toBytes(viewName2)), + anyLong(), anyLong()); + + //client-1 alters first level view + alterViewAddColumn(conn1, viewName1, "col3"); + + //client-2 upserts into second level view + //verify there was a getTable RPC for the view and all its ancestors + //verify that the client got a StaleMetadataCacheException + upsert(conn2, viewName2, true); + + Mockito.verify(spyCqs2, Mockito.times(2)).getTable(eq(null), + any(byte[].class), eq(PVarchar.INSTANCE.toBytes(tableName)), + anyLong(), anyLong()); + Mockito.verify(spyCqs2, Mockito.times(2)).getTable(eq(null), + any(byte[].class), eq(PVarchar.INSTANCE.toBytes(viewName1)), + anyLong(), anyLong()); + Mockito.verify(spyCqs2, Mockito.times(2)).getTable(eq(null), + any(byte[].class), eq(PVarchar.INSTANCE.toBytes(viewName2)), + anyLong(), anyLong()); + Assert.assertEquals("Client should have encountered a StaleMetadataCacheException", + 1, GlobalClientMetrics.GLOBAL_CLIENT_STALE_METADATA_CACHE_EXCEPTION_COUNTER.getMetric().getValue()); + //client-2 upserts into first level view + //verify no getTable RPCs + //verify that the client did not get a StaleMetadataCacheException + upsert(conn2, viewName1, true); + + Mockito.verify(spyCqs2, Mockito.times(2)).getTable(eq(null), + any(byte[].class), eq(PVarchar.INSTANCE.toBytes(tableName)), + anyLong(), anyLong()); + Mockito.verify(spyCqs2, Mockito.times(2)).getTable(eq(null), + any(byte[].class), eq(PVarchar.INSTANCE.toBytes(viewName1)), + anyLong(), anyLong()); + Mockito.verify(spyCqs2, Mockito.times(2)).getTable(eq(null), + any(byte[].class), eq(PVarchar.INSTANCE.toBytes(viewName2)), + anyLong(), anyLong()); + Assert.assertEquals("Client should not have encountered another StaleMetadataCacheException", + 1, GlobalClientMetrics.GLOBAL_CLIENT_STALE_METADATA_CACHE_EXCEPTION_COUNTER.getMetric().getValue()); + } + } + + /** + * Test that upserts into a table which was dropped throws a TableNotFoundException. + */ + @Test + public void testUpsertDroppedTable() throws SQLException { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + String url1 = QueryUtil.getConnectionUrl(props, config, "client1"); + String url2 = QueryUtil.getConnectionUrl(props, config, "client2"); + String tableName = generateUniqueName(); + ConnectionQueryServices spyCqs1 = Mockito.spy(driver.getConnectionQueryServices(url1, props)); + ConnectionQueryServices spyCqs2 = Mockito.spy(driver.getConnectionQueryServices(url2, props)); + + try (Connection conn1 = spyCqs1.connect(url1, props); + Connection conn2 = spyCqs2.connect(url2, props)) { + + // client-1 creates tables and executes upserts + createTable(conn1, tableName, NEVER); + upsert(conn1, tableName, false); + upsert(conn1, tableName, false); + upsert(conn1, tableName, false); + + // client-2 drops the table + conn2.createStatement().execute("DROP TABLE " + tableName); + + //client-1 commits + conn1.commit(); + Assert.fail("Commit should have failed with TableNotFoundException"); + } + catch (Exception e) { + Assert.assertTrue("TableNotFoundException was not thrown when table was dropped concurrently with upserts.", e instanceof TableNotFoundException); + } + } + + /** + * Client-1 creates a table and executes some upserts. + * Client-2 drops a column for which client-1 had executed upserts. + * Client-1 calls commit. Verify that client-1 gets ColumnNotFoundException + */ + @Test + public void testUpsertDroppedTableColumn() throws SQLException { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + String url1 = QueryUtil.getConnectionUrl(props, config, "client1"); + String url2 = QueryUtil.getConnectionUrl(props, config, "client2"); + String tableName = generateUniqueName(); + ConnectionQueryServices spyCqs1 = Mockito.spy(driver.getConnectionQueryServices(url1, props)); + ConnectionQueryServices spyCqs2 = Mockito.spy(driver.getConnectionQueryServices(url2, props)); + + try (Connection conn1 = spyCqs1.connect(url1, props); + Connection conn2 = spyCqs2.connect(url2, props)) { + + // client-1 creates tables and executes upserts + createTable(conn1, tableName, NEVER); + upsert(conn1, tableName, false); + upsert(conn1, tableName, false); + upsert(conn1, tableName, false); + + // client-2 drops a column + alterTableDropColumn(conn2, tableName, "v1"); + + //client-1 commits + conn1.commit(); + Assert.fail("Commit should have failed with ColumnNotFoundException"); + } + catch (Exception e) { + Assert.assertTrue("ColumnNotFoundException was not thrown when column was dropped concurrently with upserts.", e instanceof ColumnNotFoundException); + } + } + + /** + * Client-1 creates a table and executes some upserts. + * Client-2 adds a column to the table. + * Client-1 calls commit. Verify that client-1 does not get any errors. + */ + @Test + public void testUpsertAddTableColumn() throws SQLException { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + String url1 = QueryUtil.getConnectionUrl(props, config, "client1"); + String url2 = QueryUtil.getConnectionUrl(props, config, "client2"); + String tableName = generateUniqueName(); + ConnectionQueryServices spyCqs1 = Mockito.spy(driver.getConnectionQueryServices(url1, props)); + ConnectionQueryServices spyCqs2 = Mockito.spy(driver.getConnectionQueryServices(url2, props)); + + try (Connection conn1 = spyCqs1.connect(url1, props); + Connection conn2 = spyCqs2.connect(url2, props)) { + + // client-1 creates tables and executes upserts + createTable(conn1, tableName, NEVER); + upsert(conn1, tableName, false); + upsert(conn1, tableName, false); + upsert(conn1, tableName, false); + + // client-2 adds a column + alterTableAddColumn(conn2, tableName, "v5"); + + //client-1 commits + conn1.commit(); + } + } + + /** + * Client-1 creates a table and executes some upserts. + * Client-2 creates an index on the table. + * Client-1 calls commit. Verify that index mutations were correctly generated + */ + @Test + public void testConcurrentUpsertIndexCreation() throws SQLException { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + String url1 = QueryUtil.getConnectionUrl(props, config, "client1"); + String url2 = QueryUtil.getConnectionUrl(props, config, "client2"); + String tableName = generateUniqueName(); + String indexName = generateUniqueName(); + ConnectionQueryServices spyCqs1 = Mockito.spy(driver.getConnectionQueryServices(url1, props)); + ConnectionQueryServices spyCqs2 = Mockito.spy(driver.getConnectionQueryServices(url2, props)); + + try (Connection conn1 = spyCqs1.connect(url1, props); + Connection conn2 = spyCqs2.connect(url2, props)) { + + // client-1 creates tables and executes upserts + createTable(conn1, tableName, NEVER); + upsert(conn1, tableName, false); + upsert(conn1, tableName, false); + upsert(conn1, tableName, false); + + // client-2 creates an index + createIndex(conn2, tableName, indexName, "v1"); + + //client-1 commits + upsert(conn1, tableName, false); + upsert(conn1, tableName, false); + conn1.commit(); + + //verify index rows + int tableCount, indexCount; + ResultSet rs = conn1.createStatement().executeQuery("SELECT COUNT(*) FROM " + tableName); + rs.next(); + tableCount = rs.getInt(1); + + rs = conn1.createStatement().executeQuery("SELECT COUNT(*) FROM " + indexName); + rs.next(); + indexCount = rs.getInt(1); + + Assert.assertEquals("All index mutations were not generated when index was created concurrently with upserts.", tableCount, indexCount); + } + } + + /** + * Client-1 creates a table, index and executes some upserts. + * Client-2 drops the index on the table. + * Client-1 calls commit. Verify that client-1 does not see any errors + */ + @Test + public void testConcurrentUpsertDropIndex() throws SQLException { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + String url1 = QueryUtil.getConnectionUrl(props, config, "client1"); + String url2 = QueryUtil.getConnectionUrl(props, config, "client2"); + String tableName = generateUniqueName(); + String indexName = generateUniqueName(); + ConnectionQueryServices spyCqs1 = Mockito.spy(driver.getConnectionQueryServices(url1, props)); + ConnectionQueryServices spyCqs2 = Mockito.spy(driver.getConnectionQueryServices(url2, props)); + + try (Connection conn1 = spyCqs1.connect(url1, props); + Connection conn2 = spyCqs2.connect(url2, props)) { + + // client-1 creates tables, index and executes upserts + createTable(conn1, tableName, NEVER); + createIndex(conn1, tableName, indexName, "v1"); + upsert(conn1, tableName, false); + upsert(conn1, tableName, false); + upsert(conn1, tableName, false); + + // client-2 drops the index + dropIndex(conn2, tableName, indexName); + + //client-1 commits + upsert(conn1, tableName, false); + upsert(conn1, tableName, false); + conn1.commit(); + } + } + /** + * Client-1 creates a table, index in disabled state and executes some upserts. + * Client-2 marks the index as Rebuild. + * Client-1 calls commit. Verify that index mutations were correctly generated + */ + @Test + public void testConcurrentUpsertIndexStateChange() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + String url1 = QueryUtil.getConnectionUrl(props, config, "client1"); + String url2 = QueryUtil.getConnectionUrl(props, config, "client2"); + String tableName = generateUniqueName(); + String indexName = generateUniqueName(); + ConnectionQueryServices spyCqs1 = Mockito.spy(driver.getConnectionQueryServices(url1, props)); + ConnectionQueryServices spyCqs2 = Mockito.spy(driver.getConnectionQueryServices(url2, props)); + + try (Connection conn1 = spyCqs1.connect(url1, props); + Connection conn2 = spyCqs2.connect(url2, props)) { + + // client-1 creates tables and executes upserts + createTable(conn1, tableName, NEVER); + createIndex(conn1, tableName, indexName, "v1"); + alterIndexChangeState(conn1, tableName, indexName, " DISABLE"); + upsert(conn1, tableName, false); + upsert(conn1, tableName, false); + upsert(conn1, tableName, false); + + // client-2 creates an index + alterIndexChangeState(conn2, tableName, indexName, " REBUILD"); + + //client-1 commits + upsert(conn1, tableName, false); + upsert(conn1, tableName, false); + conn1.commit(); + + //verify index rows + int tableCount, indexCount; + ResultSet rs = conn1.createStatement().executeQuery("SELECT COUNT(*) FROM " + tableName); + rs.next(); + tableCount = rs.getInt(1); + + rs = conn1.createStatement().executeQuery("SELECT COUNT(*) FROM " + indexName); + rs.next(); + indexCount = rs.getInt(1); + + Assert.assertEquals("All index mutations were not generated when index was created concurrently with upserts.", tableCount, indexCount); + } + } + + /** + * Test that upserts into a view whose parent was dropped throws a TableNotFoundException. + */ + @Test + public void testConcurrentUpsertDropView() throws Exception { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + String url1 = QueryUtil.getConnectionUrl(props, config, "client1"); + String url2 = QueryUtil.getConnectionUrl(props, config, "client2"); + String tableName = generateUniqueName(); + String viewName1 = generateUniqueName(); + String viewName2 = generateUniqueName(); + ConnectionQueryServices spyCqs1 = Mockito.spy(driver.getConnectionQueryServices(url1, props)); + ConnectionQueryServices spyCqs2 = Mockito.spy(driver.getConnectionQueryServices(url2, props)); + + try (Connection conn1 = spyCqs1.connect(url1, props); + Connection conn2 = spyCqs2.connect(url2, props)) { + + //client-1 creates tables and views + createTable(conn1, tableName, NEVER); + createView(conn1, tableName, viewName1); + createView(conn1, viewName1, viewName2); + + //client-2 upserts into second level view + upsert(conn2, viewName2, false); + + //client-1 drop first level view + dropView(conn1, viewName1, true); + + //client-2 upserts into second level view and commits + upsert(conn2, viewName2, true); + } + catch (Exception e) { + Assert.assertTrue("TableNotFoundException was not thrown when parent view " + + "was dropped (cascade) concurrently with upserts.", + e instanceof TableNotFoundException); + } + } //Helper methods @@ -943,10 +1334,12 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT { conn.createStatement().execute("CREATE INDEX " + indexName + " ON " + tableName + "(" + col + ")"); } - private void upsert(Connection conn, String tableName) throws SQLException { + private void upsert(Connection conn, String tableName, boolean doCommit) throws SQLException { conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1, v2) VALUES ("+ RANDOM.nextInt() +", " + RANDOM.nextInt() + ", " + RANDOM.nextInt() +")"); - conn.commit(); + if (doCommit) { + conn.commit(); + } } private void query(Connection conn, String tableName) throws SQLException { @@ -964,17 +1357,42 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT { + columnName + " INTEGER"); } + private void alterTableDropColumn(Connection conn, String tableName, String columnName) throws SQLException { + conn.createStatement().execute("ALTER TABLE " + tableName + " DROP COLUMN " + columnName); + } + private void alterViewAddColumn(Connection conn, String viewName, String columnName) throws SQLException { conn.createStatement().execute("ALTER VIEW " + viewName + " ADD IF NOT EXISTS " + columnName + " INTEGER"); } - private void alterIndexChangeStateToRebuild(Connection conn, String tableName, String indexName) throws SQLException, InterruptedException { - conn.createStatement().execute("ALTER INDEX " + indexName + " ON " + tableName + " REBUILD"); - TestUtil.waitForIndexState(conn, indexName, PIndexState.ACTIVE); + private void alterIndexChangeState(Connection conn, String tableName, String indexName, String state) throws SQLException, InterruptedException { + conn.createStatement().execute("ALTER INDEX " + indexName + " ON " + tableName + state); } private void dropIndex(Connection conn, String tableName, String indexName) throws SQLException { conn.createStatement().execute("DROP INDEX " + indexName + " ON " + tableName); } + + private void dropView(Connection conn, String viewName, boolean cascade) throws SQLException { + String sql = "DROP VIEW " + viewName; + if (cascade) { + sql += " CASCADE"; + } + conn.createStatement().execute(sql); + } + + private void multiTableUpsert(Connection conn, String tableName1, String tableName2) throws SQLException { + conn.createStatement().execute("UPSERT INTO " + tableName1 + + " (k, v1, v2) VALUES ("+ RANDOM.nextInt() +", " + RANDOM.nextInt() + ", " + RANDOM.nextInt() +")"); + conn.createStatement().execute("UPSERT INTO " + tableName1 + + " (k, v1, v2) VALUES ("+ RANDOM.nextInt() +", " + RANDOM.nextInt() + ", " + RANDOM.nextInt() +")"); + conn.createStatement().execute("UPSERT INTO " + tableName2 + + " (k, v1, v2) VALUES ("+ RANDOM.nextInt() +", " + RANDOM.nextInt() + ", " + RANDOM.nextInt() +")"); + conn.createStatement().execute("UPSERT INTO " + tableName1 + + " (k, v1, v2) VALUES ("+ RANDOM.nextInt() +", " + RANDOM.nextInt() + ", " + RANDOM.nextInt() +")"); + conn.createStatement().execute("UPSERT INTO " + tableName2 + + " (k, v1, v2) VALUES ("+ RANDOM.nextInt() +", " + RANDOM.nextInt() + ", " + RANDOM.nextInt() +")"); + conn.commit(); + } }