This is an automated email from the ASF dual-hosted git repository.
gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 736733d553 Reverting https://github.com/apache/pinot/pull/13645 PR
(#14259)
736733d553 is described below
commit 736733d5536307de4d27cc547a87b8cc9ab1ee85
Author: soumitra-st <[email protected]>
AuthorDate: Tue Oct 22 10:56:16 2024 -0700
Reverting https://github.com/apache/pinot/pull/13645 PR (#14259)
---
.../pinot/common/utils/grpc/GrpcQueryClient.java | 2 +-
.../pinot/core/transport/grpc/GrpcQueryServer.java | 2 +-
.../query/service/dispatch/DispatchClient.java | 16 +------------
.../pinot/query/service/server/QueryServer.java | 16 ++-----------
.../service/dispatch/QueryDispatcherTest.java | 2 +-
.../query/service/server/QueryServerTest.java | 2 +-
.../pinot/server/starter/ServerInstance.java | 27 +++++++++++-----------
.../pinot/server/worker/WorkerQueryServer.java | 20 ++++++++--------
8 files changed, 31 insertions(+), 56 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcQueryClient.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcQueryClient.java
index 808e8d0e02..9987876b95 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcQueryClient.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcQueryClient.java
@@ -84,7 +84,7 @@ public class GrpcQueryClient implements Closeable {
_channelShutdownTimeoutSeconds = config.getChannelShutdownTimeoutSecond();
}
- public static SslContext buildSslContext(TlsConfig tlsConfig) {
+ private SslContext buildSslContext(TlsConfig tlsConfig) {
LOGGER.info("Building gRPC SSL context");
SslContext sslContext =
CLIENT_SSL_CONTEXTS_CACHE.computeIfAbsent(tlsConfig.hashCode(),
tlsConfigHashCode -> {
try {
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java
b/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java
index 4d8608c5ea..daae6d74cc 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java
@@ -119,7 +119,7 @@ public class GrpcQueryServer extends
PinotQueryServerGrpc.PinotQueryServerImplBa
ResourceManager.DEFAULT_QUERY_WORKER_THREADS);
}
- public static SslContext buildGRpcSslContext(TlsConfig tlsConfig)
+ private SslContext buildGRpcSslContext(TlsConfig tlsConfig)
throws IllegalArgumentException {
LOGGER.info("Building gRPC SSL context");
if (tlsConfig.getKeyStorePath() == null) {
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/DispatchClient.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/DispatchClient.java
index 4665b23a48..cbb8be1e6c 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/DispatchClient.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/DispatchClient.java
@@ -21,15 +21,11 @@ package org.apache.pinot.query.service.dispatch;
import io.grpc.Deadline;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
-import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
import io.grpc.stub.StreamObserver;
-import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
-import org.apache.pinot.common.config.GrpcConfig;
import org.apache.pinot.common.proto.PinotQueryWorkerGrpc;
import org.apache.pinot.common.proto.Worker;
-import org.apache.pinot.common.utils.grpc.GrpcQueryClient;
import org.apache.pinot.query.routing.QueryServerInstance;
@@ -46,17 +42,7 @@ class DispatchClient {
private final PinotQueryWorkerGrpc.PinotQueryWorkerStub _dispatchStub;
public DispatchClient(String host, int port) {
- this(host, port, new GrpcConfig(Collections.emptyMap()));
- }
-
- public DispatchClient(String host, int port, GrpcConfig grpcConfig) {
- if (grpcConfig.isUsePlainText()) {
- _channel = ManagedChannelBuilder.forAddress(host,
port).usePlaintext().build();
- } else {
- _channel =
- NettyChannelBuilder.forAddress(host, port)
-
.sslContext(GrpcQueryClient.buildSslContext(grpcConfig.getTlsConfig())).build();
- }
+ _channel = ManagedChannelBuilder.forAddress(host,
port).usePlaintext().build();
_dispatchStub = PinotQueryWorkerGrpc.newStub(_channel);
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
index 36104011f3..d8a1ecfef8 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
@@ -21,7 +21,6 @@ package org.apache.pinot.query.service.server;
import com.google.protobuf.ByteString;
import io.grpc.Server;
import io.grpc.ServerBuilder;
-import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder;
import io.grpc.stub.StreamObserver;
import java.util.ArrayList;
import java.util.List;
@@ -34,12 +33,10 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiFunction;
import java.util.function.Consumer;
-import org.apache.pinot.common.config.TlsConfig;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.proto.PinotQueryWorkerGrpc;
import org.apache.pinot.common.proto.Worker;
import org.apache.pinot.common.utils.NamedThreadFactory;
-import org.apache.pinot.core.transport.grpc.GrpcQueryServer;
import org.apache.pinot.query.planner.serde.PlanNodeSerializer;
import org.apache.pinot.query.routing.QueryPlanSerDeUtils;
import org.apache.pinot.query.routing.StageMetadata;
@@ -65,7 +62,6 @@ public class QueryServer extends
PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
private final int _port;
private final QueryRunner _queryRunner;
- private final TlsConfig _tlsConfig;
// query submission service is only used for plan submission for now.
// TODO: with complex query submission logic we should allow asynchronous
query submission return instead of
// directly return from submission response observer.
@@ -73,10 +69,9 @@ public class QueryServer extends
PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
private Server _server = null;
- public QueryServer(int port, QueryRunner queryRunner, TlsConfig tlsConfig) {
+ public QueryServer(int port, QueryRunner queryRunner) {
_port = port;
_queryRunner = queryRunner;
- _tlsConfig = tlsConfig;
_querySubmissionExecutorService =
Executors.newCachedThreadPool(new
NamedThreadFactory("query_submission_executor_on_" + _port + "_port"));
}
@@ -85,14 +80,7 @@ public class QueryServer extends
PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
LOGGER.info("Starting QueryServer");
try {
if (_server == null) {
- if (_tlsConfig == null) {
- _server = ServerBuilder.forPort(_port).addService(this)
- .maxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE).build();
- } else {
- _server = NettyServerBuilder.forPort(_port).addService(this)
- .sslContext(GrpcQueryServer.buildGRpcSslContext(_tlsConfig))
- .maxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE).build();
- }
+ _server =
ServerBuilder.forPort(_port).addService(this).maxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE).build();
LOGGER.info("Initialized QueryServer on port: {}", _port);
}
_queryRunner.start();
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java
index c21c40b2d9..694fb3c087 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java
@@ -60,7 +60,7 @@ public class QueryDispatcherTest extends QueryTestSet {
for (int i = 0; i < QUERY_SERVER_COUNT; i++) {
int availablePort = QueryTestUtils.getAvailablePort();
QueryRunner queryRunner = Mockito.mock(QueryRunner.class);
- QueryServer queryServer = Mockito.spy(new QueryServer(availablePort,
queryRunner, null));
+ QueryServer queryServer = Mockito.spy(new QueryServer(availablePort,
queryRunner));
queryServer.start();
_queryServerMap.put(availablePort, queryServer);
}
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/server/QueryServerTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/server/QueryServerTest.java
index 7a14a2a4c6..3a0b23408e 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/server/QueryServerTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/server/QueryServerTest.java
@@ -76,7 +76,7 @@ public class QueryServerTest extends QueryTestSet {
for (int i = 0; i < QUERY_SERVER_COUNT; i++) {
int availablePort = QueryTestUtils.getAvailablePort();
QueryRunner queryRunner = mock(QueryRunner.class);
- QueryServer queryServer = new QueryServer(availablePort, queryRunner,
null);
+ QueryServer queryServer = new QueryServer(availablePort, queryRunner);
queryServer.start();
_queryServerMap.put(availablePort, queryServer);
_queryRunnerMap.put(availablePort, queryRunner);
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
index 01f4402710..2a75ca7f5a 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
@@ -119,15 +119,15 @@ public class ServerInstance {
TlsUtils.extractTlsConfig(serverConf.getPinotConfig(),
CommonConstants.Server.SERVER_TLS_PREFIX);
NettyConfig nettyConfig =
NettyConfig.extractNettyConfig(serverConf.getPinotConfig(),
CommonConstants.Server.SERVER_NETTY_PREFIX);
- accessControlFactory.init(
-
serverConf.getPinotConfig().subset(CommonConstants.Server.PREFIX_OF_CONFIG_OF_ACCESS_CONTROL),
helixManager);
+ accessControlFactory
+
.init(serverConf.getPinotConfig().subset(CommonConstants.Server.PREFIX_OF_CONFIG_OF_ACCESS_CONTROL),
+ helixManager);
_accessControl = accessControlFactory.create();
if (serverConf.isMultiStageServerEnabled()) {
LOGGER.info("Initializing Multi-stage query engine");
- _workerQueryServer =
- new WorkerQueryServer(serverConf.getPinotConfig(),
_instanceDataManager, helixManager, _serverMetrics,
- serverConf.isNettyTlsServerEnabled() ? tlsConfig : null);
+ _workerQueryServer = new WorkerQueryServer(serverConf.getPinotConfig(),
_instanceDataManager, helixManager,
+ _serverMetrics);
} else {
_workerQueryServer = null;
}
@@ -135,9 +135,9 @@ public class ServerInstance {
if (serverConf.isNettyServerEnabled()) {
int nettyPort = serverConf.getNettyPort();
LOGGER.info("Initializing Netty query server on port: {}", nettyPort);
- _instanceRequestHandler =
-
ChannelHandlerFactory.getInstanceRequestHandler(helixManager.getInstanceName(),
serverConf.getPinotConfig(),
- _queryScheduler, _serverMetrics, new
AllowAllAccessFactory().create());
+ _instanceRequestHandler = ChannelHandlerFactory
+ .getInstanceRequestHandler(helixManager.getInstanceName(),
serverConf.getPinotConfig(), _queryScheduler,
+ _serverMetrics, new AllowAllAccessFactory().create());
_nettyQueryServer = new QueryServer(nettyPort, nettyConfig,
_instanceRequestHandler);
} else {
_nettyQueryServer = null;
@@ -146,9 +146,9 @@ public class ServerInstance {
if (serverConf.isNettyTlsServerEnabled()) {
int nettySecPort = serverConf.getNettyTlsPort();
LOGGER.info("Initializing TLS-secured Netty query server on port: {}",
nettySecPort);
- _instanceRequestHandler =
-
ChannelHandlerFactory.getInstanceRequestHandler(helixManager.getInstanceName(),
serverConf.getPinotConfig(),
- _queryScheduler, _serverMetrics, _accessControl);
+ _instanceRequestHandler = ChannelHandlerFactory
+ .getInstanceRequestHandler(helixManager.getInstanceName(),
serverConf.getPinotConfig(), _queryScheduler,
+ _serverMetrics, _accessControl);
_nettyTlsQueryServer = new QueryServer(nettySecPort, nettyConfig,
tlsConfig, _instanceRequestHandler);
} else {
_nettyTlsQueryServer = null;
@@ -157,8 +157,9 @@ public class ServerInstance {
int grpcPort = serverConf.getGrpcPort();
LOGGER.info("Initializing gRPC query server on port: {}", grpcPort);
_grpcQueryServer = new GrpcQueryServer(grpcPort,
GrpcConfig.buildGrpcQueryConfig(serverConf.getPinotConfig()),
- serverConf.isGrpcTlsServerEnabled() ?
TlsUtils.extractTlsConfig(serverConf.getPinotConfig(),
- CommonConstants.Server.SERVER_GRPCTLS_PREFIX) : null,
_queryExecutor, _serverMetrics, _accessControl);
+ serverConf.isGrpcTlsServerEnabled() ? TlsUtils
+ .extractTlsConfig(serverConf.getPinotConfig(),
CommonConstants.Server.SERVER_GRPCTLS_PREFIX) : null,
+ _queryExecutor, _serverMetrics, _accessControl);
} else {
_grpcQueryServer = null;
}
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/worker/WorkerQueryServer.java
b/pinot-server/src/main/java/org/apache/pinot/server/worker/WorkerQueryServer.java
index 542cc9bd90..45db3208ec 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/worker/WorkerQueryServer.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/worker/WorkerQueryServer.java
@@ -19,7 +19,6 @@
package org.apache.pinot.server.worker;
import org.apache.helix.HelixManager;
-import org.apache.pinot.common.config.TlsConfig;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.query.runtime.QueryRunner;
@@ -38,20 +37,19 @@ public class WorkerQueryServer {
private QueryRunner _queryRunner;
private InstanceDataManager _instanceDataManager;
private ServerMetrics _serverMetrics;
- private TlsConfig _tlsConfig;
public WorkerQueryServer(PinotConfiguration configuration,
InstanceDataManager instanceDataManager,
- HelixManager helixManager, ServerMetrics serverMetrics, TlsConfig
tlsConfig) {
+ HelixManager helixManager, ServerMetrics serverMetrics) {
_configuration = toWorkerQueryConfig(configuration);
_helixManager = helixManager;
_instanceDataManager = instanceDataManager;
- _tlsConfig = tlsConfig;
_serverMetrics = serverMetrics;
- _queryServicePort =
_configuration.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_SERVER_PORT,
- CommonConstants.MultiStageQueryRunner.DEFAULT_QUERY_SERVER_PORT);
+ _queryServicePort =
+
_configuration.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_SERVER_PORT,
+ CommonConstants.MultiStageQueryRunner.DEFAULT_QUERY_SERVER_PORT);
_queryRunner = new QueryRunner();
_queryRunner.init(_configuration, _instanceDataManager, _helixManager,
_serverMetrics);
- _queryWorkerService = new QueryServer(_queryServicePort, _queryRunner,
_tlsConfig);
+ _queryWorkerService = new QueryServer(_queryServicePort, _queryRunner);
}
private static PinotConfiguration toWorkerQueryConfig(PinotConfiguration
configuration) {
@@ -64,15 +62,17 @@ public class WorkerQueryServer {
CommonConstants.Helix.SERVER_INSTANCE_PREFIX_LENGTH) : instanceId;
newConfig.addProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_HOSTNAME,
hostname);
}
- int runnerPort =
newConfig.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT,
+ int runnerPort = newConfig.getProperty(
+ CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT,
CommonConstants.MultiStageQueryRunner.DEFAULT_QUERY_RUNNER_PORT);
if (runnerPort == -1) {
runnerPort =
newConfig.getProperty(CommonConstants.Server.CONFIG_OF_GRPC_PORT,
CommonConstants.Server.DEFAULT_GRPC_PORT);
newConfig.addProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT,
runnerPort);
}
- int servicePort =
newConfig.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_SERVER_PORT,
- CommonConstants.MultiStageQueryRunner.DEFAULT_QUERY_SERVER_PORT);
+ int servicePort =
+
newConfig.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_SERVER_PORT,
+ CommonConstants.MultiStageQueryRunner.DEFAULT_QUERY_SERVER_PORT);
if (servicePort == -1) {
servicePort =
newConfig.getProperty(CommonConstants.Helix.KEY_OF_SERVER_NETTY_PORT,
CommonConstants.Helix.DEFAULT_SERVER_NETTY_PORT);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]