This is an automated email from the ASF dual-hosted git repository. ctubbsii pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 6eae7831fb7cb0ab4f7ba99bbc1d9d9b807445fc Merge: 15035d7b27 85accd6b86 Author: Christopher Tubbs <ctubb...@apache.org> AuthorDate: Thu Jul 25 00:17:26 2024 -0400 Merge branch '2.1' .../java/org/apache/accumulo/core/conf/Property.java | 20 ++++---------------- .../org/apache/accumulo/server/rpc/TServerUtils.java | 9 ++------- .../apache/accumulo/server/rpc/TServerUtilsTest.java | 3 +-- .../accumulo/coordinator/CompactionCoordinator.java | 5 +---- .../org/apache/accumulo/compactor/Compactor.java | 6 +----- .../apache/accumulo/gc/SimpleGarbageCollector.java | 2 +- .../java/org/apache/accumulo/manager/Manager.java | 3 +-- .../java/org/apache/accumulo/tserver/ScanServer.java | 5 +---- .../org/apache/accumulo/tserver/TabletServer.java | 8 +++----- .../test/functional/IdleProcessMetricsIT.java | 3 ++- .../test/functional/ThriftMaxFrameSizeIT.java | 3 +-- 11 files changed, 18 insertions(+), 49 deletions(-) diff --cc core/src/main/java/org/apache/accumulo/core/conf/Property.java index 910293fd48,8629cf1681..647603b26f --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@@ -263,8 -264,10 +266,6 @@@ public enum Property + " This does not equate to how often tickets are actually renewed (which is" + " performed at 80% of the ticket lifetime).", "1.6.5"), - @Deprecated(since = "2.1.3") - @ReplacedBy(property = RPC_MAX_MESSAGE_SIZE) -- GENERAL_MAX_MESSAGE_SIZE("general.server.message.size.max", "1G", PropertyType.BYTES, -- "The maximum size of a message that can be sent to a server.", "1.5.0"), @Experimental GENERAL_OPENTELEMETRY_ENABLED("general.opentelemetry.enabled", "false", PropertyType.BOOLEAN, "Enables tracing functionality using OpenTelemetry (assuming OpenTelemetry is configured).", @@@ -725,8 -795,10 +723,6 @@@ "2.1.0"), TSERV_THREADCHECK("tserver.server.threadcheck.time", "1s", PropertyType.TIMEDURATION, "The time between adjustments of the server thread pool.", "1.4.0"), - @Deprecated(since = "2.1.3") - @ReplacedBy(property = RPC_MAX_MESSAGE_SIZE) -- TSERV_MAX_MESSAGE_SIZE("tserver.server.message.size.max", "1G", PropertyType.BYTES, -- "The maximum size of a message that can be sent to a tablet server.", "1.6.0"), TSERV_LOG_BUSY_TABLETS_COUNT("tserver.log.busy.tablets.count", "0", PropertyType.COUNT, "Number of busiest tablets to log. Logged at interval controlled by " + "tserver.log.busy.tablets.interval. If <= 0, logging of busy tablets is disabled.", @@@ -1187,12 -1483,6 +1183,9 @@@ @Experimental COMPACTOR_THREADCHECK("compactor.threadcheck.time", "1s", PropertyType.TIMEDURATION, "The time between adjustments of the server thread pool.", "2.1.0"), + @Experimental - COMPACTOR_MAX_MESSAGE_SIZE("compactor.message.size.max", "10M", PropertyType.BYTES, - "The maximum size of a message that can be sent to a tablet server.", "2.1.0"), - @Experimental + COMPACTOR_QUEUE_NAME("compactor.queue", "", PropertyType.STRING, + "The queue for which this Compactor will perform compactions.", "3.0.0"), // CompactionCoordinator properties @Experimental COMPACTION_COORDINATOR_PREFIX("compaction.coordinator.", null, PropertyType.PREFIX, @@@ -1516,8 -1849,7 +1505,7 @@@ COMPACTOR_PORTSEARCH, TSERV_PORTSEARCH, // max message options - SSERV_MAX_MESSAGE_SIZE, TSERV_MAX_MESSAGE_SIZE, COMPACTOR_MAX_MESSAGE_SIZE, - COMPACTION_COORDINATOR_MAX_MESSAGE_SIZE, - TSERV_MAX_MESSAGE_SIZE, GENERAL_MAX_MESSAGE_SIZE, RPC_MAX_MESSAGE_SIZE, ++ RPC_MAX_MESSAGE_SIZE, // block cache options TSERV_CACHE_MANAGER_IMPL, TSERV_DATACACHE_SIZE, TSERV_INDEXCACHE_SIZE, diff --cc server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java index f9359b8442,87bfb4c0c8..5d9b44bbf5 --- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java +++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java @@@ -125,15 -125,15 +125,13 @@@ public class TServerUtils * @param minThreadProperty A Property to control the minimum number of threads in the pool * @param timeBetweenThreadChecksProperty A Property to control the amount of time between checks * to resize the thread pool -- * @param maxMessageSizeProperty A Property to control the maximum Thrift message size accepted * @return the server object created, and the port actually used * @throws UnknownHostException when we don't know our own address */ public static ServerAddress startServer(ServerContext context, String hostname, Property portHintProperty, TProcessor processor, String serverName, String threadName, Property portSearchProperty, Property minThreadProperty, Property threadTimeOutProperty, -- Property timeBetweenThreadChecksProperty, Property maxMessageSizeProperty) -- throws UnknownHostException { ++ Property timeBetweenThreadChecksProperty) throws UnknownHostException { final AccumuloConfiguration config = context.getConfiguration(); final IntStream portHint = config.getPortStream(portHintProperty); @@@ -153,10 -153,10 +151,7 @@@ timeBetweenThreadChecks = config.getTimeInMillis(timeBetweenThreadChecksProperty); } - long maxMessageSize = 10_000_000; - long maxMessageSize = Integer.MAX_VALUE; -- if (maxMessageSizeProperty != null) { -- maxMessageSize = config.getAsBytes(maxMessageSizeProperty); -- } ++ long maxMessageSize = config.getAsBytes(Property.RPC_MAX_MESSAGE_SIZE); boolean portSearch = false; if (portSearchProperty != null) { diff --cc server/base/src/test/java/org/apache/accumulo/server/rpc/TServerUtilsTest.java index 80d261f093,81e9cfa49c..2cb74d2b9b --- a/server/base/src/test/java/org/apache/accumulo/server/rpc/TServerUtilsTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/rpc/TServerUtilsTest.java @@@ -299,8 -305,8 +299,7 @@@ public class TServerUtilsTest return TServerUtils.startServer(context, hostname, Property.TSERV_CLIENTPORT, processor, "TServerUtilsTest", "TServerUtilsTestThread", Property.TSERV_PORTSEARCH, -- Property.TSERV_MINTHREADS, Property.TSERV_MINTHREADS_TIMEOUT, Property.TSERV_THREADCHECK, - Property.GENERAL_MAX_MESSAGE_SIZE); - Property.RPC_MAX_MESSAGE_SIZE); ++ Property.TSERV_MINTHREADS, Property.TSERV_MINTHREADS_TIMEOUT, Property.TSERV_THREADCHECK); } } diff --cc server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java index ecf6709c62,16452b0805..c34f55b1e9 --- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java +++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java @@@ -241,15 -243,15 +241,12 @@@ public class CompactionCoordinator exte */ protected ServerAddress startCoordinatorClientService() throws UnknownHostException { var processor = ThriftProcessorTypes.getCoordinatorTProcessor(this, getContext()); - Property maxMessageSizeProperty = - (getConfiguration().get(Property.COMPACTION_COORDINATOR_MAX_MESSAGE_SIZE) != null - ? Property.COMPACTION_COORDINATOR_MAX_MESSAGE_SIZE : Property.GENERAL_MAX_MESSAGE_SIZE); - @SuppressWarnings("deprecation") - var maxMessageSizeProperty = getConfiguration().resolve(Property.RPC_MAX_MESSAGE_SIZE, - Property.GENERAL_MAX_MESSAGE_SIZE); ServerAddress sp = TServerUtils.startServer(getContext(), getHostname(), Property.COMPACTION_COORDINATOR_CLIENTPORT, processor, this.getClass().getSimpleName(), "Thrift Client Server", Property.COMPACTION_COORDINATOR_THRIFTCLIENT_PORTSEARCH, Property.COMPACTION_COORDINATOR_MINTHREADS, Property.COMPACTION_COORDINATOR_MINTHREADS_TIMEOUT, -- Property.COMPACTION_COORDINATOR_THREADCHECK, maxMessageSizeProperty); ++ Property.COMPACTION_COORDINATOR_THREADCHECK); LOG.info("address = {}", sp.address); return sp; } diff --cc server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java index db7a302e97,033bb8c79d..1af688515c --- a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java +++ b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java @@@ -317,18 -329,15 +317,14 @@@ public class Compactor extends Abstract * @throws UnknownHostException host unknown */ protected ServerAddress startCompactorClientService() throws UnknownHostException { - var processor = ThriftProcessorTypes.getCompactorTProcessor(this, getContext()); - @SuppressWarnings("deprecation") - var maxMessageSizeProperty = getConfiguration().resolve(Property.RPC_MAX_MESSAGE_SIZE, - Property.GENERAL_MAX_MESSAGE_SIZE); + + ClientServiceHandler clientHandler = + new ClientServiceHandler(getContext(), new TransactionWatcher(getContext())); + var processor = ThriftProcessorTypes.getCompactorTProcessor(clientHandler, this, getContext()); - Property maxMessageSizeProperty = - (getConfiguration().get(Property.COMPACTOR_MAX_MESSAGE_SIZE) != null - ? Property.COMPACTOR_MAX_MESSAGE_SIZE : Property.GENERAL_MAX_MESSAGE_SIZE); ServerAddress sp = TServerUtils.startServer(getContext(), getHostname(), Property.COMPACTOR_CLIENTPORT, processor, this.getClass().getSimpleName(), "Thrift Client Server", Property.COMPACTOR_PORTSEARCH, Property.COMPACTOR_MINTHREADS, -- Property.COMPACTOR_MINTHREADS_TIMEOUT, Property.COMPACTOR_THREADCHECK, -- maxMessageSizeProperty); ++ Property.COMPACTOR_MINTHREADS_TIMEOUT, Property.COMPACTOR_THREADCHECK); LOG.info("address = {}", sp.address); return sp; } diff --cc server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java index 3c6ec53303,ee2f31d1f1..7537ebd44e --- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java @@@ -367,7 -396,10 +367,7 @@@ public class SimpleGarbageCollector ext var processor = ThriftProcessorTypes.getGcTProcessor(this, getContext()); IntStream port = getConfiguration().getPortStream(Property.GC_PORT); HostAndPort[] addresses = TServerUtils.getHostAndPorts(getHostname(), port); - long maxMessageSize = getConfiguration().getAsBytes(Property.GENERAL_MAX_MESSAGE_SIZE); - @SuppressWarnings("deprecation") - var maxMessageSizeProperty = getConfiguration().resolve(Property.RPC_MAX_MESSAGE_SIZE, - Property.GENERAL_MAX_MESSAGE_SIZE); - long maxMessageSize = getConfiguration().getAsBytes(maxMessageSizeProperty); ++ long maxMessageSize = getConfiguration().getAsBytes(Property.RPC_MAX_MESSAGE_SIZE); ServerAddress server = TServerUtils.startTServer(getConfiguration(), getContext().getThriftServerType(), processor, this.getClass().getSimpleName(), "GC Monitor Service", 2, ThreadPools.DEFAULT_TIMEOUT_MILLISECS, 1000, maxMessageSize, diff --cc server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index 923bfc33b4,16548ea6d0..1d71c17614 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@@ -1226,10 -1214,13 +1226,9 @@@ public class Manager extends AbstractSe ThriftProcessorTypes.getManagerTProcessor(fateServiceHandler, haProxy, getContext()); try { - @SuppressWarnings("deprecation") - var maxMessageSizeProperty = getConfiguration().resolve(Property.RPC_MAX_MESSAGE_SIZE, - Property.GENERAL_MAX_MESSAGE_SIZE); sa = TServerUtils.startServer(context, getHostname(), Property.MANAGER_CLIENTPORT, processor, "Manager", "Manager Client Service Handler", null, Property.MANAGER_MINTHREADS, -- Property.MANAGER_MINTHREADS_TIMEOUT, Property.MANAGER_THREADCHECK, - Property.GENERAL_MAX_MESSAGE_SIZE); - maxMessageSizeProperty); ++ Property.MANAGER_MINTHREADS_TIMEOUT, Property.MANAGER_THREADCHECK); } catch (UnknownHostException e) { throw new IllegalStateException("Unable to start server on host " + getHostname(), e); } diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java index 91f6f9f055,ad9c72d0c0..9840df50cf --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java @@@ -301,18 -304,15 +301,15 @@@ public class ScanServer extends Abstrac // This class implements TabletClientService.Iface and then delegates calls. Be sure // to set up the ThriftProcessor using this class, not the delegate. - TProcessor processor = ThriftProcessorTypes.getScanServerTProcessor(this, getContext()); + ClientServiceHandler clientHandler = + new ClientServiceHandler(context, new TransactionWatcher(context)); + TProcessor processor = + ThriftProcessorTypes.getScanServerTProcessor(clientHandler, this, getContext()); - Property maxMessageSizeProperty = - (getConfiguration().get(Property.SSERV_MAX_MESSAGE_SIZE) != null - ? Property.SSERV_MAX_MESSAGE_SIZE : Property.GENERAL_MAX_MESSAGE_SIZE); - @SuppressWarnings("deprecation") - var maxMessageSizeProperty = getConfiguration().resolve(Property.RPC_MAX_MESSAGE_SIZE, - Property.GENERAL_MAX_MESSAGE_SIZE); ServerAddress sp = TServerUtils.startServer(getContext(), getHostname(), Property.SSERV_CLIENTPORT, processor, this.getClass().getSimpleName(), "Thrift Client Server", Property.SSERV_PORTSEARCH, Property.SSERV_MINTHREADS, -- Property.SSERV_MINTHREADS_TIMEOUT, Property.SSERV_THREADCHECK, maxMessageSizeProperty); ++ Property.SSERV_MINTHREADS_TIMEOUT, Property.SSERV_THREADCHECK); LOG.info("address = {}", sp.address); return sp; diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index f2f2388d45,79c969cf77..ecd8f66526 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@@ -528,14 -544,15 +528,12 @@@ public class TabletServer extends Abstr } } - private HostAndPort startServer(AccumuloConfiguration conf, String address, TProcessor processor) + private HostAndPort startServer(String address, TProcessor processor) throws UnknownHostException { - Property maxMessageSizeProperty = (conf.get(Property.TSERV_MAX_MESSAGE_SIZE) != null - ? Property.TSERV_MAX_MESSAGE_SIZE : Property.GENERAL_MAX_MESSAGE_SIZE); - @SuppressWarnings("deprecation") - var maxMessageSizeProperty = getConfiguration().resolve(Property.RPC_MAX_MESSAGE_SIZE, - Property.TSERV_MAX_MESSAGE_SIZE, Property.GENERAL_MAX_MESSAGE_SIZE); ServerAddress sp = TServerUtils.startServer(getContext(), address, Property.TSERV_CLIENTPORT, processor, this.getClass().getSimpleName(), "Thrift Client Server", Property.TSERV_PORTSEARCH, Property.TSERV_MINTHREADS, Property.TSERV_MINTHREADS_TIMEOUT, -- Property.TSERV_THREADCHECK, maxMessageSizeProperty); ++ Property.TSERV_THREADCHECK); this.server = sp.server; return sp.address; } @@@ -594,10 -611,9 +592,10 @@@ thriftClientHandler = newTabletClientHandler(watcher, writeTracker); scanClientHandler = newThriftScanClientHandler(writeTracker); - TProcessor processor = ThriftProcessorTypes.getTabletServerTProcessor(clientHandler, - thriftClientHandler, scanClientHandler, getContext()); + TProcessor processor = + ThriftProcessorTypes.getTabletServerTProcessor(clientHandler, thriftClientHandler, + scanClientHandler, thriftClientHandler, thriftClientHandler, getContext()); - HostAndPort address = startServer(getConfiguration(), clientAddress.getHost(), processor); + HostAndPort address = startServer(clientAddress.getHost(), processor); log.info("address = {}", address); return address; } diff --cc test/src/main/java/org/apache/accumulo/test/functional/IdleProcessMetricsIT.java index c57cfc8480,c57cfc8480..05033b0e40 --- a/test/src/main/java/org/apache/accumulo/test/functional/IdleProcessMetricsIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/IdleProcessMetricsIT.java @@@ -30,6 -30,6 +30,7 @@@ import java.util.EnumSet import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; ++import java.util.function.Function; import org.apache.accumulo.compactor.Compactor; import org.apache.accumulo.coordinator.CompactionCoordinator; @@@ -219,7 -219,7 +220,7 @@@ public class IdleProcessMetricsIT exten try (Scanner scanner = client.createScanner(table1, Authorizations.EMPTY)) { scanner.setConsistencyLevel(ScannerBase.ConsistencyLevel.EVENTUAL); -- var ignored = scanner.stream().count(); ++ scanner.stream().forEach(Function.identity()::apply); // iterate and ignore } log.info("Waiting for sserver to be not idle after starting a scan");