This is an automated email from the ASF dual-hosted git repository.

ctubbsii pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 6eae7831fb7cb0ab4f7ba99bbc1d9d9b807445fc
Merge: 15035d7b27 85accd6b86
Author: Christopher Tubbs <ctubb...@apache.org>
AuthorDate: Thu Jul 25 00:17:26 2024 -0400

    Merge branch '2.1'

 .../java/org/apache/accumulo/core/conf/Property.java | 20 ++++----------------
 .../org/apache/accumulo/server/rpc/TServerUtils.java |  9 ++-------
 .../apache/accumulo/server/rpc/TServerUtilsTest.java |  3 +--
 .../accumulo/coordinator/CompactionCoordinator.java  |  5 +----
 .../org/apache/accumulo/compactor/Compactor.java     |  6 +-----
 .../apache/accumulo/gc/SimpleGarbageCollector.java   |  2 +-
 .../java/org/apache/accumulo/manager/Manager.java    |  3 +--
 .../java/org/apache/accumulo/tserver/ScanServer.java |  5 +----
 .../org/apache/accumulo/tserver/TabletServer.java    |  8 +++-----
 .../test/functional/IdleProcessMetricsIT.java        |  3 ++-
 .../test/functional/ThriftMaxFrameSizeIT.java        |  3 +--
 11 files changed, 18 insertions(+), 49 deletions(-)

diff --cc core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 910293fd48,8629cf1681..647603b26f
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@@ -263,8 -264,10 +266,6 @@@ public enum Property 
            + " This does not equate to how often tickets are actually renewed 
(which is"
            + " performed at 80% of the ticket lifetime).",
        "1.6.5"),
 -  @Deprecated(since = "2.1.3")
 -  @ReplacedBy(property = RPC_MAX_MESSAGE_SIZE)
--  GENERAL_MAX_MESSAGE_SIZE("general.server.message.size.max", "1G", 
PropertyType.BYTES,
--      "The maximum size of a message that can be sent to a server.", "1.5.0"),
    @Experimental
    GENERAL_OPENTELEMETRY_ENABLED("general.opentelemetry.enabled", "false", 
PropertyType.BOOLEAN,
        "Enables tracing functionality using OpenTelemetry (assuming 
OpenTelemetry is configured).",
@@@ -725,8 -795,10 +723,6 @@@
        "2.1.0"),
    TSERV_THREADCHECK("tserver.server.threadcheck.time", "1s", 
PropertyType.TIMEDURATION,
        "The time between adjustments of the server thread pool.", "1.4.0"),
 -  @Deprecated(since = "2.1.3")
 -  @ReplacedBy(property = RPC_MAX_MESSAGE_SIZE)
--  TSERV_MAX_MESSAGE_SIZE("tserver.server.message.size.max", "1G", 
PropertyType.BYTES,
--      "The maximum size of a message that can be sent to a tablet server.", 
"1.6.0"),
    TSERV_LOG_BUSY_TABLETS_COUNT("tserver.log.busy.tablets.count", "0", 
PropertyType.COUNT,
        "Number of busiest tablets to log. Logged at interval controlled by "
            + "tserver.log.busy.tablets.interval. If <= 0, logging of busy 
tablets is disabled.",
@@@ -1187,12 -1483,6 +1183,9 @@@
    @Experimental
    COMPACTOR_THREADCHECK("compactor.threadcheck.time", "1s", 
PropertyType.TIMEDURATION,
        "The time between adjustments of the server thread pool.", "2.1.0"),
 +  @Experimental
-   COMPACTOR_MAX_MESSAGE_SIZE("compactor.message.size.max", "10M", 
PropertyType.BYTES,
-       "The maximum size of a message that can be sent to a tablet server.", 
"2.1.0"),
-   @Experimental
 +  COMPACTOR_QUEUE_NAME("compactor.queue", "", PropertyType.STRING,
 +      "The queue for which this Compactor will perform compactions.", 
"3.0.0"),
    // CompactionCoordinator properties
    @Experimental
    COMPACTION_COORDINATOR_PREFIX("compaction.coordinator.", null, 
PropertyType.PREFIX,
@@@ -1516,8 -1849,7 +1505,7 @@@
        COMPACTOR_PORTSEARCH, TSERV_PORTSEARCH,
  
        // max message options
-       SSERV_MAX_MESSAGE_SIZE, TSERV_MAX_MESSAGE_SIZE, 
COMPACTOR_MAX_MESSAGE_SIZE,
-       COMPACTION_COORDINATOR_MAX_MESSAGE_SIZE,
 -      TSERV_MAX_MESSAGE_SIZE, GENERAL_MAX_MESSAGE_SIZE, RPC_MAX_MESSAGE_SIZE,
++      RPC_MAX_MESSAGE_SIZE,
  
        // block cache options
        TSERV_CACHE_MANAGER_IMPL, TSERV_DATACACHE_SIZE, TSERV_INDEXCACHE_SIZE,
diff --cc 
server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
index f9359b8442,87bfb4c0c8..5d9b44bbf5
--- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
@@@ -125,15 -125,15 +125,13 @@@ public class TServerUtils 
     * @param minThreadProperty A Property to control the minimum number of 
threads in the pool
     * @param timeBetweenThreadChecksProperty A Property to control the amount 
of time between checks
     *        to resize the thread pool
--   * @param maxMessageSizeProperty A Property to control the maximum Thrift 
message size accepted
     * @return the server object created, and the port actually used
     * @throws UnknownHostException when we don't know our own address
     */
    public static ServerAddress startServer(ServerContext context, String 
hostname,
        Property portHintProperty, TProcessor processor, String serverName, 
String threadName,
        Property portSearchProperty, Property minThreadProperty, Property 
threadTimeOutProperty,
--      Property timeBetweenThreadChecksProperty, Property 
maxMessageSizeProperty)
--      throws UnknownHostException {
++      Property timeBetweenThreadChecksProperty) throws UnknownHostException {
      final AccumuloConfiguration config = context.getConfiguration();
  
      final IntStream portHint = config.getPortStream(portHintProperty);
@@@ -153,10 -153,10 +151,7 @@@
        timeBetweenThreadChecks = 
config.getTimeInMillis(timeBetweenThreadChecksProperty);
      }
  
-     long maxMessageSize = 10_000_000;
 -    long maxMessageSize = Integer.MAX_VALUE;
--    if (maxMessageSizeProperty != null) {
--      maxMessageSize = config.getAsBytes(maxMessageSizeProperty);
--    }
++    long maxMessageSize = config.getAsBytes(Property.RPC_MAX_MESSAGE_SIZE);
  
      boolean portSearch = false;
      if (portSearchProperty != null) {
diff --cc 
server/base/src/test/java/org/apache/accumulo/server/rpc/TServerUtilsTest.java
index 80d261f093,81e9cfa49c..2cb74d2b9b
--- 
a/server/base/src/test/java/org/apache/accumulo/server/rpc/TServerUtilsTest.java
+++ 
b/server/base/src/test/java/org/apache/accumulo/server/rpc/TServerUtilsTest.java
@@@ -299,8 -305,8 +299,7 @@@ public class TServerUtilsTest 
  
      return TServerUtils.startServer(context, hostname, 
Property.TSERV_CLIENTPORT, processor,
          "TServerUtilsTest", "TServerUtilsTestThread", 
Property.TSERV_PORTSEARCH,
--        Property.TSERV_MINTHREADS, Property.TSERV_MINTHREADS_TIMEOUT, 
Property.TSERV_THREADCHECK,
-         Property.GENERAL_MAX_MESSAGE_SIZE);
 -        Property.RPC_MAX_MESSAGE_SIZE);
++        Property.TSERV_MINTHREADS, Property.TSERV_MINTHREADS_TIMEOUT, 
Property.TSERV_THREADCHECK);
  
    }
  }
diff --cc 
server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
index ecf6709c62,16452b0805..c34f55b1e9
--- 
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
+++ 
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
@@@ -241,15 -243,15 +241,12 @@@ public class CompactionCoordinator exte
     */
    protected ServerAddress startCoordinatorClientService() throws 
UnknownHostException {
      var processor = ThriftProcessorTypes.getCoordinatorTProcessor(this, 
getContext());
-     Property maxMessageSizeProperty =
-         
(getConfiguration().get(Property.COMPACTION_COORDINATOR_MAX_MESSAGE_SIZE) != 
null
-             ? Property.COMPACTION_COORDINATOR_MAX_MESSAGE_SIZE : 
Property.GENERAL_MAX_MESSAGE_SIZE);
 -    @SuppressWarnings("deprecation")
 -    var maxMessageSizeProperty = 
getConfiguration().resolve(Property.RPC_MAX_MESSAGE_SIZE,
 -        Property.GENERAL_MAX_MESSAGE_SIZE);
      ServerAddress sp = TServerUtils.startServer(getContext(), getHostname(),
          Property.COMPACTION_COORDINATOR_CLIENTPORT, processor, 
this.getClass().getSimpleName(),
          "Thrift Client Server", 
Property.COMPACTION_COORDINATOR_THRIFTCLIENT_PORTSEARCH,
          Property.COMPACTION_COORDINATOR_MINTHREADS,
          Property.COMPACTION_COORDINATOR_MINTHREADS_TIMEOUT,
--        Property.COMPACTION_COORDINATOR_THREADCHECK, maxMessageSizeProperty);
++        Property.COMPACTION_COORDINATOR_THREADCHECK);
      LOG.info("address = {}", sp.address);
      return sp;
    }
diff --cc 
server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index db7a302e97,033bb8c79d..1af688515c
--- 
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++ 
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@@ -317,18 -329,15 +317,14 @@@ public class Compactor extends Abstract
     * @throws UnknownHostException host unknown
     */
    protected ServerAddress startCompactorClientService() throws 
UnknownHostException {
 -    var processor = ThriftProcessorTypes.getCompactorTProcessor(this, 
getContext());
 -    @SuppressWarnings("deprecation")
 -    var maxMessageSizeProperty = 
getConfiguration().resolve(Property.RPC_MAX_MESSAGE_SIZE,
 -        Property.GENERAL_MAX_MESSAGE_SIZE);
 +
 +    ClientServiceHandler clientHandler =
 +        new ClientServiceHandler(getContext(), new 
TransactionWatcher(getContext()));
 +    var processor = 
ThriftProcessorTypes.getCompactorTProcessor(clientHandler, this, getContext());
-     Property maxMessageSizeProperty =
-         (getConfiguration().get(Property.COMPACTOR_MAX_MESSAGE_SIZE) != null
-             ? Property.COMPACTOR_MAX_MESSAGE_SIZE : 
Property.GENERAL_MAX_MESSAGE_SIZE);
      ServerAddress sp = TServerUtils.startServer(getContext(), getHostname(),
          Property.COMPACTOR_CLIENTPORT, processor, 
this.getClass().getSimpleName(),
          "Thrift Client Server", Property.COMPACTOR_PORTSEARCH, 
Property.COMPACTOR_MINTHREADS,
--        Property.COMPACTOR_MINTHREADS_TIMEOUT, Property.COMPACTOR_THREADCHECK,
--        maxMessageSizeProperty);
++        Property.COMPACTOR_MINTHREADS_TIMEOUT, 
Property.COMPACTOR_THREADCHECK);
      LOG.info("address = {}", sp.address);
      return sp;
    }
diff --cc 
server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
index 3c6ec53303,ee2f31d1f1..7537ebd44e
--- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
@@@ -367,7 -396,10 +367,7 @@@ public class SimpleGarbageCollector ext
      var processor = ThriftProcessorTypes.getGcTProcessor(this, getContext());
      IntStream port = getConfiguration().getPortStream(Property.GC_PORT);
      HostAndPort[] addresses = TServerUtils.getHostAndPorts(getHostname(), 
port);
-     long maxMessageSize = 
getConfiguration().getAsBytes(Property.GENERAL_MAX_MESSAGE_SIZE);
 -    @SuppressWarnings("deprecation")
 -    var maxMessageSizeProperty = 
getConfiguration().resolve(Property.RPC_MAX_MESSAGE_SIZE,
 -        Property.GENERAL_MAX_MESSAGE_SIZE);
 -    long maxMessageSize = 
getConfiguration().getAsBytes(maxMessageSizeProperty);
++    long maxMessageSize = 
getConfiguration().getAsBytes(Property.RPC_MAX_MESSAGE_SIZE);
      ServerAddress server = TServerUtils.startTServer(getConfiguration(),
          getContext().getThriftServerType(), processor, 
this.getClass().getSimpleName(),
          "GC Monitor Service", 2, ThreadPools.DEFAULT_TIMEOUT_MILLISECS, 1000, 
maxMessageSize,
diff --cc server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index 923bfc33b4,16548ea6d0..1d71c17614
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@@ -1226,10 -1214,13 +1226,9 @@@ public class Manager extends AbstractSe
          ThriftProcessorTypes.getManagerTProcessor(fateServiceHandler, 
haProxy, getContext());
  
      try {
 -      @SuppressWarnings("deprecation")
 -      var maxMessageSizeProperty = 
getConfiguration().resolve(Property.RPC_MAX_MESSAGE_SIZE,
 -          Property.GENERAL_MAX_MESSAGE_SIZE);
        sa = TServerUtils.startServer(context, getHostname(), 
Property.MANAGER_CLIENTPORT, processor,
            "Manager", "Manager Client Service Handler", null, 
Property.MANAGER_MINTHREADS,
--          Property.MANAGER_MINTHREADS_TIMEOUT, Property.MANAGER_THREADCHECK,
-           Property.GENERAL_MAX_MESSAGE_SIZE);
 -          maxMessageSizeProperty);
++          Property.MANAGER_MINTHREADS_TIMEOUT, Property.MANAGER_THREADCHECK);
      } catch (UnknownHostException e) {
        throw new IllegalStateException("Unable to start server on host " + 
getHostname(), e);
      }
diff --cc 
server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
index 91f6f9f055,ad9c72d0c0..9840df50cf
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
@@@ -301,18 -304,15 +301,15 @@@ public class ScanServer extends Abstrac
  
      // This class implements TabletClientService.Iface and then delegates 
calls. Be sure
      // to set up the ThriftProcessor using this class, not the delegate.
 -    TProcessor processor = ThriftProcessorTypes.getScanServerTProcessor(this, 
getContext());
 +    ClientServiceHandler clientHandler =
 +        new ClientServiceHandler(context, new TransactionWatcher(context));
 +    TProcessor processor =
 +        ThriftProcessorTypes.getScanServerTProcessor(clientHandler, this, 
getContext());
  
-     Property maxMessageSizeProperty =
-         (getConfiguration().get(Property.SSERV_MAX_MESSAGE_SIZE) != null
-             ? Property.SSERV_MAX_MESSAGE_SIZE : 
Property.GENERAL_MAX_MESSAGE_SIZE);
 -    @SuppressWarnings("deprecation")
 -    var maxMessageSizeProperty = 
getConfiguration().resolve(Property.RPC_MAX_MESSAGE_SIZE,
 -        Property.GENERAL_MAX_MESSAGE_SIZE);
      ServerAddress sp = TServerUtils.startServer(getContext(), getHostname(),
          Property.SSERV_CLIENTPORT, processor, this.getClass().getSimpleName(),
          "Thrift Client Server", Property.SSERV_PORTSEARCH, 
Property.SSERV_MINTHREADS,
--        Property.SSERV_MINTHREADS_TIMEOUT, Property.SSERV_THREADCHECK, 
maxMessageSizeProperty);
++        Property.SSERV_MINTHREADS_TIMEOUT, Property.SSERV_THREADCHECK);
  
      LOG.info("address = {}", sp.address);
      return sp;
diff --cc 
server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index f2f2388d45,79c969cf77..ecd8f66526
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@@ -528,14 -544,15 +528,12 @@@ public class TabletServer extends Abstr
      }
    }
  
-   private HostAndPort startServer(AccumuloConfiguration conf, String address, 
TProcessor processor)
+   private HostAndPort startServer(String address, TProcessor processor)
        throws UnknownHostException {
-     Property maxMessageSizeProperty = 
(conf.get(Property.TSERV_MAX_MESSAGE_SIZE) != null
-         ? Property.TSERV_MAX_MESSAGE_SIZE : 
Property.GENERAL_MAX_MESSAGE_SIZE);
 -    @SuppressWarnings("deprecation")
 -    var maxMessageSizeProperty = 
getConfiguration().resolve(Property.RPC_MAX_MESSAGE_SIZE,
 -        Property.TSERV_MAX_MESSAGE_SIZE, Property.GENERAL_MAX_MESSAGE_SIZE);
      ServerAddress sp = TServerUtils.startServer(getContext(), address, 
Property.TSERV_CLIENTPORT,
          processor, this.getClass().getSimpleName(), "Thrift Client Server",
          Property.TSERV_PORTSEARCH, Property.TSERV_MINTHREADS, 
Property.TSERV_MINTHREADS_TIMEOUT,
--        Property.TSERV_THREADCHECK, maxMessageSizeProperty);
++        Property.TSERV_THREADCHECK);
      this.server = sp.server;
      return sp.address;
    }
@@@ -594,10 -611,9 +592,10 @@@
      thriftClientHandler = newTabletClientHandler(watcher, writeTracker);
      scanClientHandler = newThriftScanClientHandler(writeTracker);
  
 -    TProcessor processor = 
ThriftProcessorTypes.getTabletServerTProcessor(clientHandler,
 -        thriftClientHandler, scanClientHandler, getContext());
 +    TProcessor processor =
 +        ThriftProcessorTypes.getTabletServerTProcessor(clientHandler, 
thriftClientHandler,
 +            scanClientHandler, thriftClientHandler, thriftClientHandler, 
getContext());
-     HostAndPort address = startServer(getConfiguration(), 
clientAddress.getHost(), processor);
+     HostAndPort address = startServer(clientAddress.getHost(), processor);
      log.info("address = {}", address);
      return address;
    }
diff --cc 
test/src/main/java/org/apache/accumulo/test/functional/IdleProcessMetricsIT.java
index c57cfc8480,c57cfc8480..05033b0e40
--- 
a/test/src/main/java/org/apache/accumulo/test/functional/IdleProcessMetricsIT.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/functional/IdleProcessMetricsIT.java
@@@ -30,6 -30,6 +30,7 @@@ import java.util.EnumSet
  import java.util.List;
  import java.util.Map;
  import java.util.concurrent.atomic.AtomicBoolean;
++import java.util.function.Function;
  
  import org.apache.accumulo.compactor.Compactor;
  import org.apache.accumulo.coordinator.CompactionCoordinator;
@@@ -219,7 -219,7 +220,7 @@@ public class IdleProcessMetricsIT exten
  
        try (Scanner scanner = client.createScanner(table1, 
Authorizations.EMPTY)) {
          scanner.setConsistencyLevel(ScannerBase.ConsistencyLevel.EVENTUAL);
--        var ignored = scanner.stream().count();
++        scanner.stream().forEach(Function.identity()::apply); // iterate and 
ignore
        }
  
        log.info("Waiting for sserver to be not idle after starting a scan");

Reply via email to