[ https://issues.apache.org/jira/browse/PHOENIX-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17801070#comment-17801070 ]
ASF GitHub Bot commented on PHOENIX-6968: ----------------------------------------- shahrs87 commented on code in PR #1691: URL: https://github.com/apache/phoenix/pull/1691#discussion_r1437921306 ########## phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java: ########## @@ -3416,6 +3437,167 @@ private MetaDataMutationResult mutateColumn( } } + /** + * Invalidate metadata cache from all region servers for the given tenant and table name. + * @param tenantId + * @param schemaName + * @param tableOrViewName + * @throws Throwable + */ + private void invalidateServerMetadataCache(byte[] tenantId, byte[]schemaName, + byte[] tableOrViewName) throws Throwable { + Configuration conf = env.getConfiguration(); + String value = conf.get(REGIONSERVER_COPROCESSOR_CONF_KEY); + if (value == null + || !value.contains(PhoenixRegionServerEndpoint.class.getName())) { + // PhoenixRegionServerEndpoint is not loaded. We don't have to invalidate the cache. + LOGGER.info("Skip invalidating server metadata cache for tenantID: {}," + + " schema name: {}, table Name: {} since PhoenixRegionServerEndpoint" + + " is not loaded", Bytes.toString(tenantId), + Bytes.toString(schemaName), Bytes.toString(tableOrViewName)); + return; + } + Properties properties = new Properties(); + // Skip checking of system table existence since the system tables should have created + // by now. + properties.setProperty(SKIP_SYSTEM_TABLES_EXISTENCE_CHECK, "true"); + try (PhoenixConnection connection = QueryUtil.getConnectionOnServer(properties, + env.getConfiguration()).unwrap(PhoenixConnection.class); + Admin admin = connection.getQueryServices().getAdmin()) { + // This will incur an extra RPC to the master. This RPC is required since we want to + // get current list of regionservers. 
+ Collection<ServerName> serverNames = admin.getRegionServers(true); + invalidateServerMetadataCacheWithRetries(admin, serverNames, tenantId, schemaName, + tableOrViewName, false); + } + } + + /** + * Invalidate metadata cache on all regionservers with retries for the given tenantID + * and tableName with retries. We retry once before failing the operation. + * + * @param admin + * @param serverNames + * @param tenantId + * @param schemaName + * @param tableOrViewName + * @param isRetry + * @throws Throwable + */ + private void invalidateServerMetadataCacheWithRetries(Admin admin, + Collection<ServerName> serverNames, byte[] tenantId, byte[] schemaName, + byte[] tableOrViewName, boolean isRetry) throws Throwable { + String fullTableName = SchemaUtil.getTableName(schemaName, tableOrViewName); + String tenantIDStr = Bytes.toString(tenantId); + LOGGER.info("Invalidating metadata cache for tenantID: {}, tableName: {} for" + + " region servers: {}, isRetry: {}", tenantIDStr, fullTableName, + serverNames, isRetry); + RegionServerEndpointProtos.InvalidateServerMetadataCacheRequest request = + getRequest(tenantId, schemaName, tableOrViewName); + // TODO Do I need my own executor or can I re-use QueryServices#Executor + // since it is supposed to be used only for scans according to documentation? + List<CompletableFuture<Void>> futures = new ArrayList<>(); + Map<Future, ServerName> map = new HashMap<>(); + for (ServerName serverName : serverNames) { + CompletableFuture<Void> future = CompletableFuture.runAsync(() -> { + try { + PhoenixStopWatch innerWatch = new PhoenixStopWatch().start(); + // TODO Using the same as ServerCacheClient but need to think if we need some + // special controller for invalidating cache since this is in the path of + // DDL operations. We also need to think of we need separate RPC handler + // threads for this? 
+ ServerRpcController controller = new ServerRpcController(); + RegionServerEndpointProtos.RegionServerEndpointService.BlockingInterface + service = RegionServerEndpointProtos.RegionServerEndpointService + .newBlockingStub(admin.coprocessorService(serverName)); + LOGGER.info("Sending invalidate metadata cache for tenantID: {}, tableName: {}" + + " to region server: {}", tenantIDStr, fullTableName, serverName); + // The timeout for this particular request is managed by config parameter: + // hbase.rpc.timeout. Even if the future times out, this runnable can be in + // RUNNING state and will not be interrupted. + service.invalidateServerMetadataCache(controller, request); + LOGGER.info("Invalidating metadata cache for tenantID: {}, tableName: {}" + + " on region server: {} completed successfully and it took {} ms", + tenantIDStr, fullTableName, serverName, + innerWatch.stop().elapsedMillis()); + // TODO Create a histogram metric for time taken for invalidating the cache. + } catch (ServiceException se) { + LOGGER.error("Invalidating metadata cache for tenantID: {}, tableName: {}" + + " failed for regionserver {}", tenantIDStr, fullTableName, + serverName, se); + IOException ioe = ServerUtil.parseServiceException(se); + throw new CompletionException(ioe); + } + }); + futures.add(future); + map.put(future, serverName); + } + + // Here we create one master like future which tracks individual future + // for each region server. + CompletableFuture<Void> allFutures = CompletableFuture.allOf( + futures.toArray(new CompletableFuture[0])); + try { + allFutures.get(metadataCacheInvalidationTimeoutMs, TimeUnit.MILLISECONDS); + } catch (Throwable t) { + List<ServerName> failedServers = getFailedServers(futures, map); + LOGGER.error("Invalidating metadata cache for tenantID: {}, tableName: {} failed for " + + "region servers: {}", tenantIDStr, fullTableName, failedServers, t); + if (isRetry) { + // If this is a retry attempt then just fail the operation. 
+ if (allFutures.isCompletedExceptionally()) { + if (t instanceof ExecutionException) { + t = t.getCause(); + } + } + throw t; + } else { + // This is the first attempt, we can retry once. + // Indicate that this is a retry attempt. + invalidateServerMetadataCacheWithRetries(admin, failedServers, + tenantId, schemaName, tableOrViewName, true); + } + } + } + + /* + Get the list of regionservers that failed the invalidateCache rpc. + */ + private List<ServerName> getFailedServers(List<CompletableFuture<Void>> futures, + Map<Future, ServerName> map) { + List<ServerName> failedServers = new ArrayList<>(); + for (CompletableFuture completedFuture : futures) { + if (completedFuture.isDone() == false) { Review Comment: > Will it have a false value? Yes, it will have a false value. We also have a [test case](https://github.com/apache/phoenix/blob/PHOENIX-6883-feature/phoenix-core/src/it/java/org/apache/phoenix/end2end/InvalidateMetadataCacheIT.java#L67) for this. > In the timeout case, won't the remote RPC thread be in a stuck state? What is the chance of a retry making progress, besides initiating a concurrent request? In a later Jira, [PHOENIX-7115](https://issues.apache.org/jira/browse/PHOENIX-7115), we have added special handler threads on each regionserver just to handle invalidate cache RPCs. Earlier, invalidate cache RPCs were handled by default handler threads. So the probability of these RPC threads being stuck is very small, but if it happens we can increase the number of threads if necessary. > Create PhoenixRegionServerEndpoint#invalidateCache method to invalidate cache. 
> ------------------------------------------------------------------------------ > > Key: PHOENIX-6968 > URL: https://issues.apache.org/jira/browse/PHOENIX-6968 > Project: Phoenix > Issue Type: Sub-task > Reporter: Rushabh Shah > Assignee: Rushabh Shah > Priority: Major > > Whenever we update metadata (like alter table add column, drop table), we > need to invalidate metadata cache entry (introduced by PHOENIX-6943) on all > the regionservers which has that cache entry. First step would be to issue an > invalidate command on all the regionservers irrespective of whether that > regionserver has the cache entry. We can further optimize by invalidating > only on RS that has that cache entry. > In PHOENIX-6988 we created PhoenixRegionServerEndpoint implementing > RegionServerCoprocessor. We can create a new method in this co-proc something > like invalidateCache(CacheEntry) to invalidate cache for a given > table/view/index. -- This message was sent by Atlassian Jira (v8.20.10#820010)