Copilot commented on code in PR #16666:
URL: https://github.com/apache/pinot/pull/16666#discussion_r2322860067


##########
pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/reader/PinotServerDataFetcher.scala:
##########
@@ -98,7 +153,29 @@ private[reader] class PinotServerDataFetcher(
     val instanceConfig = new InstanceConfig(nullZkId)
     instanceConfig.setHostName(pinotSplit.serverAndSegments.serverHost)
     instanceConfig.setPort(pinotSplit.serverAndSegments.serverPort)
-    // TODO: support netty-sec
+    
+    // Configure TLS for server instance if HTTPS is enabled
+    if (dataSourceOptions.useHttps) {
+      instanceConfig.getRecord.setSimpleField("TLS_PORT", 
pinotSplit.serverAndSegments.serverPort)
+    }
+    
+    // Configure gRPC port
+    instanceConfig.getRecord.setSimpleField("GRPC_PORT", 
dataSourceOptions.grpcPort.toString)
+    
+    // Configure proxy forwarding if enabled
+    if (dataSourceOptions.proxyEnabled && 
dataSourceOptions.grpcProxyUri.isDefined) {
+      // When using proxy, the server instance should point to the proxy
+      val proxyUri = dataSourceOptions.grpcProxyUri.get
+      val (proxyHost, proxyPort) = 
org.apache.pinot.connector.spark.common.NetUtils.parseHostPort(proxyUri, 
dataSourceOptions.useHttps)

Review Comment:
   The code uses `dataSourceOptions.useHttps` as the secure flag for parsing 
the gRPC proxy URI, but this should use gRPC-specific security settings. It 
should use `!dataSourceOptions.grpcUsePlainText` instead, as gRPC TLS and HTTP 
TLS are configured independently.
   ```suggestion
         val (proxyHost, proxyPort) = 
org.apache.pinot.connector.spark.common.NetUtils.parseHostPort(proxyUri, 
!dataSourceOptions.grpcUsePlainText)
   ```



##########
pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/PinotDataSourceReadOptions.scala:
##########
@@ -38,13 +38,48 @@ object PinotDataSourceReadOptions {
   var CONFIG_USE_GRPC_SERVER = "useGrpcServer"
   val CONFIG_QUERY_OPTIONS = "queryOptions"
   val CONFIG_FAIL_ON_INVALID_SEGMENTS = "failOnInvalidSegments"
+  val CONFIG_USE_HTTPS = "useHttps"
+  // Unified security switch: when set, it implies HTTPS for HTTP and TLS for 
gRPC (unless overridden)
+  val CONFIG_SECURE_MODE = "secureMode"

Review Comment:
   The comment describes `secureMode` as a unified security switch, but the
`CONFIG_SECURE_MODE` constant is defined without being referenced anywhere in
the option-parsing logic. A configuration key that is declared but never read
is misleading to users who set it; either wire `CONFIG_SECURE_MODE` into the
parsing logic so `secureMode` actually implies HTTPS/gRPC TLS as documented,
or remove the constant and its comment.



##########
pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/reader/PinotGrpcServerDataFetcher.scala:
##########
@@ -18,34 +18,82 @@
  */
 package org.apache.pinot.connector.spark.common.reader
 
-import io.grpc.ManagedChannelBuilder
+import io.grpc.{ManagedChannelBuilder, Metadata}
+import io.grpc.stub.MetadataUtils
 import org.apache.pinot.common.datatable.{DataTable, DataTableFactory}
 import org.apache.pinot.common.proto.PinotQueryServerGrpc
 import org.apache.pinot.common.proto.Server.ServerRequest
-import org.apache.pinot.connector.spark.common.Logging
+import org.apache.pinot.connector.spark.common.{AuthUtils, Logging, 
PinotDataSourceReadOptions}
 import org.apache.pinot.connector.spark.common.partition.PinotSplit
 import org.apache.pinot.spi.config.table.TableType
 
 import java.io.Closeable
+import java.net.URI
 import scala.collection.JavaConverters._
 
 /**
  * Data fetcher from Pinot Grpc server for specific segments.
  * Eg: offline-server1: segment1, segment2, segment3
  */
-private[reader] class PinotGrpcServerDataFetcher(pinotSplit: PinotSplit)
+private[reader] class PinotGrpcServerDataFetcher(pinotSplit: PinotSplit, 
readOptions: PinotDataSourceReadOptions)
   extends Logging with Closeable {
 
-  private val channel = ManagedChannelBuilder
-    .forAddress(pinotSplit.serverAndSegments.serverHost, 
pinotSplit.serverAndSegments.serverGrpcPort)
-    .usePlaintext()
-    .maxInboundMessageSize(Int.MaxValue)
-    .asInstanceOf[ManagedChannelBuilder[_]].build()
-  private val pinotServerBlockingStub = 
PinotQueryServerGrpc.newBlockingStub(channel)
+  private val (channelHost, channelPort) = {
+    if (readOptions.proxyEnabled && readOptions.grpcProxyUri.nonEmpty) {
+      val proxyUri = readOptions.grpcProxyUri.get
+      val (hostStr, portStr) = 
org.apache.pinot.connector.spark.common.NetUtils.parseHostPort(proxyUri, 
readOptions.useHttps)
+      val host = hostStr
+      val port = portStr.toInt
+      (host, port)

Review Comment:
   The variables `hostStr` and `host` are redundant - `hostStr` is immediately 
assigned to `host` without any transformation. The assignment on line 44 can be 
removed and `hostStr` used directly.
   ```suggestion
         (hostStr, portStr.toInt)
   ```



##########
pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/reader/PinotGrpcServerDataFetcher.scala:
##########
@@ -18,34 +18,82 @@
  */
 package org.apache.pinot.connector.spark.common.reader
 
-import io.grpc.ManagedChannelBuilder
+import io.grpc.{ManagedChannelBuilder, Metadata}
+import io.grpc.stub.MetadataUtils
 import org.apache.pinot.common.datatable.{DataTable, DataTableFactory}
 import org.apache.pinot.common.proto.PinotQueryServerGrpc
 import org.apache.pinot.common.proto.Server.ServerRequest
-import org.apache.pinot.connector.spark.common.Logging
+import org.apache.pinot.connector.spark.common.{AuthUtils, Logging, 
PinotDataSourceReadOptions}
 import org.apache.pinot.connector.spark.common.partition.PinotSplit
 import org.apache.pinot.spi.config.table.TableType
 
 import java.io.Closeable
+import java.net.URI
 import scala.collection.JavaConverters._
 
 /**
  * Data fetcher from Pinot Grpc server for specific segments.
  * Eg: offline-server1: segment1, segment2, segment3
  */
-private[reader] class PinotGrpcServerDataFetcher(pinotSplit: PinotSplit)
+private[reader] class PinotGrpcServerDataFetcher(pinotSplit: PinotSplit, 
readOptions: PinotDataSourceReadOptions)
   extends Logging with Closeable {
 
-  private val channel = ManagedChannelBuilder
-    .forAddress(pinotSplit.serverAndSegments.serverHost, 
pinotSplit.serverAndSegments.serverGrpcPort)
-    .usePlaintext()
-    .maxInboundMessageSize(Int.MaxValue)
-    .asInstanceOf[ManagedChannelBuilder[_]].build()
-  private val pinotServerBlockingStub = 
PinotQueryServerGrpc.newBlockingStub(channel)
+  private val (channelHost, channelPort) = {
+    if (readOptions.proxyEnabled && readOptions.grpcProxyUri.nonEmpty) {
+      val proxyUri = readOptions.grpcProxyUri.get
+      val (hostStr, portStr) = 
org.apache.pinot.connector.spark.common.NetUtils.parseHostPort(proxyUri, 
readOptions.useHttps)
+      val host = hostStr
+      val port = portStr.toInt
+      (host, port)
+    } else {
+      (pinotSplit.serverAndSegments.serverHost, 
pinotSplit.serverAndSegments.serverGrpcPort)
+    }
+  }
+
+  // Debug: Print out channel host and port
+  logDebug(s"PinotGrpcServerDataFetcher connecting to host: $channelHost, 
port: $channelPort")
+  private val baseChannelBuilder = ManagedChannelBuilder
+    .forAddress(channelHost, channelPort)
+    .maxInboundMessageSize(Math.min(readOptions.grpcMaxInboundMessageSize, 
Int.MaxValue.toLong).toInt)
+    .asInstanceOf[ManagedChannelBuilder[_]]
+
+  private val channel = {
+    val builder =
+      if (readOptions.grpcUsePlainText) baseChannelBuilder.usePlaintext()
+      else baseChannelBuilder
+    builder.build()
+  }
+
+  private val pinotServerBlockingStub = {
+    val baseStub = PinotQueryServerGrpc.newBlockingStub(channel)
+
+    // Attach proxy forwarding headers when proxy is enabled
+    val withProxyHeaders =
+      if (readOptions.proxyEnabled && readOptions.grpcProxyUri.nonEmpty) {
+        val md = new Metadata()
+        val forwardHostKey = Metadata.Key.of("forward_host", 
Metadata.ASCII_STRING_MARSHALLER)
+        val forwardPortKey = Metadata.Key.of("forward_port", 
Metadata.ASCII_STRING_MARSHALLER)
+        md.put(forwardHostKey, pinotSplit.serverAndSegments.serverHost)
+        md.put(forwardPortKey, 
pinotSplit.serverAndSegments.serverGrpcPort.toString)
+
+        // Optional: Authorization header if provided
+        AuthUtils.buildAuthHeader(readOptions.authHeader, 
readOptions.authToken) match {
+          case Some((name, value)) =>
+            val key = Metadata.Key.of(name.toLowerCase, 
Metadata.ASCII_STRING_MARSHALLER)
+            md.put(key, value)
+          case None =>
+        }
+        
baseStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(md))
+      } else baseStub
+    withProxyHeaders
+  }
 
   def fetchData(): Iterator[DataTable] = {
     val request = ServerRequest.newBuilder()
       .putMetadata("enableStreaming", "true")
+      // Also include forwarding info in request metadata for compatibility 
with proxies that inspect payload
+      .putMetadata("FORWARD_HOST", pinotSplit.serverAndSegments.serverHost)
+      .putMetadata("FORWARD_PORT", 
pinotSplit.serverAndSegments.serverGrpcPort.toString)

Review Comment:
   The forwarding metadata is duplicated in both the gRPC headers (lines 74-77) 
and the request payload metadata (lines 94-95). This duplication could lead to 
inconsistencies if one is updated but not the other. Consider extracting this 
logic into a helper method or using only one mechanism for forwarding.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to