This is an automated email from the ASF dual-hosted git repository.
ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 21ff6bf354 TLS Configuration Support for QueryServer and Dispatch Client (#13645)
21ff6bf354 is described below
commit 21ff6bf3545e4fe15fcdee9cc3851a04f85cc08c
Author: Anand Kr Shaw <[email protected]>
AuthorDate: Wed Sep 18 07:25:30 2024 +0530
TLS Configuration Support for QueryServer and Dispatch Client (#13645)
---
.../pinot/common/utils/grpc/GrpcQueryClient.java | 2 +-
.../pinot/core/transport/grpc/GrpcQueryServer.java | 2 +-
.../query/service/dispatch/DispatchClient.java | 16 ++++++++++++-
.../pinot/query/service/server/QueryServer.java | 16 +++++++++++--
.../service/dispatch/QueryDispatcherTest.java | 2 +-
.../query/service/server/QueryServerTest.java | 2 +-
.../pinot/server/starter/ServerInstance.java | 27 +++++++++++-----------
.../pinot/server/worker/WorkerQueryServer.java | 20 ++++++++--------
8 files changed, 56 insertions(+), 31 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcQueryClient.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcQueryClient.java
index 9987876b95..808e8d0e02 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcQueryClient.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/grpc/GrpcQueryClient.java
@@ -84,7 +84,7 @@ public class GrpcQueryClient implements Closeable {
_channelShutdownTimeoutSeconds = config.getChannelShutdownTimeoutSecond();
}
- private SslContext buildSslContext(TlsConfig tlsConfig) {
+ public static SslContext buildSslContext(TlsConfig tlsConfig) {
LOGGER.info("Building gRPC SSL context");
SslContext sslContext =
CLIENT_SSL_CONTEXTS_CACHE.computeIfAbsent(tlsConfig.hashCode(),
tlsConfigHashCode -> {
try {
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java
b/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java
index daae6d74cc..4d8608c5ea 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java
@@ -119,7 +119,7 @@ public class GrpcQueryServer extends
PinotQueryServerGrpc.PinotQueryServerImplBa
ResourceManager.DEFAULT_QUERY_WORKER_THREADS);
}
- private SslContext buildGRpcSslContext(TlsConfig tlsConfig)
+ public static SslContext buildGRpcSslContext(TlsConfig tlsConfig)
throws IllegalArgumentException {
LOGGER.info("Building gRPC SSL context");
if (tlsConfig.getKeyStorePath() == null) {
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/DispatchClient.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/DispatchClient.java
index 5b036930ce..cea23218cd 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/DispatchClient.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/DispatchClient.java
@@ -21,10 +21,14 @@ package org.apache.pinot.query.service.dispatch;
import io.grpc.Deadline;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
+import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
import io.grpc.stub.StreamObserver;
+import java.util.Collections;
import java.util.function.Consumer;
+import org.apache.pinot.common.config.GrpcConfig;
import org.apache.pinot.common.proto.PinotQueryWorkerGrpc;
import org.apache.pinot.common.proto.Worker;
+import org.apache.pinot.common.utils.grpc.GrpcQueryClient;
import org.apache.pinot.query.routing.QueryServerInstance;
@@ -41,7 +45,17 @@ class DispatchClient {
private final PinotQueryWorkerGrpc.PinotQueryWorkerStub _dispatchStub;
public DispatchClient(String host, int port) {
- _channel = ManagedChannelBuilder.forAddress(host,
port).usePlaintext().build();
+ this(host, port, new GrpcConfig(Collections.emptyMap()));
+ }
+
+ public DispatchClient(String host, int port, GrpcConfig grpcConfig) {
+ if (grpcConfig.isUsePlainText()) {
+ _channel = ManagedChannelBuilder.forAddress(host,
port).usePlaintext().build();
+ } else {
+ _channel =
+ NettyChannelBuilder.forAddress(host, port)
+
.sslContext(GrpcQueryClient.buildSslContext(grpcConfig.getTlsConfig())).build();
+ }
_dispatchStub = PinotQueryWorkerGrpc.newStub(_channel);
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
index 763192e16e..65e4ca7df0 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
@@ -20,6 +20,7 @@ package org.apache.pinot.query.service.server;
import io.grpc.Server;
import io.grpc.ServerBuilder;
+import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder;
import io.grpc.stub.StreamObserver;
import java.util.List;
import java.util.Map;
@@ -27,10 +28,12 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import org.apache.pinot.common.config.TlsConfig;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.proto.PinotQueryWorkerGrpc;
import org.apache.pinot.common.proto.Worker;
import org.apache.pinot.common.utils.NamedThreadFactory;
+import org.apache.pinot.core.transport.grpc.GrpcQueryServer;
import org.apache.pinot.query.routing.QueryPlanSerDeUtils;
import org.apache.pinot.query.routing.StageMetadata;
import org.apache.pinot.query.routing.StagePlan;
@@ -55,6 +58,7 @@ public class QueryServer extends
PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
private final int _port;
private final QueryRunner _queryRunner;
+ private final TlsConfig _tlsConfig;
// query submission service is only used for plan submission for now.
// TODO: with complex query submission logic we should allow asynchronous query submission return instead of
// directly return from submission response observer.
@@ -62,9 +66,10 @@ public class QueryServer extends
PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
private Server _server = null;
- public QueryServer(int port, QueryRunner queryRunner) {
+ public QueryServer(int port, QueryRunner queryRunner, TlsConfig tlsConfig) {
_port = port;
_queryRunner = queryRunner;
+ _tlsConfig = tlsConfig;
_querySubmissionExecutorService =
Executors.newCachedThreadPool(new
NamedThreadFactory("query_submission_executor_on_" + _port + "_port"));
}
@@ -73,7 +78,14 @@ public class QueryServer extends
PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
LOGGER.info("Starting QueryServer");
try {
if (_server == null) {
- _server =
ServerBuilder.forPort(_port).addService(this).maxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE).build();
+ if (_tlsConfig == null) {
+ _server = ServerBuilder.forPort(_port).addService(this)
+ .maxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE).build();
+ } else {
+ _server = NettyServerBuilder.forPort(_port).addService(this)
+ .sslContext(GrpcQueryServer.buildGRpcSslContext(_tlsConfig))
+ .maxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE).build();
+ }
LOGGER.info("Initialized QueryServer on port: {}", _port);
}
_queryRunner.start();
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java
index 694fb3c087..c21c40b2d9 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/dispatch/QueryDispatcherTest.java
@@ -60,7 +60,7 @@ public class QueryDispatcherTest extends QueryTestSet {
for (int i = 0; i < QUERY_SERVER_COUNT; i++) {
int availablePort = QueryTestUtils.getAvailablePort();
QueryRunner queryRunner = Mockito.mock(QueryRunner.class);
- QueryServer queryServer = Mockito.spy(new QueryServer(availablePort,
queryRunner));
+ QueryServer queryServer = Mockito.spy(new QueryServer(availablePort,
queryRunner, null));
queryServer.start();
_queryServerMap.put(availablePort, queryServer);
}
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/server/QueryServerTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/server/QueryServerTest.java
index 3a0b23408e..7a14a2a4c6 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/server/QueryServerTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/server/QueryServerTest.java
@@ -76,7 +76,7 @@ public class QueryServerTest extends QueryTestSet {
for (int i = 0; i < QUERY_SERVER_COUNT; i++) {
int availablePort = QueryTestUtils.getAvailablePort();
QueryRunner queryRunner = mock(QueryRunner.class);
- QueryServer queryServer = new QueryServer(availablePort, queryRunner);
+ QueryServer queryServer = new QueryServer(availablePort, queryRunner,
null);
queryServer.start();
_queryServerMap.put(availablePort, queryServer);
_queryRunnerMap.put(availablePort, queryRunner);
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
index 2a75ca7f5a..01f4402710 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
@@ -119,15 +119,15 @@ public class ServerInstance {
TlsUtils.extractTlsConfig(serverConf.getPinotConfig(),
CommonConstants.Server.SERVER_TLS_PREFIX);
NettyConfig nettyConfig =
NettyConfig.extractNettyConfig(serverConf.getPinotConfig(),
CommonConstants.Server.SERVER_NETTY_PREFIX);
- accessControlFactory
-
.init(serverConf.getPinotConfig().subset(CommonConstants.Server.PREFIX_OF_CONFIG_OF_ACCESS_CONTROL),
- helixManager);
+ accessControlFactory.init(
+
serverConf.getPinotConfig().subset(CommonConstants.Server.PREFIX_OF_CONFIG_OF_ACCESS_CONTROL),
helixManager);
_accessControl = accessControlFactory.create();
if (serverConf.isMultiStageServerEnabled()) {
LOGGER.info("Initializing Multi-stage query engine");
- _workerQueryServer = new WorkerQueryServer(serverConf.getPinotConfig(),
_instanceDataManager, helixManager,
- _serverMetrics);
+ _workerQueryServer =
+ new WorkerQueryServer(serverConf.getPinotConfig(),
_instanceDataManager, helixManager, _serverMetrics,
+ serverConf.isNettyTlsServerEnabled() ? tlsConfig : null);
} else {
_workerQueryServer = null;
}
@@ -135,9 +135,9 @@ public class ServerInstance {
if (serverConf.isNettyServerEnabled()) {
int nettyPort = serverConf.getNettyPort();
LOGGER.info("Initializing Netty query server on port: {}", nettyPort);
- _instanceRequestHandler = ChannelHandlerFactory
- .getInstanceRequestHandler(helixManager.getInstanceName(),
serverConf.getPinotConfig(), _queryScheduler,
- _serverMetrics, new AllowAllAccessFactory().create());
+ _instanceRequestHandler =
+
ChannelHandlerFactory.getInstanceRequestHandler(helixManager.getInstanceName(),
serverConf.getPinotConfig(),
+ _queryScheduler, _serverMetrics, new
AllowAllAccessFactory().create());
_nettyQueryServer = new QueryServer(nettyPort, nettyConfig,
_instanceRequestHandler);
} else {
_nettyQueryServer = null;
@@ -146,9 +146,9 @@ public class ServerInstance {
if (serverConf.isNettyTlsServerEnabled()) {
int nettySecPort = serverConf.getNettyTlsPort();
LOGGER.info("Initializing TLS-secured Netty query server on port: {}",
nettySecPort);
- _instanceRequestHandler = ChannelHandlerFactory
- .getInstanceRequestHandler(helixManager.getInstanceName(),
serverConf.getPinotConfig(), _queryScheduler,
- _serverMetrics, _accessControl);
+ _instanceRequestHandler =
+
ChannelHandlerFactory.getInstanceRequestHandler(helixManager.getInstanceName(),
serverConf.getPinotConfig(),
+ _queryScheduler, _serverMetrics, _accessControl);
_nettyTlsQueryServer = new QueryServer(nettySecPort, nettyConfig,
tlsConfig, _instanceRequestHandler);
} else {
_nettyTlsQueryServer = null;
@@ -157,9 +157,8 @@ public class ServerInstance {
int grpcPort = serverConf.getGrpcPort();
LOGGER.info("Initializing gRPC query server on port: {}", grpcPort);
_grpcQueryServer = new GrpcQueryServer(grpcPort,
GrpcConfig.buildGrpcQueryConfig(serverConf.getPinotConfig()),
- serverConf.isGrpcTlsServerEnabled() ? TlsUtils
- .extractTlsConfig(serverConf.getPinotConfig(),
CommonConstants.Server.SERVER_GRPCTLS_PREFIX) : null,
- _queryExecutor, _serverMetrics, _accessControl);
+ serverConf.isGrpcTlsServerEnabled() ?
TlsUtils.extractTlsConfig(serverConf.getPinotConfig(),
+ CommonConstants.Server.SERVER_GRPCTLS_PREFIX) : null,
_queryExecutor, _serverMetrics, _accessControl);
} else {
_grpcQueryServer = null;
}
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/worker/WorkerQueryServer.java
b/pinot-server/src/main/java/org/apache/pinot/server/worker/WorkerQueryServer.java
index 45db3208ec..542cc9bd90 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/worker/WorkerQueryServer.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/worker/WorkerQueryServer.java
@@ -19,6 +19,7 @@
package org.apache.pinot.server.worker;
import org.apache.helix.HelixManager;
+import org.apache.pinot.common.config.TlsConfig;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.query.runtime.QueryRunner;
@@ -37,19 +38,20 @@ public class WorkerQueryServer {
private QueryRunner _queryRunner;
private InstanceDataManager _instanceDataManager;
private ServerMetrics _serverMetrics;
+ private TlsConfig _tlsConfig;
public WorkerQueryServer(PinotConfiguration configuration,
InstanceDataManager instanceDataManager,
- HelixManager helixManager, ServerMetrics serverMetrics) {
+ HelixManager helixManager, ServerMetrics serverMetrics, TlsConfig
tlsConfig) {
_configuration = toWorkerQueryConfig(configuration);
_helixManager = helixManager;
_instanceDataManager = instanceDataManager;
+ _tlsConfig = tlsConfig;
_serverMetrics = serverMetrics;
- _queryServicePort =
-
_configuration.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_SERVER_PORT,
- CommonConstants.MultiStageQueryRunner.DEFAULT_QUERY_SERVER_PORT);
+ _queryServicePort =
_configuration.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_SERVER_PORT,
+ CommonConstants.MultiStageQueryRunner.DEFAULT_QUERY_SERVER_PORT);
_queryRunner = new QueryRunner();
_queryRunner.init(_configuration, _instanceDataManager, _helixManager,
_serverMetrics);
- _queryWorkerService = new QueryServer(_queryServicePort, _queryRunner);
+ _queryWorkerService = new QueryServer(_queryServicePort, _queryRunner,
_tlsConfig);
}
private static PinotConfiguration toWorkerQueryConfig(PinotConfiguration
configuration) {
@@ -62,17 +64,15 @@ public class WorkerQueryServer {
CommonConstants.Helix.SERVER_INSTANCE_PREFIX_LENGTH) : instanceId;
newConfig.addProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_HOSTNAME,
hostname);
}
- int runnerPort = newConfig.getProperty(
- CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT,
+ int runnerPort =
newConfig.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT,
CommonConstants.MultiStageQueryRunner.DEFAULT_QUERY_RUNNER_PORT);
if (runnerPort == -1) {
runnerPort =
newConfig.getProperty(CommonConstants.Server.CONFIG_OF_GRPC_PORT,
CommonConstants.Server.DEFAULT_GRPC_PORT);
newConfig.addProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_RUNNER_PORT,
runnerPort);
}
- int servicePort =
-
newConfig.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_SERVER_PORT,
- CommonConstants.MultiStageQueryRunner.DEFAULT_QUERY_SERVER_PORT);
+ int servicePort =
newConfig.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_QUERY_SERVER_PORT,
+ CommonConstants.MultiStageQueryRunner.DEFAULT_QUERY_SERVER_PORT);
if (servicePort == -1) {
servicePort =
newConfig.getProperty(CommonConstants.Helix.KEY_OF_SERVER_NETTY_PORT,
CommonConstants.Helix.DEFAULT_SERVER_NETTY_PORT);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]