This is an automated email from the ASF dual-hosted git repository. shahrs87 pushed a commit to branch PHOENIX-6883-feature in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/PHOENIX-6883-feature by this push: new 83134d9731 PHOENIX-7115 Create separate handler thread pool for invalidating server metadata cache (#1748) 83134d9731 is described below commit 83134d9731ab12072d23573d2c38252489d306bb Author: Rushabh Shah <shahr...@apache.org> AuthorDate: Fri Dec 22 08:44:40 2023 -0800 PHOENIX-7115 Create separate handler thread pool for invalidating server metadata cache (#1748) --- .../FailingPhoenixRegionServerEndpoint.java | 4 +- .../phoenix/end2end/InvalidateMetadataCacheIT.java | 4 +- .../hadoop/hbase/ipc/PhoenixRpcScheduler.java | 37 +++- .../hbase/ipc/PhoenixRpcSchedulerFactory.java | 14 +- ...java => InvalidateMetadataCacheController.java} | 36 ++-- ... InvalidateMetadataCacheControllerFactory.java} | 26 +-- .../controller/ServerSideRPCControllerFactory.java | 3 - .../phoenix/coprocessor/MetaDataEndpointImpl.java | 195 +---------------- .../phoenix/query/ConnectionQueryServices.java | 4 + .../phoenix/query/ConnectionQueryServicesImpl.java | 236 +++++++++++++++++++-- .../query/ConnectionlessQueryServicesImpl.java | 7 + .../query/DelegateConnectionQueryServices.java | 7 + .../org/apache/phoenix/query/QueryServices.java | 10 +- .../apache/phoenix/query/QueryServicesOptions.java | 2 + ...dulerTest.java => PhoenixRpcSchedulerTest.java} | 61 ++++-- .../phoenix/cache/ServerMetadataCacheTest.java | 26 ++- 16 files changed, 405 insertions(+), 267 deletions(-) diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/FailingPhoenixRegionServerEndpoint.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/FailingPhoenixRegionServerEndpoint.java index 7e40cfe76f..5f33610c29 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/FailingPhoenixRegionServerEndpoint.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/FailingPhoenixRegionServerEndpoint.java @@ -27,8 +27,8 @@ import org.slf4j.LoggerFactory; import java.io.IOException; -import static org.apache.phoenix.coprocessor.MetaDataEndpointImpl.PHOENIX_METADATA_CACHE_INVALIDATION_TIMEOUT_MS; -import static org.apache.phoenix.coprocessor.MetaDataEndpointImpl.PHOENIX_METADATA_CACHE_INVALIDATION_TIMEOUT_MS_DEFAULT; +import static org.apache.phoenix.query.QueryServices.PHOENIX_METADATA_CACHE_INVALIDATION_TIMEOUT_MS; +import static org.apache.phoenix.query.QueryServices.PHOENIX_METADATA_CACHE_INVALIDATION_TIMEOUT_MS_DEFAULT; public class FailingPhoenixRegionServerEndpoint extends PhoenixRegionServerEndpoint { private static final Logger LOGGER = LoggerFactory.getLogger(FailingPhoenixRegionServerEndpoint.class); diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/InvalidateMetadataCacheIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/InvalidateMetadataCacheIT.java index c0727b9d68..b8d1d55732 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/InvalidateMetadataCacheIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/InvalidateMetadataCacheIT.java @@ -34,8 +34,8 @@ import java.util.Map; import java.util.Properties; import static org.apache.hadoop.hbase.coprocessor.CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY; -import static org.apache.phoenix.coprocessor.MetaDataEndpointImpl.PHOENIX_METADATA_CACHE_INVALIDATION_TIMEOUT_MS; -import static org.apache.phoenix.coprocessor.MetaDataEndpointImpl.PHOENIX_METADATA_INVALIDATE_CACHE_ENABLED; +import static org.apache.phoenix.query.QueryServices.PHOENIX_METADATA_CACHE_INVALIDATION_TIMEOUT_MS; +import static org.apache.phoenix.query.QueryServices.PHOENIX_METADATA_INVALIDATE_CACHE_ENABLED; import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; import static org.junit.Assert.fail; diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcScheduler.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcScheduler.java index ea6a5c9719..0a04ad4787 100644 --- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcScheduler.java +++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcScheduler.java @@ -27,12 +27,13 @@ import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting; +import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_INVALIDATE_CACHE_HANDLER_COUNT; + /** * {@link RpcScheduler} that first checks to see if this is an index or metadata update before passing off the * call to the delegate {@link RpcScheduler}. */ public class PhoenixRpcScheduler extends CompatPhoenixRpcScheduler { - // copied from org.apache.hadoop.hbase.ipc.SimpleRpcScheduler in HBase 0.98.4 private static final String CALL_QUEUE_HANDLER_FACTOR_CONF_KEY = "ipc.server.callqueue.handler.factor"; private static final String CALLQUEUE_LENGTH_CONF_KEY = "ipc.server.max.callqueue.length"; @@ -41,28 +42,44 @@ public class PhoenixRpcScheduler extends CompatPhoenixRpcScheduler { private int indexPriority; private int metadataPriority; private int serverSidePriority; + private int invalidateMetadataCachePriority; private RpcExecutor indexCallExecutor; private RpcExecutor metadataCallExecutor; private RpcExecutor serverSideCallExecutor; + // Executor for invalidating server side metadata cache RPCs. + private RpcExecutor invalidateMetadataCacheCallExecutor; private int port; - public PhoenixRpcScheduler(Configuration conf, RpcScheduler delegate, int indexPriority, int metadataPriority, int serversidePriority, PriorityFunction priorityFunction, Abortable abortable) { + public PhoenixRpcScheduler(Configuration conf, RpcScheduler delegate, int indexPriority, + int metadataPriority, int serversidePriority, + int invalidateMetadataCachePriority, + PriorityFunction priorityFunction, Abortable abortable) { // copied from org.apache.hadoop.hbase.ipc.SimpleRpcScheduler in HBase 0.98.4 int indexHandlerCount = conf.getInt(QueryServices.INDEX_HANDLER_COUNT_ATTRIB, QueryServicesOptions.DEFAULT_INDEX_HANDLER_COUNT); int metadataHandlerCount = conf.getInt(QueryServices.METADATA_HANDLER_COUNT_ATTRIB, QueryServicesOptions.DEFAULT_METADATA_HANDLER_COUNT); int serverSideHandlerCount = conf.getInt(QueryServices.SERVER_SIDE_HANDLER_COUNT_ATTRIB, QueryServicesOptions.DEFAULT_SERVERSIDE_HANDLER_COUNT); + int invalidateMetadataCacheHandlerCount = conf.getInt( + QueryServices.INVALIDATE_CACHE_HANDLER_COUNT_ATTRIB, + DEFAULT_INVALIDATE_CACHE_HANDLER_COUNT); int maxIndexQueueLength = conf.getInt(CALLQUEUE_LENGTH_CONF_KEY, indexHandlerCount*DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER); int maxMetadataQueueLength = conf.getInt(CALLQUEUE_LENGTH_CONF_KEY, metadataHandlerCount*DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER); int maxServerSideQueueLength = conf.getInt(CALLQUEUE_LENGTH_CONF_KEY, serverSideHandlerCount*DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER); + int maxInvalidateMetadataCacheQueueLength = conf.getInt(CALLQUEUE_LENGTH_CONF_KEY, + invalidateMetadataCacheHandlerCount * DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER); + this.indexPriority = indexPriority; this.metadataPriority = metadataPriority; this.serverSidePriority = serversidePriority; + this.invalidateMetadataCachePriority = invalidateMetadataCachePriority; this.delegate = delegate; this.indexCallExecutor = new BalancedQueueRpcExecutor("Index", indexHandlerCount, maxIndexQueueLength, priorityFunction,conf,abortable); this.metadataCallExecutor = new BalancedQueueRpcExecutor("Metadata", metadataHandlerCount, maxMetadataQueueLength, priorityFunction,conf,abortable); this.serverSideCallExecutor = new BalancedQueueRpcExecutor("ServerSide", serverSideHandlerCount, maxServerSideQueueLength, priorityFunction,conf,abortable); + this.invalidateMetadataCacheCallExecutor = new BalancedQueueRpcExecutor( + "InvalidateMetadataCache", invalidateMetadataCacheHandlerCount, + maxInvalidateMetadataCacheQueueLength, priorityFunction, conf, abortable); } @Override @@ -77,6 +94,7 @@ public class PhoenixRpcScheduler extends CompatPhoenixRpcScheduler { indexCallExecutor.start(port); metadataCallExecutor.start(port); serverSideCallExecutor.start(port); + invalidateMetadataCacheCallExecutor.start(port); } @Override @@ -85,6 +103,7 @@ public class PhoenixRpcScheduler extends CompatPhoenixRpcScheduler { indexCallExecutor.stop(); metadataCallExecutor.stop(); serverSideCallExecutor.stop(); + invalidateMetadataCacheCallExecutor.stop(); } @Override @@ -97,6 +116,8 @@ public class PhoenixRpcScheduler extends CompatPhoenixRpcScheduler { return metadataCallExecutor.dispatch(callTask); } else if (serverSidePriority == priority) { return serverSideCallExecutor.dispatch(callTask); + } else if (invalidateMetadataCachePriority == priority) { + return invalidateMetadataCacheCallExecutor.dispatch(callTask); } else { return delegate.dispatch(callTask); } @@ -114,7 +135,8 @@ public class PhoenixRpcScheduler extends CompatPhoenixRpcScheduler { return this.delegate.getGeneralQueueLength() + this.indexCallExecutor.getQueueLength() + this.metadataCallExecutor.getQueueLength() - + this.serverSideCallExecutor.getQueueLength(); + + this.serverSideCallExecutor.getQueueLength() + + this.invalidateMetadataCacheCallExecutor.getQueueLength(); } @Override @@ -132,7 +154,8 @@ public class PhoenixRpcScheduler extends CompatPhoenixRpcScheduler { return this.delegate.getActiveRpcHandlerCount() + this.indexCallExecutor.getActiveHandlerCount() + this.metadataCallExecutor.getActiveHandlerCount() - + this.serverSideCallExecutor.getActiveHandlerCount(); + + this.serverSideCallExecutor.getActiveHandlerCount() + + this.invalidateMetadataCacheCallExecutor.getActiveHandlerCount(); } @Override @@ -155,6 +178,11 @@ public class PhoenixRpcScheduler extends CompatPhoenixRpcScheduler { this.metadataCallExecutor = executor; } + @VisibleForTesting + public void setInvalidateMetadataCacheExecutorForTesting(RpcExecutor executor) { + this.invalidateMetadataCacheCallExecutor = executor; + } + @VisibleForTesting public void setServerSideExecutorForTesting(RpcExecutor executor) { this.serverSideCallExecutor = executor; @@ -229,5 +257,4 @@ public class PhoenixRpcScheduler extends CompatPhoenixRpcScheduler { public int getActiveReplicationRpcHandlerCount() { return this.delegate.getActiveReplicationRpcHandlerCount(); } - } diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcSchedulerFactory.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcSchedulerFactory.java index 74adf01573..d57828c260 100644 --- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcSchedulerFactory.java +++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/PhoenixRpcSchedulerFactory.java @@ -63,12 +63,16 @@ public class PhoenixRpcSchedulerFactory implements RpcSchedulerFactory { validatePriority(serverSidePriority); // validate index and metadata priorities are not the same - Preconditions.checkArgument(indexPriority != metadataPriority, "Index and Metadata priority must not be same "+ indexPriority); + Preconditions.checkArgument(indexPriority != metadataPriority, + "Index and Metadata priority must not be same " + indexPriority); LOGGER.info("Using custom Phoenix Index RPC Handling with index rpc priority " + indexPriority + " and metadata rpc priority " + metadataPriority); - PhoenixRpcScheduler scheduler = - new PhoenixRpcScheduler(conf, delegate, indexPriority, metadataPriority, serverSidePriority, priorityFunction,abortable); + int invalidateCachePriority = getInvalidateMetadataCachePriority(conf); + validatePriority(invalidateCachePriority); + PhoenixRpcScheduler scheduler = new PhoenixRpcScheduler(conf, delegate, indexPriority, + metadataPriority, serverSidePriority, invalidateCachePriority, priorityFunction, + abortable); return scheduler; } @@ -97,4 +101,8 @@ public class PhoenixRpcSchedulerFactory implements RpcSchedulerFactory { return conf.getInt(QueryServices.SERVER_SIDE_PRIOIRTY_ATTRIB, QueryServicesOptions.DEFAULT_SERVER_SIDE_PRIORITY); } + public static int getInvalidateMetadataCachePriority(Configuration conf) { + return conf.getInt(QueryServices.INVALIDATE_METADATA_CACHE_PRIORITY_ATTRIB, + QueryServicesOptions.DEFAULT_INVALIDATE_METADATA_CACHE_PRIORITY); + } } \ No newline at end of file diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/ServerSideRPCControllerFactory.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/InvalidateMetadataCacheController.java similarity index 51% copy from phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/ServerSideRPCControllerFactory.java copy to phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/InvalidateMetadataCacheController.java index ba7fb6339d..3c9eda613d 100644 --- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/ServerSideRPCControllerFactory.java +++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/InvalidateMetadataCacheController.java @@ -1,4 +1,4 @@ -/* +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -18,24 +18,34 @@ package org.apache.hadoop.hbase.ipc.controller; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.ipc.RpcControllerFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.ipc.DelegatingHBaseRpcController; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; +import org.apache.hadoop.hbase.ipc.PhoenixRpcSchedulerFactory; /** - * {@link RpcControllerFactory} that should only be used when - * making server-server remote RPCs to the region servers hosting Phoenix SYSTEM tables. + * Controller used to invalidate server side metadata cache RPCs. */ -public class ServerSideRPCControllerFactory { +public class InvalidateMetadataCacheController extends DelegatingHBaseRpcController { + private int priority; + + public InvalidateMetadataCacheController(HBaseRpcController delegate, Configuration conf) { + super(delegate); + this.priority = PhoenixRpcSchedulerFactory.getInvalidateMetadataCachePriority(conf); + } - private static final Logger LOG = LoggerFactory.getLogger(ServerSideRPCControllerFactory.class); - protected final Configuration conf; + @Override + public void setPriority(int priority) { + this.priority = priority; + } - public ServerSideRPCControllerFactory(Configuration conf) { - this.conf = conf; + @Override + public void setPriority(TableName tn) { + // Nothing } - public ServerToServerRpcController newController() { - return new ServerToServerRpcControllerImpl(this.conf); + @Override + public int getPriority() { + return this.priority; } } diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/ServerSideRPCControllerFactory.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/InvalidateMetadataCacheControllerFactory.java similarity index 61% copy from phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/ServerSideRPCControllerFactory.java copy to phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/InvalidateMetadataCacheControllerFactory.java index ba7fb6339d..ee6b3b24ff 100644 --- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/ServerSideRPCControllerFactory.java +++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/InvalidateMetadataCacheControllerFactory.java @@ -1,4 +1,4 @@ -/* +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -18,24 +18,24 @@ package org.apache.hadoop.hbase.ipc.controller; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** - * {@link RpcControllerFactory} that should only be used when - * making server-server remote RPCs to the region servers hosting Phoenix SYSTEM tables. + * Factory to instantiate InvalidateMetadataCacheControllers */ -public class ServerSideRPCControllerFactory { - - private static final Logger LOG = LoggerFactory.getLogger(ServerSideRPCControllerFactory.class); - protected final Configuration conf; +public class InvalidateMetadataCacheControllerFactory extends RpcControllerFactory { + public InvalidateMetadataCacheControllerFactory(Configuration conf) { + super(conf); + } - public ServerSideRPCControllerFactory(Configuration conf) { - this.conf = conf; + @Override + public HBaseRpcController newController() { + HBaseRpcController delegate = super.newController(); + return getController(delegate); } - public ServerToServerRpcController newController() { - return new ServerToServerRpcControllerImpl(this.conf); + private HBaseRpcController getController(HBaseRpcController delegate) { + return new InvalidateMetadataCacheController(delegate, conf); } } diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/ServerSideRPCControllerFactory.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/ServerSideRPCControllerFactory.java index ba7fb6339d..a1a97cf6ce 100644 --- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/ServerSideRPCControllerFactory.java +++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/ipc/controller/ServerSideRPCControllerFactory.java @@ -19,8 +19,6 @@ package org.apache.hadoop.hbase.ipc.controller; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * {@link RpcControllerFactory} that should only be used when @@ -28,7 +26,6 @@ import org.slf4j.LoggerFactory; */ public class ServerSideRPCControllerFactory { - private static final Logger LOG = LoggerFactory.getLogger(ServerSideRPCControllerFactory.class); protected final Configuration conf; public ServerSideRPCControllerFactory(Configuration conf) { diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java index c9aa4acb73..28d561e465 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java @@ -101,23 +101,15 @@ import java.sql.SQLException; import java.sql.Statement; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.Comparator; -import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; -import java.util.Map; import java.util.NavigableMap; import java.util.Optional; import java.util.Properties; import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; @@ -133,11 +125,9 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue.Type; import org.apache.hadoop.hbase.KeyValueUtil; -import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Tag; import org.apache.hadoop.hbase.TagUtil; -import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Mutation; @@ -154,12 +144,10 @@ import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.ipc.RpcCall; import org.apache.hadoop.hbase.ipc.RpcUtil; -import org.apache.hadoop.hbase.ipc.ServerRpcController; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.Region.RowLock; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.hbase.util.ByteStringer; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.VersionInfo; @@ -187,9 +175,6 @@ import org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetVersionRequest import org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetVersionResponse; import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse; import org.apache.phoenix.coprocessor.generated.MetaDataProtos.UpdateIndexStateRequest; -import org.apache.phoenix.coprocessor.generated.RegionServerEndpointProtos; -import org.apache.phoenix.coprocessor.metrics.MetricsMetadataCachingSource; -import org.apache.phoenix.coprocessor.metrics.MetricsPhoenixCoprocessorSourceFactory; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.expression.Expression; @@ -268,7 +253,6 @@ import org.apache.phoenix.util.EnvironmentEdgeManager; import org.apache.phoenix.util.MetaDataUtil; import org.apache.phoenix.util.PhoenixKeyValueUtil; import org.apache.phoenix.util.PhoenixRuntime; -import org.apache.phoenix.util.PhoenixStopWatch; import org.apache.phoenix.util.QueryUtil; import org.apache.phoenix.util.ReadOnlyProps; import org.apache.phoenix.util.SchemaUtil; @@ -285,7 +269,6 @@ import com.google.protobuf.ByteString; import com.google.protobuf.RpcCallback; import com.google.protobuf.RpcController; import com.google.protobuf.Service; -import com.google.protobuf.ServiceException; /** * Endpoint co-processor through which all Phoenix metadata mutations flow. @@ -328,12 +311,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements RegionCopr // Column to track tables that have been upgraded based on PHOENIX-2067 public static final String ROW_KEY_ORDER_OPTIMIZABLE = "ROW_KEY_ORDER_OPTIMIZABLE"; public static final byte[] ROW_KEY_ORDER_OPTIMIZABLE_BYTES = Bytes.toBytes(ROW_KEY_ORDER_OPTIMIZABLE); - public static final String PHOENIX_METADATA_CACHE_INVALIDATION_TIMEOUT_MS = - "phoenix.metadata.cache.invalidation.timeoutMs"; - // Default to 10 seconds. - public static final long PHOENIX_METADATA_CACHE_INVALIDATION_TIMEOUT_MS_DEFAULT = 10 * 1000; - public static final String PHOENIX_METADATA_INVALIDATE_CACHE_ENABLED = - "phoenix.metadata.invalidate.cache.enabled"; + // KeyValues for Table private static final Cell TABLE_TYPE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, TABLE_TYPE_BYTES); @@ -612,8 +590,7 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES); private boolean allowSplittableSystemCatalogRollback; private MetricsMetadataSource metricsSource; - private MetricsMetadataCachingSource metricsMetadataCachingSource; - private long metadataCacheInvalidationTimeoutMs; + public static void setFailConcurrentMutateAddColumnOneTimeForTesting(boolean fail) { failConcurrentMutateAddColumnOneTimeForTesting = fail; } @@ -649,16 +626,11 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES); new ReadOnlyProps(config.iterator())); this.allowSplittableSystemCatalogRollback = config.getBoolean(QueryServices.ALLOW_SPLITTABLE_SYSTEM_CATALOG_ROLLBACK, QueryServicesOptions.DEFAULT_ALLOW_SPLITTABLE_SYSTEM_CATALOG_ROLLBACK); - this.metadataCacheInvalidationTimeoutMs = config.getLong( - PHOENIX_METADATA_CACHE_INVALIDATION_TIMEOUT_MS, - PHOENIX_METADATA_CACHE_INVALIDATION_TIMEOUT_MS_DEFAULT); LOGGER.info("Starting Tracing-Metrics Systems"); // Start the phoenix trace collection Tracing.addTraceMetricsSource(); Metrics.ensureConfigured(); metricsSource = MetricsMetadataSourceFactory.getMetadataMetricsSource(); - metricsMetadataCachingSource - = MetricsPhoenixCoprocessorSourceFactory.getInstance().getMetadataCachingSource(); } @Override @@ -3483,170 +3455,17 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES); } } - /** - * Invalidate metadata cache from all region servers for the given list of - * InvalidateServerMetadataCacheRequest. - * @throws Throwable - */ private void invalidateServerMetadataCache(List<InvalidateServerMetadataCacheRequest> requests) throws Throwable { - Configuration conf = env.getConfiguration(); - boolean invalidateCacheEnabled = conf.getBoolean(PHOENIX_METADATA_INVALIDATE_CACHE_ENABLED, - false); - if (!invalidateCacheEnabled) { - LOGGER.info("Skip invalidating server metadata cache since conf property" - + " phoenix.metadata.invalidate.cache.enabled is set to false"); - return; - } - metricsMetadataCachingSource.incrementMetadataCacheInvalidationOperationsCount(); Properties properties = new Properties(); // Skip checking of system table existence since the system tables should have created // by now. properties.setProperty(SKIP_SYSTEM_TABLES_EXISTENCE_CHECK, "true"); try (PhoenixConnection connection = QueryUtil.getConnectionOnServer(properties, - env.getConfiguration()).unwrap(PhoenixConnection.class); - Admin admin = connection.getQueryServices().getAdmin()) { - // This will incur an extra RPC to the master. This RPC is required since we want to - // get current list of regionservers. - Collection<ServerName> serverNames = admin.getRegionServers(true); - PhoenixStopWatch stopWatch = new PhoenixStopWatch().start(); - try { - invalidateServerMetadataCacheWithRetries(admin, serverNames, requests, false); - metricsMetadataCachingSource.incrementMetadataCacheInvalidationSuccessCount(); - } catch (Throwable t) { - metricsMetadataCachingSource.incrementMetadataCacheInvalidationFailureCount(); - throw t; - } finally { - metricsMetadataCachingSource - .addMetadataCacheInvalidationTotalTime(stopWatch.stop().elapsedMillis()); - } - } - } - - /** - * Invalidate metadata cache on all regionservers with retries for the given list of - * InvalidateServerMetadataCacheRequest. Each InvalidateServerMetadataCacheRequest contains - * tenantID, schema name and table name. - * We retry once before failing the operation. - * - * @param admin - * @param serverNames - * @param invalidateCacheRequests - * @param isRetry - * @throws Throwable - */ - private void invalidateServerMetadataCacheWithRetries(Admin admin, - Collection<ServerName> serverNames, - List<InvalidateServerMetadataCacheRequest> invalidateCacheRequests, - boolean isRetry) throws Throwable { - RegionServerEndpointProtos.InvalidateServerMetadataCacheRequest protoRequest = - getRequest(invalidateCacheRequests); - // TODO Do I need my own executor or can I re-use QueryServices#Executor - // since it is supposed to be used only for scans according to documentation? - List<CompletableFuture<Void>> futures = new ArrayList<>(); - Map<Future, ServerName> map = new HashMap<>(); - for (ServerName serverName : serverNames) { - CompletableFuture<Void> future = CompletableFuture.runAsync(() -> { - try { - PhoenixStopWatch innerWatch = new PhoenixStopWatch().start(); - // TODO Using the same as ServerCacheClient but need to think if we need some - // special controller for invalidating cache since this is in the path of - // DDL operations. We also need to think of we need separate RPC handler - // threads for this? - ServerRpcController controller = new ServerRpcController(); - for (InvalidateServerMetadataCacheRequest invalidateCacheRequest - : invalidateCacheRequests) { - LOGGER.info("Sending invalidate metadata cache for {} to region server:" - + " {}", invalidateCacheRequest.toString(), serverName); - } - RegionServerEndpointProtos.RegionServerEndpointService.BlockingInterface - service = RegionServerEndpointProtos.RegionServerEndpointService - .newBlockingStub(admin.coprocessorService(serverName)); - // The timeout for this particular request is managed by config parameter: - // hbase.rpc.timeout. Even if the future times out, this runnable can be in - // RUNNING state and will not be interrupted. - service.invalidateServerMetadataCache(controller, protoRequest); - long cacheInvalidationTime = innerWatch.stop().elapsedMillis(); - LOGGER.info("Invalidating metadata cache" - + " on region server: {} completed successfully and it took {} ms", - serverName, cacheInvalidationTime); - metricsMetadataCachingSource - .addMetadataCacheInvalidationRpcTime(cacheInvalidationTime); - } catch (ServiceException se) { - LOGGER.error("Invalidating metadata cache failed for regionserver {}", - serverName, se); - IOException ioe = ServerUtil.parseServiceException(se); - throw new CompletionException(ioe); - } - }); - futures.add(future); - map.put(future, serverName); - } - - // Here we create one master like future which tracks individual future - // for each region server. - CompletableFuture<Void> allFutures = CompletableFuture.allOf( - futures.toArray(new CompletableFuture[0])); - try { - allFutures.get(metadataCacheInvalidationTimeoutMs, TimeUnit.MILLISECONDS); - } catch (Throwable t) { - List<ServerName> failedServers = getFailedServers(futures, map); - LOGGER.error("Invalidating metadata cache for failed for region servers: {}", - failedServers, t); - if (isRetry) { - // If this is a retry attempt then just fail the operation. - if (allFutures.isCompletedExceptionally()) { - if (t instanceof ExecutionException) { - t = t.getCause(); - } - } - throw t; - } else { - // This is the first attempt, we can retry once. - // Indicate that this is a retry attempt. - invalidateServerMetadataCacheWithRetries(admin, failedServers, - invalidateCacheRequests, true); - } - } - } - - /* - Get the list of regionservers that failed the invalidateCache rpc. - */ - private List<ServerName> getFailedServers(List<CompletableFuture<Void>> futures, - Map<Future, ServerName> map) { - List<ServerName> failedServers = new ArrayList<>(); - for (CompletableFuture completedFuture : futures) { - if (completedFuture.isDone() == false) { - // If this task is still running, cancel it and keep in retry list. - ServerName sn = map.get(completedFuture); - failedServers.add(sn); - // Even though we cancel this future but it doesn't interrupt the executing thread. - completedFuture.cancel(true); - } else if (completedFuture.isCompletedExceptionally() - || completedFuture.isCancelled()) { - // This means task is done but completed with exception - // or was canceled. Add it to retry list. - ServerName sn = map.get(completedFuture); - failedServers.add(sn); - } - } - return failedServers; - } - - private RegionServerEndpointProtos.InvalidateServerMetadataCacheRequest getRequest( - List<InvalidateServerMetadataCacheRequest> requests) { - RegionServerEndpointProtos.InvalidateServerMetadataCacheRequest.Builder builder = - RegionServerEndpointProtos.InvalidateServerMetadataCacheRequest.newBuilder(); - for (InvalidateServerMetadataCacheRequest request: requests) { - RegionServerEndpointProtos.InvalidateServerMetadataCache.Builder innerBuilder - = RegionServerEndpointProtos.InvalidateServerMetadataCache.newBuilder(); - innerBuilder.setTenantId(ByteStringer.wrap(request.getTenantId())); - innerBuilder.setSchemaName(ByteStringer.wrap(request.getSchemaName())); - innerBuilder.setTableName(ByteStringer.wrap(request.getTableName())); - builder.addInvalidateServerMetadataCacheRequests(innerBuilder.build()); + env.getConfiguration()).unwrap(PhoenixConnection.class)) { + ConnectionQueryServices queryServices = connection.getQueryServices(); + queryServices.invalidateServerMetadataCache(requests); } - return builder.build(); } /** @@ -3830,7 +3649,9 @@ TABLE_FAMILY_BYTES, TABLE_SEQ_NUM_BYTES); buildTable(key, cacheKey, region, clientTimeStamp, clientVersion); return table; } finally { - if (!wasLocked && rowLock != null) rowLock.release(); + if (!wasLocked && rowLock != null) { + rowLock.release(); + } } } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java index e527023bc2..45315f89a3 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServices.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.compile.MutationPlan; +import org.apache.phoenix.coprocessor.InvalidateServerMetadataCacheRequest; import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult; import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.execute.MutationState; @@ -230,4 +231,7 @@ public interface ConnectionQueryServices extends QueryServices, MetaDataMutated } int getConnectionCount(boolean isInternal); + + void invalidateServerMetadataCache(List<InvalidateServerMetadataCacheRequest> requests) + throws Throwable; } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java index 3a83a878be..94f9d557fd 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java @@ -21,6 +21,7 @@ import static org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder.KEEP_ import static org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder.MAX_VERSIONS; import static org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder.REPLICATION_SCOPE; import static org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder.TTL; +import static org.apache.hadoop.hbase.ipc.RpcControllerFactory.CUSTOM_CONTROLLER_CONF_KEY; import static org.apache.phoenix.coprocessor.MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP; import static org.apache.phoenix.coprocessor.MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_15_0; import static org.apache.phoenix.coprocessor.MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_16_0; @@ -109,11 +110,14 @@ import java.util.Properties; import java.util.Random; import java.util.Set; import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; @@ -127,6 +131,7 @@ import java.util.regex.Pattern; import javax.annotation.concurrent.GuardedBy; import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; @@ -159,7 +164,9 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils.BlockingRpcCallback; import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.ipc.PhoenixRpcSchedulerFactory; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.ipc.ServerRpcController; +import org.apache.hadoop.hbase.ipc.controller.InvalidateMetadataCacheControllerFactory; import org.apache.hadoop.hbase.ipc.controller.ServerToServerRpcController; import org.apache.hadoop.hbase.ipc.controller.ServerSideRPCControllerFactory; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto; @@ -176,6 +183,7 @@ import org.apache.hadoop.ipc.RemoteException; import org.apache.phoenix.compile.MutationPlan; import org.apache.phoenix.coprocessor.ChildLinkMetaDataEndpoint; import org.apache.phoenix.coprocessor.GroupedAggregateRegionObserver; +import org.apache.phoenix.coprocessor.InvalidateServerMetadataCacheRequest; import org.apache.phoenix.coprocessor.MetaDataEndpointImpl; import org.apache.phoenix.coprocessor.MetaDataProtocol; import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult; @@ -211,6 +219,9 @@ import org.apache.phoenix.coprocessor.generated.MetaDataProtos.GetVersionRespons import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataResponse; import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataService; import org.apache.phoenix.coprocessor.generated.MetaDataProtos.UpdateIndexStateRequest; +import org.apache.phoenix.coprocessor.generated.RegionServerEndpointProtos; +import org.apache.phoenix.coprocessor.metrics.MetricsMetadataCachingSource; +import org.apache.phoenix.coprocessor.metrics.MetricsPhoenixCoprocessorSourceFactory; import org.apache.phoenix.exception.InvalidRegionSplitPolicyException; import org.apache.phoenix.exception.PhoenixIOException; import org.apache.phoenix.exception.RetriableUpgradeException; @@ -409,6 +420,12 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement // writes guarded by "liveRegionServersLock" private volatile List<ServerName> liveRegionServers; private final Object liveRegionServersLock = new Object(); + // Writes guarded by invalidateMetadataCacheConnLock + private Connection invalidateMetadataCacheConnection = null; + private final Object invalidateMetadataCacheConnLock = new Object(); + private MetricsMetadataCachingSource metricsMetadataCachingSource; + public static final String INVALIDATE_SERVER_METADATA_CACHE_EX_MESSAGE = + "Cannot invalidate server metadata cache on a non-server connection"; private static interface FeatureSupported { boolean isSupported(ConnectionQueryServices services); @@ -541,35 +558,61 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement e.printStackTrace(); } } - + nSequenceSaltBuckets = config.getInt(QueryServices.SEQUENCE_SALT_BUCKETS_ATTRIB, + QueryServicesOptions.DEFAULT_SEQUENCE_TABLE_SALT_BUCKETS); + this.metricsMetadataCachingSource = MetricsPhoenixCoprocessorSourceFactory.getInstance() + .getMetadataCachingSource(); } - private void openConnection() throws SQLException { + private Connection openConnection(Configuration conf) throws SQLException { + Connection localConnection; try { - this.connection = HBaseFactoryProvider.getHConnectionFactory().createConnection(this.config); + localConnection = HBaseFactoryProvider.getHConnectionFactory().createConnection(conf); GLOBAL_HCONNECTIONS_COUNTER.increment(); LOGGER.info("HConnection established. Stacktrace for informational purposes: " - + connection + " " + LogUtil.getCallerStackTrace()); + + localConnection + " " + LogUtil.getCallerStackTrace()); } catch (IOException e) { throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_ESTABLISH_CONNECTION) .setRootCause(e).build().buildException(); } - if (this.connection.isClosed()) { // TODO: why the heck doesn't this throw above? + if (localConnection.isClosed()) { // TODO: why the heck doesn't this throw above? throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_ESTABLISH_CONNECTION).build().buildException(); } + return localConnection; + } + + /** + * We create a long-lived hbase connection to run invalidate cache RPCs. We override + * CUSTOM_CONTROLLER_CONF_KEY to instantiate InvalidateMetadataCacheController which has + * a special priority for invalidate metadata cache operations. + * @return hbase connection + * @throws SQLException SQLException + */ + public Connection getInvalidateMetadataCacheConnection() throws SQLException { + if (invalidateMetadataCacheConnection != null) { + return invalidateMetadataCacheConnection; + } + + synchronized (invalidateMetadataCacheConnLock) { + Configuration clonedConfiguration = PropertiesUtil.cloneConfig(this.config); + clonedConfiguration.setClass(CUSTOM_CONTROLLER_CONF_KEY, + InvalidateMetadataCacheControllerFactory.class, RpcControllerFactory.class); + invalidateMetadataCacheConnection = openConnection(clonedConfiguration); + } + return invalidateMetadataCacheConnection; } /** * Close the HBase connection and decrement the counter. * @throws IOException throws IOException */ - private void closeConnection() throws IOException { + private void closeConnection(Connection connection) throws IOException { if (connection != null) { connection.close(); LOGGER.info("{} HConnection closed. Stacktrace for informational" + " purposes: {}", connection, LogUtil.getCallerStackTrace()); + GLOBAL_HCONNECTIONS_COUNTER.decrement(); } - GLOBAL_HCONNECTIONS_COUNTER.decrement(); } @Override @@ -678,8 +721,9 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement latestMetaDataLock.notifyAll(); } try { - // close the HBase connection - closeConnection(); + // close HBase connections. + closeConnection(this.connection); + closeConnection(this.invalidateMetadataCacheConnection); } finally { if (renewLeaseExecutor != null) { renewLeaseExecutor.shutdownNow(); @@ -3529,7 +3573,7 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement try { GLOBAL_QUERY_SERVICES_COUNTER.increment(); LOGGER.info("An instance of ConnectionQueryServices was created."); - openConnection(); + connection = openConnection(config); hConnectionEstablished = true; boolean lastDDLTimestampValidationEnabled = getProps().getBoolean( @@ -3546,9 +3590,6 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement success = true; return null; } - nSequenceSaltBuckets = ConnectionQueryServicesImpl.this.props.getInt( - QueryServices.SEQUENCE_SALT_BUCKETS_ATTRIB, - QueryServicesOptions.DEFAULT_SEQUENCE_TABLE_SALT_BUCKETS); boolean isDoNotUpgradePropSet = UpgradeUtil.isNoUpgradeSet(props); Properties scnProps = PropertiesUtil.deepCopy(props); scnProps.setProperty(PhoenixRuntime.CURRENT_SCN_ATTRIB, @@ -3639,7 +3680,8 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } try { if (!success && hConnectionEstablished) { - closeConnection(); + closeConnection(connection); + closeConnection(invalidateMetadataCacheConnection); } } catch (IOException e) { SQLException ex = new SQLException(e); @@ -4459,7 +4501,6 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement } else { nSequenceSaltBuckets = getSaltBuckets(e); } - updateSystemSequenceWithCacheOnWriteProps(metaConnection); } return metaConnection; @@ -6232,4 +6273,169 @@ public class ConnectionQueryServicesImpl extends DelegateQueryServices implement public List<LinkedBlockingQueue<WeakReference<PhoenixConnection>>> getCachedConnections() { return connectionQueues; } + + /** + * Invalidate metadata cache from all region servers for the given list of + * InvalidateServerMetadataCacheRequest. + * @throws Throwable + */ + public void invalidateServerMetadataCache(List<InvalidateServerMetadataCacheRequest> requests) + throws Throwable { + boolean invalidateCacheEnabled = + config.getBoolean(PHOENIX_METADATA_INVALIDATE_CACHE_ENABLED, false); + if (!invalidateCacheEnabled) { + LOGGER.info("Skip invalidating server metadata cache since conf property" + + " phoenix.metadata.invalidate.cache.enabled is set to false"); + return; + } + if (!QueryUtil.isServerConnection(props)) { + LOGGER.warn(INVALIDATE_SERVER_METADATA_CACHE_EX_MESSAGE); + throw new Exception(INVALIDATE_SERVER_METADATA_CACHE_EX_MESSAGE); + } + + metricsMetadataCachingSource.incrementMetadataCacheInvalidationOperationsCount(); + Admin admin = getInvalidateMetadataCacheConnection().getAdmin(); + // This will incur an extra RPC to the master. This RPC is required since we want to + // get current list of regionservers. + Collection<ServerName> serverNames = admin.getRegionServers(true); + PhoenixStopWatch stopWatch = new PhoenixStopWatch().start(); + try { + invalidateServerMetadataCacheWithRetries(admin, serverNames, requests, false); + metricsMetadataCachingSource.incrementMetadataCacheInvalidationSuccessCount(); + } catch (Throwable t) { + metricsMetadataCachingSource.incrementMetadataCacheInvalidationFailureCount(); + throw t; + } finally { + metricsMetadataCachingSource + .addMetadataCacheInvalidationTotalTime(stopWatch.stop().elapsedMillis()); + } + } + + /** + * Invalidate metadata cache on all regionservers with retries for the given list of + * InvalidateServerMetadataCacheRequest. Each InvalidateServerMetadataCacheRequest contains + * tenantID, schema name and table name. + * We retry once before failing the operation. + * + * @param admin + * @param serverNames + * @param invalidateCacheRequests + * @param isRetry + * @throws Throwable + */ + private void invalidateServerMetadataCacheWithRetries(Admin admin, + Collection<ServerName> serverNames, + List<InvalidateServerMetadataCacheRequest> invalidateCacheRequests, + boolean isRetry) throws Throwable { + RegionServerEndpointProtos.InvalidateServerMetadataCacheRequest protoRequest = + getRequest(invalidateCacheRequests); + // TODO Do I need my own executor or can I re-use QueryServices#Executor + // since it is supposed to be used only for scans according to documentation? + List<CompletableFuture<Void>> futures = new ArrayList<>(); + Map<Future, ServerName> map = new HashMap<>(); + for (ServerName serverName : serverNames) { + CompletableFuture<Void> future = CompletableFuture.runAsync(() -> { + try { + PhoenixStopWatch innerWatch = new PhoenixStopWatch().start(); + for (InvalidateServerMetadataCacheRequest invalidateCacheRequest + : invalidateCacheRequests) { + LOGGER.info("Sending invalidate metadata cache for {} to region server:" + + " {}", invalidateCacheRequest.toString(), serverName); + } + + RegionServerEndpointProtos.RegionServerEndpointService.BlockingInterface + service = RegionServerEndpointProtos.RegionServerEndpointService + .newBlockingStub(admin.coprocessorService(serverName)); + // The timeout for this particular request is managed by config parameter: + // hbase.rpc.timeout. Even if the future times out, this runnable can be in + // RUNNING state and will not be interrupted. + // We use the controller set in hbase connection. + service.invalidateServerMetadataCache(null, protoRequest); + long cacheInvalidationTime = innerWatch.stop().elapsedMillis(); + LOGGER.info("Invalidating metadata cache" + + " on region server: {} completed successfully and it took {} ms", + serverName, cacheInvalidationTime); + metricsMetadataCachingSource + .addMetadataCacheInvalidationRpcTime(cacheInvalidationTime); + } catch (ServiceException se) { + LOGGER.error("Invalidating metadata cache failed for regionserver {}", + serverName, se); + IOException ioe = ServerUtil.parseServiceException(se); + throw new CompletionException(ioe); + } + }); + futures.add(future); + map.put(future, serverName); + } + // Here we create one master like future which tracks individual future + // for each region server. + CompletableFuture<Void> allFutures = CompletableFuture.allOf( + futures.toArray(new CompletableFuture[0])); + long metadataCacheInvalidationTimeoutMs = config.getLong( + PHOENIX_METADATA_CACHE_INVALIDATION_TIMEOUT_MS, + PHOENIX_METADATA_CACHE_INVALIDATION_TIMEOUT_MS_DEFAULT); + try { + allFutures.get(metadataCacheInvalidationTimeoutMs, TimeUnit.MILLISECONDS); + } catch (Throwable t) { + List<ServerName> failedServers = getFailedServers(futures, map); + LOGGER.error("Invalidating metadata cache for failed for region servers: {}", + failedServers, t); + if (isRetry) { + // If this is a retry attempt then just fail the operation. + if (allFutures.isCompletedExceptionally()) { + if (t instanceof ExecutionException) { + t = t.getCause(); + } + } + throw t; + } else { + // This is the first attempt, we can retry once. + // Indicate that this is a retry attempt. + invalidateServerMetadataCacheWithRetries(admin, failedServers, + invalidateCacheRequests, true); + } + } + } + + /** + * Get the list of regionservers that failed the invalidateCache rpc. + * @param futures futtures + * @param map map of future to server names + * @return the list of servers that failed the invalidateCache RPC. + */ + private List<ServerName> getFailedServers(List<CompletableFuture<Void>> futures, + Map<Future, ServerName> map) { + List<ServerName> failedServers = new ArrayList<>(); + for (CompletableFuture completedFuture : futures) { + if (!completedFuture.isDone()) { + // If this task is still running, cancel it and keep in retry list. + ServerName sn = map.get(completedFuture); + failedServers.add(sn); + // Even though we cancel this future but it doesn't interrupt the executing thread. + completedFuture.cancel(true); + } else if (completedFuture.isCompletedExceptionally() + || completedFuture.isCancelled()) { + // This means task is done but completed with exception + // or was canceled. Add it to retry list. + ServerName sn = map.get(completedFuture); + failedServers.add(sn); + } + } + return failedServers; + } + + private RegionServerEndpointProtos.InvalidateServerMetadataCacheRequest getRequest( + List<InvalidateServerMetadataCacheRequest> requests) { + RegionServerEndpointProtos.InvalidateServerMetadataCacheRequest.Builder builder = + RegionServerEndpointProtos.InvalidateServerMetadataCacheRequest.newBuilder(); + for (InvalidateServerMetadataCacheRequest request: requests) { + RegionServerEndpointProtos.InvalidateServerMetadataCache.Builder innerBuilder + = RegionServerEndpointProtos.InvalidateServerMetadataCache.newBuilder(); + innerBuilder.setTenantId(ByteStringer.wrap(request.getTenantId())); + innerBuilder.setSchemaName(ByteStringer.wrap(request.getSchemaName())); + innerBuilder.setTableName(ByteStringer.wrap(request.getTableName())); + builder.addInvalidateServerMetadataCacheRequests(innerBuilder.build()); + } + return builder.build(); + } } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java index dfe2a22874..62b6514f41 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionlessQueryServicesImpl.java @@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.util.Addressing; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.compile.MutationPlan; +import org.apache.phoenix.coprocessor.InvalidateServerMetadataCacheRequest; import org.apache.phoenix.coprocessor.MetaDataProtocol; import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult; import org.apache.phoenix.coprocessor.MetaDataProtocol.MutationCode; @@ -824,4 +825,10 @@ public class ConnectionlessQueryServicesImpl extends DelegateQueryServices imple public int getConnectionCount(boolean isInternal) { return 0; } + + @Override + public void invalidateServerMetadataCache(List<InvalidateServerMetadataCacheRequest> requests) + throws Throwable { + // No-op + } } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java index 0278345948..674f0c5d5e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/DelegateConnectionQueryServices.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.compile.MutationPlan; +import org.apache.phoenix.coprocessor.InvalidateServerMetadataCacheRequest; import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult; import org.apache.phoenix.execute.MutationState; import org.apache.phoenix.hbase.index.util.KeyValueBuilder; @@ -432,4 +433,10 @@ public class DelegateConnectionQueryServices extends DelegateQueryServices imple public int getConnectionCount(boolean isInternal) { return getDelegate().getConnectionCount(isInternal); } + + @Override + public void invalidateServerMetadataCache(List<InvalidateServerMetadataCacheRequest> requests) + throws Throwable { + getDelegate().invalidateServerMetadataCache(requests); + } } \ No newline at end of file diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java index 0316e18df9..71496f3630 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -186,6 +186,9 @@ public interface QueryServices extends SQLCloseable { public static final String INDEX_PRIOIRTY_ATTRIB = "phoenix.index.rpc.priority"; public static final String METADATA_PRIOIRTY_ATTRIB = "phoenix.metadata.rpc.priority"; public static final String SERVER_SIDE_PRIOIRTY_ATTRIB = "phoenix.serverside.rpc.priority"; + String INVALIDATE_METADATA_CACHE_PRIORITY_ATTRIB = + "phoenix.invalidate.metadata.cache.rpc.priority"; + public static final String ALLOW_LOCAL_INDEX_ATTRIB = "phoenix.index.allowLocalIndex"; // Retries when doing server side writes to SYSTEM.CATALOG @@ -251,6 +254,7 @@ public interface QueryServices extends SQLCloseable { public static final String INDEX_HANDLER_COUNT_ATTRIB = "phoenix.rpc.index.handler.count"; public static final String METADATA_HANDLER_COUNT_ATTRIB = "phoenix.rpc.metadata.handler.count"; public static final String SERVER_SIDE_HANDLER_COUNT_ATTRIB = "phoenix.rpc.serverside.handler.count"; + String INVALIDATE_CACHE_HANDLER_COUNT_ATTRIB = "phoenix.rpc.invalidate.cache.handler.count"; public static final String FORCE_ROW_KEY_ORDER_ATTRIB = "phoenix.query.force.rowkeyorder"; public static final String ALLOW_USER_DEFINED_FUNCTIONS_ATTRIB = "phoenix.functions.allowUserDefinedFunctions"; @@ -438,7 +442,11 @@ public interface QueryServices extends SQLCloseable { * Parameter to disable the server merges for hinted uncovered indexes */ String SERVER_MERGE_FOR_UNCOVERED_INDEX = "phoenix.query.global.server.merge.enable"; - + String PHOENIX_METADATA_CACHE_INVALIDATION_TIMEOUT_MS = + "phoenix.metadata.cache.invalidation.timeoutMs"; + // Default to 10 seconds. + long PHOENIX_METADATA_CACHE_INVALIDATION_TIMEOUT_MS_DEFAULT = 10 * 1000; + String PHOENIX_METADATA_INVALIDATE_CACHE_ENABLED = "phoenix.metadata.invalidate.cache.enabled"; /** * Param to determine whether client can disable validation to figure out if any of the * descendent views extend primary key of their parents. Since this is a bit of diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index dafeadf08a..673f60d69d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -239,10 +239,12 @@ public class QueryServicesOptions { public static final int DEFAULT_SERVER_SIDE_PRIORITY = 500; public static final int DEFAULT_INDEX_PRIORITY = 1000; public static final int DEFAULT_METADATA_PRIORITY = 2000; + public static final int DEFAULT_INVALIDATE_METADATA_CACHE_PRIORITY = 3000; public static final boolean DEFAULT_ALLOW_LOCAL_INDEX = true; public static final int DEFAULT_INDEX_HANDLER_COUNT = 30; public static final int DEFAULT_METADATA_HANDLER_COUNT = 30; public static final int DEFAULT_SERVERSIDE_HANDLER_COUNT = 30; + public static final int DEFAULT_INVALIDATE_CACHE_HANDLER_COUNT = 10; public static final int DEFAULT_SYSTEM_MAX_VERSIONS = 1; public static final boolean DEFAULT_SYSTEM_KEEP_DELETED_CELLS = false; diff --git a/phoenix-core/src/test/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcSchedulerTest.java b/phoenix-core/src/test/java/org/apache/hadoop/hbase/ipc/PhoenixRpcSchedulerTest.java similarity index 75% rename from phoenix-core/src/test/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcSchedulerTest.java rename to phoenix-core/src/test/java/org/apache/hadoop/hbase/ipc/PhoenixRpcSchedulerTest.java index 15b83e41b5..aa629c3118 100644 --- a/phoenix-core/src/test/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcSchedulerTest.java +++ b/phoenix-core/src/test/java/org/apache/hadoop/hbase/ipc/PhoenixRpcSchedulerTest.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.ipc; import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.mockito.Mockito.when; import java.net.InetSocketAddress; @@ -38,16 +39,9 @@ import org.mockito.Mockito; import org.apache.phoenix.thirdparty.com.google.common.collect.Lists; -/** - * Test that the rpc scheduler schedules index writes to the index handler queue and sends - * everything else to the standard queues - */ -public class PhoenixIndexRpcSchedulerTest { - +public class PhoenixRpcSchedulerTest { private static final Configuration conf = HBaseConfiguration.create(); private static final InetSocketAddress isa = new InetSocketAddress("localhost", 0); - - private class AbortServer implements Abortable { private boolean aborted = false; @@ -62,25 +56,29 @@ public class PhoenixIndexRpcSchedulerTest { } } + /** + * Test that the rpc scheduler schedules index writes to the index handler queue and sends + * everything else to the standard queues + */ @Test public void testIndexPriorityWritesToIndexHandler() throws Exception { RpcScheduler mock = Mockito.mock(RpcScheduler.class); PriorityFunction qosFunction = Mockito.mock(PriorityFunction.class); Abortable abortable = new AbortServer(); - PhoenixRpcScheduler scheduler = new PhoenixRpcScheduler(conf, mock, 200, 250, 225, qosFunction,abortable); + PhoenixRpcScheduler scheduler = new PhoenixRpcScheduler(conf, mock, 200, 250, 225, 230, qosFunction,abortable); BalancedQueueRpcExecutor executor = new BalancedQueueRpcExecutor("test-queue", 1, 1,qosFunction,conf,abortable); scheduler.setIndexExecutorForTesting(executor); dispatchCallWithPriority(scheduler, 200); List<BlockingQueue<CallRunner>> queues = executor.getQueues(); assertEquals(1, queues.size()); BlockingQueue<CallRunner> queue = queues.get(0); - queue.poll(20, TimeUnit.SECONDS); + assertNotNull(queue.poll(5, TimeUnit.SECONDS)); // try again, this time we tweak the ranges we support - scheduler = new PhoenixRpcScheduler(conf, mock, 101, 110, 105, qosFunction,abortable); + scheduler = new PhoenixRpcScheduler(conf, mock, 101, 110, 105, 115, qosFunction,abortable); scheduler.setIndexExecutorForTesting(executor); dispatchCallWithPriority(scheduler, 101); - queue.poll(20, TimeUnit.SECONDS); + assertNotNull(queue.poll(5, TimeUnit.SECONDS)); Mockito.verify(mock, Mockito.times(2)).init(Mockito.any(Context.class)); scheduler.stop(); @@ -92,7 +90,7 @@ public class PhoenixIndexRpcSchedulerTest { RpcScheduler mock = Mockito.mock(RpcScheduler.class); PriorityFunction qosFunction = Mockito.mock(PriorityFunction.class); Abortable abortable = new AbortServer(); - PhoenixRpcScheduler scheduler1 = new PhoenixRpcScheduler(conf, mock, 200, 250, 100, qosFunction,abortable); + PhoenixRpcScheduler scheduler1 = new PhoenixRpcScheduler(conf, mock, 200, 250, 100, 300, qosFunction,abortable); RpcExecutor executor1 = scheduler1.getServerSideExecutorForTesting(); for (int c = 0; c < 10; c++) { dispatchCallWithPriority(scheduler1, 100); @@ -103,7 +101,7 @@ public class PhoenixIndexRpcSchedulerTest { if (queue1.size() > 0) { numDispatches1 += queue1.size(); for (int i = 0; i < queue1.size(); i++) { - queue1.poll(20, TimeUnit.SECONDS); + assertNotNull(queue1.poll(5, TimeUnit.SECONDS)); } } } @@ -111,7 +109,7 @@ public class PhoenixIndexRpcSchedulerTest { scheduler1.stop(); // try again, with the incorrect executor - PhoenixRpcScheduler scheduler2 = new PhoenixRpcScheduler(conf, mock, 101, 110, 50, qosFunction,abortable); + PhoenixRpcScheduler scheduler2 = new PhoenixRpcScheduler(conf, mock, 101, 110, 50, 25, qosFunction,abortable); RpcExecutor executor2 = scheduler2.getIndexExecutorForTesting(); dispatchCallWithPriority(scheduler2, 50); List<BlockingQueue<CallRunner>> queues2 = executor2.getQueues(); @@ -119,7 +117,7 @@ public class PhoenixIndexRpcSchedulerTest { for (BlockingQueue<CallRunner> queue2 : queues2) { if (queue2.size() > 0) { numDispatches2++; - queue2.poll(20, TimeUnit.SECONDS); + assertNotNull(queue2.poll(5, TimeUnit.SECONDS)); } } assertEquals(0, numDispatches2); @@ -140,12 +138,12 @@ public class PhoenixIndexRpcSchedulerTest { PriorityFunction qosFunction = Mockito.mock(PriorityFunction.class); Abortable abortable = new AbortServer(); RpcScheduler mock = Mockito.mock(RpcScheduler.class); - PhoenixRpcScheduler scheduler = new PhoenixRpcScheduler(conf, mock, 200, 250, 225, qosFunction,abortable); + PhoenixRpcScheduler scheduler = new PhoenixRpcScheduler(conf, mock, 200, 250, 225, 275, qosFunction,abortable); dispatchCallWithPriority(scheduler, 100); dispatchCallWithPriority(scheduler, 251); // try again, this time we tweak the ranges we support - scheduler = new PhoenixRpcScheduler(conf, mock, 101, 110, 105, qosFunction,abortable); + scheduler = new PhoenixRpcScheduler(conf, mock, 101, 110, 105, 115, qosFunction,abortable); dispatchCallWithPriority(scheduler, 200); dispatchCallWithPriority(scheduler, 111); @@ -154,6 +152,33 @@ public class PhoenixIndexRpcSchedulerTest { scheduler.stop(); } + /** + * Test that the rpc scheduler schedules invalidate metadata cache RPC to + * the invalidate metadata cache executor. + */ + @Test + public void testInvalidateMetadataCacheExecutor() throws Exception { + RpcScheduler mock = Mockito.mock(RpcScheduler.class); + PriorityFunction qosFunction = Mockito.mock(PriorityFunction.class); + Abortable abortable = new AbortServer(); + // Set invalidate metadata cache priority to 230. + int invalidateMetadataCacheCallPriority = 230; + PhoenixRpcScheduler scheduler = new PhoenixRpcScheduler(conf, mock, + 200, 250, 225, invalidateMetadataCacheCallPriority, qosFunction,abortable); + BalancedQueueRpcExecutor executor = new BalancedQueueRpcExecutor("test-queue", + 1, 1, qosFunction, conf, abortable); + scheduler.setInvalidateMetadataCacheExecutorForTesting(executor); + dispatchCallWithPriority(scheduler, invalidateMetadataCacheCallPriority); + List<BlockingQueue<CallRunner>> queues = executor.getQueues(); + assertEquals(1, queues.size()); + BlockingQueue<CallRunner> queue = queues.get(0); + assertEquals(1, queue.size()); + assertNotNull(queue.poll(5, TimeUnit.SECONDS)); + Mockito.verify(mock, Mockito.times(1)).init(Mockito.any(RpcScheduler.Context.class)); + scheduler.stop(); + executor.stop(); + } + private void dispatchCallWithPriority(RpcScheduler scheduler, int priority) throws Exception { CallRunner task = Mockito.mock(CallRunner.class); RequestHeader header = RequestHeader.newBuilder().setPriority(priority).build(); diff --git a/phoenix-core/src/test/java/org/apache/phoenix/cache/ServerMetadataCacheTest.java b/phoenix-core/src/test/java/org/apache/phoenix/cache/ServerMetadataCacheTest.java index 3ba83ff2ee..11f85172ff 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/cache/ServerMetadataCacheTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/cache/ServerMetadataCacheTest.java @@ -21,6 +21,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.coprocessor.metrics.MetricsMetadataCachingSource; import org.apache.phoenix.coprocessor.metrics.MetricsPhoenixCoprocessorSourceFactory; import org.apache.phoenix.end2end.ParallelStatsDisabledIT; +import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.jdbc.PhoenixStatement; import org.apache.phoenix.monitoring.GlobalClientMetrics; @@ -46,8 +47,6 @@ import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; import org.mockito.Mockito; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.sql.Connection; import java.sql.DriverManager; @@ -58,12 +57,14 @@ import java.util.Map; import java.util.Properties; import java.util.Random; -import static org.apache.phoenix.coprocessor.MetaDataEndpointImpl.PHOENIX_METADATA_INVALIDATE_CACHE_ENABLED; +import static org.apache.phoenix.query.ConnectionQueryServicesImpl.INVALIDATE_SERVER_METADATA_CACHE_EX_MESSAGE; +import static org.apache.phoenix.query.QueryServices.PHOENIX_METADATA_INVALIDATE_CACHE_ENABLED; import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.eq; @@ -75,7 +76,6 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT { private final Random RANDOM = new Random(42); private final long NEVER = (long) ConnectionProperty.UPDATE_CACHE_FREQUENCY.getValue("NEVER"); - private static final Logger LOGGER = LoggerFactory.getLogger(ServerMetadataCacheTest.class); @BeforeClass public static synchronized void doSetup() throws Exception { @@ -1422,8 +1422,24 @@ public class ServerMetadataCacheTest extends ParallelStatsDisabledIT { } } - //Helper methods + /* + Tests that invalidate server metadata cache fails on a non server connection. + */ + @Test + public void testInvalidateMetadataCacheOnNonServerConnection() { + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + try (PhoenixConnection conn = DriverManager.getConnection(getUrl(), props) + .unwrap(PhoenixConnection.class)) { + ConnectionQueryServices cqs = conn.getQueryServices(); + cqs.invalidateServerMetadataCache(null); + fail("Shouldn't come here"); + } catch (Throwable t) { + assertNotNull(t); + assertTrue(t.getMessage().contains(INVALIDATE_SERVER_METADATA_CACHE_EX_MESSAGE)); + } + } + //Helper methods private long getLastDDLTimestamp(String tableName) throws SQLException { Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); // Need to use different connection than what is used for creating table or indexes.