Copilot commented on code in PR #16666:
URL: https://github.com/apache/pinot/pull/16666#discussion_r2322780987
##########
pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/reader/PinotServerDataFetcher.scala:
##########
@@ -98,7 +153,35 @@ private[reader] class PinotServerDataFetcher(
val instanceConfig = new InstanceConfig(nullZkId)
instanceConfig.setHostName(pinotSplit.serverAndSegments.serverHost)
instanceConfig.setPort(pinotSplit.serverAndSegments.serverPort)
- // TODO: support netty-sec
+
+ // Configure TLS for server instance if HTTPS is enabled
+ if (dataSourceOptions.useHttps) {
+ instanceConfig.getRecord.setSimpleField("TLS_PORT",
pinotSplit.serverAndSegments.serverPort)
+ }
+
+ // Configure gRPC port
+ instanceConfig.getRecord.setSimpleField("GRPC_PORT",
dataSourceOptions.grpcPort.toString)
+
+ // Configure proxy forwarding if enabled
+ if (dataSourceOptions.proxyEnabled &&
dataSourceOptions.grpcProxyUri.isDefined) {
+ // When using proxy, the server instance should point to the proxy
+ val proxyUri = dataSourceOptions.grpcProxyUri.get
+ val (proxyHost, proxyPort) = proxyUri.split(":", 2) match {
+ case Array(h, p) if p.forall(_.isDigit) => (h, p)
+ case Array(h) =>
+ val defaultPort = if (dataSourceOptions.useHttps) "443" else "80"
+ (h, defaultPort)
+ case _ => throw new IllegalArgumentException(s"Proxy URI '$proxyUri'
must be in the format host:port")
+ }
Review Comment:
This proxy URI parsing logic is duplicated in PinotGrpcServerDataFetcher
(lines 44-52). Consider extracting this into a shared utility method in a
common class to avoid code duplication and ensure consistent behavior across
both implementations.
##########
pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/DataExtractor.scala:
##########
@@ -116,6 +118,21 @@ private[pinot] object DataExtractor {
dataTable.getFloat(rowIndex, colIndex)
case ColumnDataType.DOUBLE =>
dataTable.getDouble(rowIndex, colIndex)
+ case ColumnDataType.BIG_DECIMAL =>
+ val bd = dataTable.getBigDecimal(rowIndex, colIndex)
+ if (bd == null) null
+ else {
+ // Derive precision/scale from actual value; clamp to Spark's max
precision
+ val precision = math.min(DecimalType.MAX_PRECISION, bd.precision())
+ val scale = math.min(DecimalType.MAX_SCALE, math.max(0, bd.scale()))
+ val scaled =
+ if (bd.scale() == scale) bd
+ else bd.setScale(scale, java.math.RoundingMode.HALF_UP)
Review Comment:
The BIG_DECIMAL handling derives precision and scale dynamically from each
value, whereas the schema conversion uses a hardcoded DecimalType(38, 18).
This inconsistency could lead to runtime errors if an actual value exceeds the
precision or scale declared in the schema. Consider using consistent
precision/scale values in both places, or documenting this design choice.
##########
pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/PinotDataSourceReadOptions.scala:
##########
@@ -74,9 +109,46 @@ object PinotDataSourceReadOptions {
// pinot cluster options
val controller = options.getOrDefault(CONFIG_CONTROLLER,
DEFAULT_CONTROLLER)
+ // Unified security mode: if provided, it controls defaults for both HTTPS
and gRPC TLS
+ val secureModeDefined = options.containsKey(CONFIG_SECURE_MODE)
+ val secureModeValue = if (secureModeDefined)
options.getBoolean(CONFIG_SECURE_MODE, DEFAULT_SECURE_MODE) else
DEFAULT_SECURE_MODE
+ // Parse HTTPS configuration early so it can be used for broker discovery.
Precedence: explicit useHttps, else secureMode, else default
+ val useHttps = if (options.containsKey(CONFIG_USE_HTTPS))
options.getBoolean(CONFIG_USE_HTTPS, DEFAULT_USE_HTTPS) else secureModeValue
+ val keystorePath =
Option(options.get(CONFIG_KEYSTORE_PATH)).filter(_.nonEmpty)
+ val keystorePassword =
Option(options.get(CONFIG_KEYSTORE_PASSWORD)).filter(_.nonEmpty)
+ val truststorePath =
Option(options.get(CONFIG_TRUSTSTORE_PATH)).filter(_.nonEmpty)
+ val truststorePassword =
Option(options.get(CONFIG_TRUSTSTORE_PASSWORD)).filter(_.nonEmpty)
+ val authHeader = Option(options.get(CONFIG_AUTH_HEADER)).filter(_.nonEmpty)
+ val authToken = Option(options.get(CONFIG_AUTH_TOKEN)).filter(_.nonEmpty)
+
+ // Parse proxy configuration
+ val proxyEnabled = options.getBoolean(CONFIG_PROXY_ENABLED,
DEFAULT_PROXY_ENABLED)
+
+ // Parse gRPC configuration
+ val grpcPort = options.getInt(CONFIG_GRPC_PORT, DEFAULT_GRPC_PORT)
+ val grpcMaxInboundMessageSize =
options.getLong(CONFIG_GRPC_MAX_INBOUND_MESSAGE_SIZE,
DEFAULT_GRPC_MAX_INBOUND_MESSAGE_SIZE)
+ // gRPC plain-text: explicit flag wins; otherwise derive from secureMode
(true => TLS => plain-text=false)
+ val grpcUsePlainText = if (options.containsKey(CONFIG_GRPC_USE_PLAIN_TEXT))
+ options.getBoolean(CONFIG_GRPC_USE_PLAIN_TEXT,
DEFAULT_GRPC_USE_PLAIN_TEXT)
+ else
+ !secureModeValue
Review Comment:
The gRPC plain-text setting is derived by negating secureMode, which is
correct but easy to misread. Consider adding a comment explaining that when
secureMode is true, we want TLS (hence plain text = false), or refactor to make
the logic more explicit: `if (secureModeValue) false else true`.
```suggestion
// gRPC plain-text: explicit flag wins; otherwise, if secureMode is
true, we want TLS (plain-text = false); if secureMode is false, we want
plain-text = true.
val grpcUsePlainText = if
(options.containsKey(CONFIG_GRPC_USE_PLAIN_TEXT)) {
options.getBoolean(CONFIG_GRPC_USE_PLAIN_TEXT,
DEFAULT_GRPC_USE_PLAIN_TEXT)
} else {
if (secureModeValue) false else true
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]