This is an automated email from the ASF dual-hosted git repository. shahrs87 pushed a commit to branch PHOENIX-6883-feature in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/PHOENIX-6883-feature by this push: new 0cf4cb8cd9 PHOENIX-7056 Update the LAST_DDL_TIMESTAMP of the parent table/view when we add/drop an index. (#1702) 0cf4cb8cd9 is described below commit 0cf4cb8cd9da98acc553860b4e92735a4e4bc91e Author: Rushabh Shah <shahr...@apache.org> AuthorDate: Thu Oct 12 09:45:54 2023 -0700 PHOENIX-7056 Update the LAST_DDL_TIMESTAMP of the parent table/view when we add/drop an index. (#1702) --- .../InvalidateServerMetadataCacheRequest.java | 53 ++++++++ .../phoenix/coprocessor/MetaDataEndpointImpl.java | 121 +++++++++++-------- .../phoenix/cache/ServerMetadataCacheTest.java | 133 +++++++++++++++++++++ 3 files changed, 258 insertions(+), 49 deletions(-) diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/InvalidateServerMetadataCacheRequest.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/InvalidateServerMetadataCacheRequest.java new file mode 100644 index 0000000000..93da9ec757 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/InvalidateServerMetadataCacheRequest.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.coprocessor; + +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.util.SchemaUtil; + +public class InvalidateServerMetadataCacheRequest { + private final byte[] tenantId; + private final byte[] schemaName; + private final byte[] tableName; + + public InvalidateServerMetadataCacheRequest(byte[] tenantId, byte[] schemaName, + byte[] tableName) { + this.tenantId = tenantId; + this.schemaName = schemaName; + this.tableName = tableName; + } + + public byte[] getTenantId() { + return tenantId; + } + + public byte[] getSchemaName() { + return schemaName; + } + + public byte[] getTableName() { + return tableName; + } + + @Override + public String toString() { + String fullTableName = SchemaUtil.getTableName(schemaName, tableName); + return "tenantId = " + Bytes.toString(tenantId) + + ", table name = " + fullTableName; + } +} diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java index 716bee220f..e69bb147ea 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java @@ -2417,6 +2417,18 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES); // table/index/views. tableMetadata.add(MetaDataUtil.getLastDDLTimestampUpdate(tableKey, clientTimeStamp, EnvironmentEdgeManager.currentTimeMillis())); + if (tableType == INDEX) { + // Invalidate the cache on each regionserver for parent table/view. + List<InvalidateServerMetadataCacheRequest> requests = new ArrayList<>(); + requests.add(new InvalidateServerMetadataCacheRequest(tenantIdBytes, + parentSchemaName, parentTableName)); + invalidateServerMetadataCache(requests); + long currentTimestamp = EnvironmentEdgeManager.currentTimeMillis(); + // If table type is index, then update the last ddl timestamp of the parent + // table or immediate parent view. + tableMetadata.add(MetaDataUtil.getLastDDLTimestampUpdate(parentTableKey, + currentTimestamp, currentTimestamp)); + } //and if we're doing change detection on this table or view, notify the //external schema registry and get its schema id @@ -2771,8 +2783,20 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES); if (parentLockKey != null) { acquireLock(region, parentLockKey, locks); } - invalidateServerMetadataCache(tenantIdBytes, schemaName, tableOrViewName); - List<ImmutableBytesPtr> invalidateList = new ArrayList<ImmutableBytesPtr>(); + List<InvalidateServerMetadataCacheRequest> requests = new ArrayList<>(); + requests.add(new InvalidateServerMetadataCacheRequest(tenantIdBytes, schemaName, + tableOrViewName)); + if (pTableType == INDEX) { + requests.add(new InvalidateServerMetadataCacheRequest(tenantIdBytes, schemaName, + parentTableName)); + long currentTimestamp = EnvironmentEdgeManager.currentTimeMillis(); + // If table type is index, then update the last ddl timestamp of the parent + // table or immediate parent view. + tableMetadata.add(MetaDataUtil.getLastDDLTimestampUpdate(parentLockKey, + currentTimestamp, currentTimestamp)); + } + invalidateServerMetadataCache(requests); + List<ImmutableBytesPtr> invalidateList = new ArrayList<>(); result = doDropTable(lockKey, tenantIdBytes, schemaName, tableOrViewName, parentTableName, PTableType.fromSerializedValue(tableType), tableMetadata, childLinkMutations, invalidateList, tableNamesToDelete, @@ -3187,7 +3211,10 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES); // We take a write row lock for tenantId, schemaName, tableOrViewName acquireLock(region, key, locks); // Invalidate the cache from all the regionservers. - invalidateServerMetadataCache(tenantId, schemaName, tableOrViewName); + List<InvalidateServerMetadataCacheRequest> requests = new ArrayList<>(); + requests.add(new InvalidateServerMetadataCacheRequest(tenantId, schemaName, + tableOrViewName)); + invalidateServerMetadataCache(requests); ImmutableBytesPtr cacheKey = new ImmutableBytesPtr(key); List<ImmutableBytesPtr> invalidateList = new ArrayList<>(); invalidateList.add(cacheKey); @@ -3438,23 +3465,19 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES); } /** - * Invalidate metadata cache from all region servers for the given tenant and table name. - * @param tenantId - * @param schemaName - * @param tableOrViewName + * Invalidate metadata cache from all region servers for the given list of + * InvalidateServerMetadataCacheRequest. * @throws Throwable */ - private void invalidateServerMetadataCache(byte[] tenantId, byte[]schemaName, - byte[] tableOrViewName) throws Throwable { + private void invalidateServerMetadataCache(List<InvalidateServerMetadataCacheRequest> requests) + throws Throwable { Configuration conf = env.getConfiguration(); String value = conf.get(REGIONSERVER_COPROCESSOR_CONF_KEY); if (value == null || !value.contains(PhoenixRegionServerEndpoint.class.getName())) { // PhoenixRegionServerEndpoint is not loaded. We don't have to invalidate the cache. - LOGGER.info("Skip invalidating server metadata cache for tenantID: {}," - + " schema name: {}, table Name: {} since PhoenixRegionServerEndpoint" - + " is not loaded", Bytes.toString(tenantId), - Bytes.toString(schemaName), Bytes.toString(tableOrViewName)); + LOGGER.info("Skip invalidating server metadata cache since PhoenixRegionServerEndpoint" + + " is not loaded"); return; } Properties properties = new Properties(); @@ -3467,33 +3490,28 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES); // This will incur an extra RPC to the master. This RPC is required since we want to // get current list of regionservers. Collection<ServerName> serverNames = admin.getRegionServers(true); - invalidateServerMetadataCacheWithRetries(admin, serverNames, tenantId, schemaName, - tableOrViewName, false); + invalidateServerMetadataCacheWithRetries(admin, serverNames, requests, false); } } /** - * Invalidate metadata cache on all regionservers with retries for the given tenantID - * and tableName with retries. We retry once before failing the operation. + * Invalidate metadata cache on all regionservers with retries for the given list of + * InvalidateServerMetadataCacheRequest. Each InvalidateServerMetadataCacheRequest contains + * tenantID, schema name and table name. + * We retry once before failing the operation. * * @param admin * @param serverNames - * @param tenantId - * @param schemaName - * @param tableOrViewName + * @param invalidateCacheRequests * @param isRetry * @throws Throwable */ private void invalidateServerMetadataCacheWithRetries(Admin admin, - Collection<ServerName> serverNames, byte[] tenantId, byte[] schemaName, - byte[] tableOrViewName, boolean isRetry) throws Throwable { - String fullTableName = SchemaUtil.getTableName(schemaName, tableOrViewName); - String tenantIDStr = Bytes.toString(tenantId); - LOGGER.info("Invalidating metadata cache for tenantID: {}, tableName: {} for" - + " region servers: {}, isRetry: {}", tenantIDStr, fullTableName, - serverNames, isRetry); - RegionServerEndpointProtos.InvalidateServerMetadataCacheRequest request = - getRequest(tenantId, schemaName, tableOrViewName); + Collection<ServerName> serverNames, + List<InvalidateServerMetadataCacheRequest> invalidateCacheRequests, + boolean isRetry) throws Throwable { + RegionServerEndpointProtos.InvalidateServerMetadataCacheRequest protoRequest = + getRequest(invalidateCacheRequests); // TODO Do I need my own executor or can I re-use QueryServices#Executor // since it is supposed to be used only for scans according to documentation? List<CompletableFuture<Void>> futures = new ArrayList<>(); @@ -3507,23 +3525,24 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES); // DDL operations. We also need to think of we need separate RPC handler // threads for this? ServerRpcController controller = new ServerRpcController(); + for (InvalidateServerMetadataCacheRequest invalidateCacheRequest + : invalidateCacheRequests) { + LOGGER.info("Sending invalidate metadata cache for {} to region server:" + + " {}", invalidateCacheRequest.toString(), serverName); + } RegionServerEndpointProtos.RegionServerEndpointService.BlockingInterface service = RegionServerEndpointProtos.RegionServerEndpointService .newBlockingStub(admin.coprocessorService(serverName)); - LOGGER.info("Sending invalidate metadata cache for tenantID: {}, tableName: {}" - + " to region server: {}", tenantIDStr, fullTableName, serverName); // The timeout for this particular request is managed by config parameter: // hbase.rpc.timeout. Even if the future times out, this runnable can be in // RUNNING state and will not be interrupted. - service.invalidateServerMetadataCache(controller, request); - LOGGER.info("Invalidating metadata cache for tenantID: {}, tableName: {}" + service.invalidateServerMetadataCache(controller, protoRequest); + LOGGER.info("Invalidating metadata cache" + " on region server: {} completed successfully and it took {} ms", - tenantIDStr, fullTableName, serverName, - innerWatch.stop().elapsedMillis()); + serverName, innerWatch.stop().elapsedMillis()); // TODO Create a histogram metric for time taken for invalidating the cache. } catch (ServiceException se) { - LOGGER.error("Invalidating metadata cache for tenantID: {}, tableName: {}" - + " failed for regionserver {}", tenantIDStr, fullTableName, + LOGGER.error("Invalidating metadata cache failed for regionserver {}", serverName, se); IOException ioe = ServerUtil.parseServiceException(se); throw new CompletionException(ioe); @@ -3541,8 +3560,8 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES); allFutures.get(metadataCacheInvalidationTimeoutMs, TimeUnit.MILLISECONDS); } catch (Throwable t) { List<ServerName> failedServers = getFailedServers(futures, map); - LOGGER.error("Invalidating metadata cache for tenantID: {}, tableName: {} failed for " - + "region servers: {}", tenantIDStr, fullTableName, failedServers, t); + LOGGER.error("Invalidating metadata cache for failed for region servers: {}", + failedServers, t); if (isRetry) { // If this is a retry attempt then just fail the operation. if (allFutures.isCompletedExceptionally()) { @@ -3555,7 +3574,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES); // This is the first attempt, we can retry once. // Indicate that this is a retry attempt. invalidateServerMetadataCacheWithRetries(admin, failedServers, - tenantId, schemaName, tableOrViewName, true); + invalidateCacheRequests, true); } } } @@ -3585,16 +3604,17 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES); } private RegionServerEndpointProtos.InvalidateServerMetadataCacheRequest getRequest( - byte[] tenantID, byte[] schemaName, byte[] tableOrViewName) { + List<InvalidateServerMetadataCacheRequest> requests) { RegionServerEndpointProtos.InvalidateServerMetadataCacheRequest.Builder builder = RegionServerEndpointProtos.InvalidateServerMetadataCacheRequest.newBuilder(); - - RegionServerEndpointProtos.InvalidateServerMetadataCache.Builder innerBuilder - = RegionServerEndpointProtos.InvalidateServerMetadataCache.newBuilder(); - innerBuilder.setTenantId(ByteStringer.wrap(tenantID)); - innerBuilder.setSchemaName(ByteStringer.wrap(schemaName)); - innerBuilder.setTableName(ByteStringer.wrap(tableOrViewName)); - builder.addInvalidateServerMetadataCacheRequests(innerBuilder.build()); + for (InvalidateServerMetadataCacheRequest request: requests) { + RegionServerEndpointProtos.InvalidateServerMetadataCache.Builder innerBuilder + = RegionServerEndpointProtos.InvalidateServerMetadataCache.newBuilder(); + innerBuilder.setTenantId(ByteStringer.wrap(request.getTenantId())); + innerBuilder.setSchemaName(ByteStringer.wrap(request.getSchemaName())); + innerBuilder.setTableName(ByteStringer.wrap(request.getTableName())); + builder.addInvalidateServerMetadataCacheRequests(innerBuilder.build()); + } return builder.build(); } @@ -4158,7 +4178,10 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES); done.run(builder.build()); return; } - invalidateServerMetadataCache(tenantId, schemaName, tableName); + List<InvalidateServerMetadataCacheRequest> requests = new ArrayList<>(); + requests.add(new InvalidateServerMetadataCacheRequest(tenantId, schemaName, + tableName)); + invalidateServerMetadataCache(requests); getCoprocessorHost().preIndexUpdate(Bytes.toString(tenantId), SchemaUtil.getTableName(schemaName, tableName), TableName.valueOf(loadedTable.getPhysicalName().getBytes()), diff --git a/phoenix-core/src/test/java/org/apache/phoenix/cache/ServerMetadataCacheTest.java b/phoenix-core/src/test/java/org/apache/phoenix/cache/ServerMetadataCacheTest.java index c5d31a3860..2d6880d97d 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/cache/ServerMetadataCacheTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/cache/ServerMetadataCacheTest.java @@ -25,6 +25,7 @@ import org.apache.phoenix.schema.PTable; import org.apache.phoenix.thirdparty.com.google.common.collect.Maps; import org.apache.phoenix.util.PhoenixRuntime; import org.apache.phoenix.util.PropertiesUtil; +import org.apache.phoenix.util.QueryUtil; import org.apache.phoenix.util.ReadOnlyProps; import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.TestUtil; @@ -38,6 +39,8 @@ import org.slf4j.LoggerFactory; import java.sql.Connection; import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; import java.util.Map; import java.util.Properties; @@ -420,6 +423,130 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT { } + /** + * Test that we invalidate the cache for parent table and update the last ddl timestamp + * of the parent table while we add an index. + * Test that we invalidate the cache for parent table and index when we drop an index. + * Also we update the last ddl timestamp for parent table when we drop an index. + * @throws Exception + */ + @Test + public void testUpdateLastDDLTimestampTableAfterIndexCreation() throws Exception { + String tableName = generateUniqueName(); + byte[] tableNameBytes = Bytes.toBytes(tableName); + String indexName = generateUniqueName(); + byte[] indexNameBytes = Bytes.toBytes(indexName); + ServerMetadataCache cache = ServerMetadataCache.getInstance(config); + String ddl = + "create table " + tableName + " ( k integer PRIMARY KEY," + " v1 integer," + + " v2 integer)"; + String createIndexDDL = "create index " + indexName + " on " + tableName + " (v1)"; + String dropIndexDDL = "DROP INDEX " + indexName + " ON " + tableName; + try (Connection conn = DriverManager.getConnection(getUrl()); + Statement stmt = conn.createStatement()) { + conn.setAutoCommit(true); + stmt.execute(ddl); + long tableLastDDLTimestampBeforeIndexCreation = getLastDDLTimestamp(tableName); + // Populate the cache + assertNotNull(cache.getLastDDLTimestampForTable(null, null, tableNameBytes)); + Thread.sleep(1); + stmt.execute(createIndexDDL); + // Make sure that we have invalidated the last ddl timestamp for parent table + // on all regionservers after we create an index. + assertNull(cache.getLastDDLTimestampForTableFromCacheOnly(null, null, tableNameBytes)); + long tableLastDDLTimestampAfterIndexCreation = getLastDDLTimestamp(tableName); + assertNotNull(tableLastDDLTimestampAfterIndexCreation); + assertTrue(tableLastDDLTimestampAfterIndexCreation > + tableLastDDLTimestampBeforeIndexCreation); + long indexLastDDLTimestampAfterCreation = getLastDDLTimestamp(indexName); + // Make sure that last ddl timestamp is cached on the regionserver. + assertNotNull(indexLastDDLTimestampAfterCreation); + // Adding a sleep for 1 ms so that we get new last ddl timestamp. + Thread.sleep(1); + stmt.execute(dropIndexDDL); + // Make sure that we invalidate the cache on regionserver for base table and an index + // after we dropped an index. + assertNull(cache.getLastDDLTimestampForTableFromCacheOnly(null, null, tableNameBytes)); + assertNull(cache.getLastDDLTimestampForTableFromCacheOnly(null, null, indexNameBytes)); + long tableLastDDLTimestampAfterIndexDeletion = getLastDDLTimestamp(tableName); + // Verify that last ddl timestamp after index deletion is greater than + // the previous last ddl timestamp. + assertNotNull(tableLastDDLTimestampAfterIndexDeletion); + assertTrue(tableLastDDLTimestampAfterIndexDeletion > + tableLastDDLTimestampAfterIndexCreation); + } + } + + /** + * Test that we invalidate the cache of the immediate parent view + * and update the last ddl timestamp of the immediate parent view while we add an index. + * Test that we invalidate the cache for parent view and view index when we drop an index. + * Also we update the last ddl timestamp for parent view when we drop an index. + * @throws Exception + */ + @Test + public void testUpdateLastDDLTimestampViewAfterIndexCreation() throws Exception { + String tableName = "T_" + generateUniqueName(); + String globalViewName = "GV_" + generateUniqueName(); + byte[] globalViewNameBytes = Bytes.toBytes(globalViewName); + String globalViewIndexName = "GV_IDX_" + generateUniqueName(); + byte[] globalViewIndexNameBytes = Bytes.toBytes(globalViewIndexName); + + ServerMetadataCache cache = ServerMetadataCache.getInstance(config); + try(Connection conn = DriverManager.getConnection(getUrl()); + Statement stmt = conn.createStatement()) { + String whereClause = " WHERE COL1 < 1000"; + String tableDDLStmt = getCreateTableStmt(tableName); + String viewDDLStmt = getCreateViewStmt(globalViewName, tableName, whereClause); + String viewIdxDDLStmt = getCreateViewIndexStmt(globalViewIndexName, globalViewName, + "COL1"); + String dropIndexDDL = "DROP INDEX " + globalViewIndexName + " ON " + globalViewName; + stmt.execute(tableDDLStmt); + stmt.execute(viewDDLStmt); + // Populate the cache + assertNotNull(cache.getLastDDLTimestampForTable(null, null, globalViewNameBytes)); + long viewLastDDLTimestampBeforeIndexCreation = getLastDDLTimestamp(globalViewName); + stmt.execute(viewIdxDDLStmt); + + // Make sure that we have invalidated the last ddl timestamp for parent global view + // on all regionserver after we create a view index. + assertNull(cache.getLastDDLTimestampForTableFromCacheOnly(null, null, + globalViewNameBytes)); + long viewLastDDLTimestampAfterIndexCreation = getLastDDLTimestamp(globalViewName); + assertTrue(viewLastDDLTimestampAfterIndexCreation > + viewLastDDLTimestampBeforeIndexCreation); + long indexLastDDLTimestampAfterCreation = getLastDDLTimestamp(globalViewIndexName); + // Make sure that last ddl timestamp is cached on the regionserver. + assertNotNull(indexLastDDLTimestampAfterCreation); + // Adding a sleep for 1 ms so that we get new last ddl timestamp. + Thread.sleep(1); + stmt.execute(dropIndexDDL); + // Make sure that we invalidate the cache on regionservers for view and its index after + // we drop a view index. + assertNull(cache.getLastDDLTimestampForTableFromCacheOnly(null, null, + globalViewNameBytes)); + assertNull(cache.getLastDDLTimestampForTableFromCacheOnly(null, null, + globalViewIndexNameBytes)); + long viewLastDDLTimestampAfterIndexDeletion = getLastDDLTimestamp(globalViewName); + // Verify that last ddl timestamp of view after index deletion is greater than + // the previous last ddl timestamp. + assertNotNull(viewLastDDLTimestampAfterIndexDeletion); + assertTrue(viewLastDDLTimestampAfterIndexDeletion > + viewLastDDLTimestampAfterIndexCreation); + } + } + + public long getLastDDLTimestamp(String tableName) throws SQLException { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + // Need to use different connection than what is used for creating table or indexes. + String url = QueryUtil.getConnectionUrl(props, config, "client1"); + try (Connection conn = DriverManager.getConnection(url)) { + PTable table = PhoenixRuntime.getTableNoCache(conn, tableName); + return table.getLastDDLTimestamp(); + } + } + + private String getCreateTableStmt(String tableName) { return "CREATE TABLE " + tableName + " (a_string varchar not null, col1 integer" + @@ -431,4 +558,10 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT { " AS SELECT * FROM "+ fullTableName + whereClause; return viewStmt; } + + private String getCreateViewIndexStmt(String indexName, String viewName, String indexColumn) { + String viewIndexName = + "CREATE INDEX " + indexName + " ON " + viewName + "(" + indexColumn + ")"; + return viewIndexName; + } }