Copilot commented on code in PR #12367:
URL: https://github.com/apache/gluten/pull/12367#discussion_r3481993381
##########
backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala:
##########
@@ -199,7 +228,16 @@ class VeloxIteratorApi extends IteratorApi with Logging {
iter => new ColumnarBatchInIterator(BackendsApiManager.getBackendName, iter.asJava)
}
- val extraConf = Map(GlutenConfig.COLUMNAR_CUDF_ENABLED.key -> enableCudf.toString).asJava
+ // Merge the fs.* keys captured on the driver (stored in GlutenPartition.fsConf)
+ // into the extraConf passed to NativePlanEvaluator / VeloxRuntime.
+ // Runtimes.contextInstance() will call GlutenConfig.getNativeSessionConf() which
+ // merges extraConf on top of SQLConf.get.getAllConfs. Because the executor-side
+ // SQLConf never receives "fs.*" keys (Spark only propagates "spark.*" keys via
+ // task local properties), this is the only path these credentials can take to
+ // reach the native session config and ultimately the Velox ABFS connector.
+ val partitionFsConf = inputPartition.asInstanceOf[GlutenPartition].fsConf
+ val extraConf = (partitionFsConf +
+ (GlutenConfig.COLUMNAR_CUDF_ENABLED.key -> enableCudf.toString)).asJava
val transKernel = NativePlanEvaluator.create(BackendsApiManager.getBackendName, extraConf)
Review Comment:
`extraConf` is passed into `NativePlanEvaluator.create`, which calls
`Runtimes.contextInstance`. Today `Runtimes.contextInstance` uses
`s"$backendName:$name:$extraConf"` as the TaskResources key
(gluten-arrow/src/main/scala/org/apache/gluten/runtime/Runtimes.scala:25-35).
With this change, `extraConf` can contain fs credential values (e.g.
`fs.s3a.secret.key`), so those secrets become part of the key string, where
they may be retained on the heap and exposed through debugging or logging of
resource keys. Additionally, the map's `toString` ordering is not guaranteed
to be stable, which can cause unnecessary distinct runtime registrations.
Consider changing the runtime keying strategy to use a stable hash and/or
redact sensitive values (or avoid including values entirely).
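
A minimal sketch of the stable-hash option, assuming the key only needs to distinguish configurations and never has to be read back. `RuntimeKeys`, `confFingerprint`, and `runtimeKey` are hypothetical names rather than existing Gluten APIs, and the sketch takes a Scala `Map` even though the real `extraConf` is a Java map:

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets.UTF_8
import java.security.MessageDigest

// Hypothetical helper, not part of Gluten.
object RuntimeKeys {

  /** Hex-encoded SHA-256 over the entries, sorted by key so iteration order is irrelevant. */
  def confFingerprint(conf: Map[String, String]): String = {
    val digest = MessageDigest.getInstance("SHA-256")
    def update(s: String): Unit = {
      val bytes = s.getBytes(UTF_8)
      // Length-prefix every field so ("ab", "c") and ("a", "bc") hash differently.
      digest.update(ByteBuffer.allocate(4).putInt(bytes.length).array())
      digest.update(bytes)
    }
    conf.toSeq.sortBy(_._1).foreach { case (k, v) => update(k); update(v) }
    digest.digest().map(b => f"${b & 0xff}%02x").mkString
  }

  /** Same shape as the current key, but with a fingerprint in place of the raw map. */
  def runtimeKey(backendName: String, name: String, conf: Map[String, String]): String =
    s"$backendName:$name:${confFingerprint(conf)}"
}
```

With this shape, two maps holding the same entries in a different order produce the same key, and the raw credential values never appear in the key string. The fingerprint is still derived from the secret values, so it should not be treated as public.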
##########
backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala:
##########
@@ -125,12 +125,41 @@ class VeloxIteratorApi extends IteratorApi with Logging {
// Only serialize plan once, save lots time when plan is complex.
val planByteArray = wsCtx.root.toProtobuf.toByteArray
+ // Capture fs.azure.* / fs.s3a.* / fs.gs.* keys from the driver-side
+ // Hadoop configuration NOW, while we are still on the driver, and embed
+ // them in every GlutenPartition. These keys are set by the user via
+ // spark.conf.set("fs.azure.account.auth.type", ...) or
+ // sparkContext.hadoopConfiguration.set(...)
+ // Spark's withSQLConfPropagated only forwards keys starting with "spark"
+ // as task-local-properties, so "fs.*" keys never reach the executor's
+ // SQLConf. Serialising them inside the partition is the only safe way
+ // to make them available to the native runtime on the executor.
+ // Capture fs.azure.* / fs.s3a.* / fs.gs.* keys while on the driver.
+ // SparkPlan.sqlContext is available on the driver -- using the first leaf
+ // gives us access to sessionState.newHadoopConf() which includes all keys
+ // set via spark.conf.set(), sparkContext.hadoopConfiguration, and
+ // DataFrameReader.option(). These are NOT propagated to executors by
+ // Spark's withSQLConfPropagated (it only forwards keys starting with
+ // "spark"), so embedding them in the serialised GlutenPartition is the
+ // only reliable transport mechanism.
+ val fsPrefixes = Seq("fs.azure.", "fs.s3a.", "fs.gs.")
+ val hadoopConf = leaves.headOption
+ .map(_ => org.apache.spark.sql.SparkSession.active.sessionState.newHadoopConf())
+ .getOrElse(org.apache.spark.SparkContext.getOrCreate().hadoopConfiguration)
Review Comment:
`SparkSession.active` may throw if there is no active session bound to the
current thread, and it can also pick up the wrong session when multiple
SparkSessions are in play. Since `leaves` are SparkPlans created under the
correct SQLContext, prefer using
`leaves.head.sqlContext.sessionState.newHadoopConf()` rather than the global
active-session lookup.
##########
backends-velox/src/test/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApiFsConfSuite.scala:
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.backendsapi.velox
+
+import org.apache.gluten.execution.{GlutenPartition, WholeStageTransformContext}
+import org.apache.gluten.substrait.plan.PlanBuilder
+
+import org.apache.spark.sql.test.SharedSparkSession
+
+/**
+ * Tests that [[VeloxIteratorApi.genPartitions]] captures fs.azure.*, fs.s3a.*, and fs.gs.* keys
+ * from the driver-side Hadoop configuration and embeds them in [[GlutenPartition.fsConf]], so they
+ * are available on executors where Spark's SQLConf propagation does not reach "fs.*" keys.
+ *
+ * Keys must be set on `sparkContext.hadoopConfiguration` (the mutable base configuration) because
+ * `sessionState.newHadoopConf()` creates a fresh copy each time - mutations to its return value are
+ * discarded before the next call.
+ */
+class VeloxIteratorApiFsConfSuite extends SharedSparkSession {
+
+ private val api = new VeloxIteratorApi
+
+ /**
+ * Build a minimal WholeStageTransformContext backed by an empty Substrait
plan. genPartitions
+ * only calls wsCtx.root.toProtobuf.toByteArray, so a plan with no relations
is sufficient for the
+ * purpose of this test.
+ */
+ private def emptyWsCtx: WholeStageTransformContext =
+ WholeStageTransformContext(PlanBuilder.empty())
+
+ /**
+ * Set Hadoop conf keys on the underlying mutable configuration and restore
their previous values
+ * (or unset them) after the block. `sessionState.newHadoopConf()` copies
from
+ * `sparkContext.hadoopConfiguration`, so this is the correct mutation point.
+ */
+ private def withHadoopConf(pairs: (String, String)*)(body: => Unit): Unit = {
+ // scalastyle:off hadoopconfiguration
+ val hadoopConf = spark.sparkContext.hadoopConfiguration
+ // scalastyle:on hadoopconfiguration
+ val prev: Seq[(String, Option[String])] = pairs.map {
+ case (k, _) => k -> Option(hadoopConf.get(k))
+ }
+ pairs.foreach { case (k, v) => hadoopConf.set(k, v) }
+ try body
+ finally prev.foreach {
+ case (k, Some(old)) => hadoopConf.set(k, old)
+ case (k, None) => hadoopConf.unset(k)
+ }
+ }
+
+ test("genPartitions embeds fs.azure.* keys from Hadoop conf into GlutenPartition.fsConf") {
+ withHadoopConf(
+ "fs.azure.account.auth.type.myaccount.dfs.core.windows.net" -> "OAuth",
+ "fs.azure.account.oauth.provider.type" -> "ClientCredentials"
+ ) {
+ val partitions = api.genPartitions(emptyWsCtx, Seq(Seq.empty), Seq.empty)
+ assert(partitions.size == 1)
+ val fsConf = partitions.head.asInstanceOf[GlutenPartition].fsConf
+ assert(
+ fsConf.contains("fs.azure.account.auth.type.myaccount.dfs.core.windows.net"),
+ s"Expected fs.azure key not found; got: ${fsConf.keys.mkString(", ")}")
+ assert(fsConf("fs.azure.account.auth.type.myaccount.dfs.core.windows.net") == "OAuth")
+ assert(
+ fsConf.contains("fs.azure.account.oauth.provider.type"),
+ s"Expected fs.azure key not found; got: ${fsConf.keys.mkString(", ")}")
+ assert(fsConf("fs.azure.account.oauth.provider.type") == "ClientCredentials")
+ }
+ }
+
+ test("genPartitions embeds fs.s3a.* keys from Hadoop conf into GlutenPartition.fsConf") {
+ withHadoopConf(
+ "fs.s3a.access.key" -> "AKIAIOSFODNN7EXAMPLE",
+ "fs.s3a.secret.key" -> "wJalrXUtnFEMI"
+ ) {
+ val partitions = api.genPartitions(emptyWsCtx, Seq(Seq.empty), Seq.empty)
+ assert(partitions.size == 1)
+ val fsConf = partitions.head.asInstanceOf[GlutenPartition].fsConf
+ assert(
+ fsConf.contains("fs.s3a.access.key"),
+ s"Expected fs.s3a.access.key not found; got: ${fsConf.keys.mkString(", ")}")
+ assert(fsConf("fs.s3a.access.key") == "AKIAIOSFODNN7EXAMPLE")
+ assert(fsConf.contains("fs.s3a.secret.key"))
+ assert(fsConf("fs.s3a.secret.key") == "wJalrXUtnFEMI")
+ }
+ }
Review Comment:
This test uses AWS-looking access key / secret key strings. Even though they
are dummy values, patterns like `AKIA...` are commonly flagged by secret
scanners and can block merges. Prefer clearly fake placeholders that do not
match real credential formats.
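
As a rough illustration, placeholders along these lines would avoid the `AKIA...` shape. `FakeCredentials` and its members are hypothetical names, and the pattern below is only an assumption about what a scanner might match (a 20-character ID starting with `AKIA` or `ASIA`); actual scanner rules vary:

```scala
// Hypothetical test constants, not part of the PR.
object FakeCredentials {
  // Lowercase, hyphenated values that cannot be mistaken for real AWS key material.
  val AccessKey: String = "test-access-key"
  val SecretKey: String = "test-secret-key"

  /** True if `s` has the assumed access-key-ID shape: AKIA/ASIA followed by 16 uppercase alphanumerics. */
  def looksLikeAwsAccessKeyId(s: String): Boolean =
    s.matches("(AKIA|ASIA)[0-9A-Z]{16}")
}
```

The test assertions stay the same apart from comparing against `FakeCredentials.AccessKey` and `FakeCredentials.SecretKey`, since the suite only checks that the values round-trip into `fsConf`.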
##########
backends-velox/src/test/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApiFsConfSuite.scala:
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.backendsapi.velox
+
+import org.apache.gluten.execution.{GlutenPartition, WholeStageTransformContext}
+import org.apache.gluten.substrait.plan.PlanBuilder
+
+import org.apache.spark.sql.test.SharedSparkSession
+
+/**
+ * Tests that [[VeloxIteratorApi.genPartitions]] captures fs.azure.*,
fs.s3a.*, and fs.gs.* keys
+ * from the driver-side Hadoop configuration and embeds them in
[[GlutenPartition.fsConf]], so they
+ * are available on executors where Spark's SQLConf propagation does not reach
"fs.*" keys.
+ *
+ * Keys must be set on `sparkContext.hadoopConfiguration` (the mutable base
configuration) because
+ * `sessionState.newHadoopConf()` creates a fresh copy each time - mutations
to its return value are
+ * discarded before the next call.
+ */
+class VeloxIteratorApiFsConfSuite extends SharedSparkSession {
+
+ private val api = new VeloxIteratorApi
+
+ /**
+ * Build a minimal WholeStageTransformContext backed by an empty Substrait
plan. genPartitions
+ * only calls wsCtx.root.toProtobuf.toByteArray, so a plan with no relations
is sufficient for the
+ * purpose of this test.
+ */
+ private def emptyWsCtx: WholeStageTransformContext =
+ WholeStageTransformContext(PlanBuilder.empty())
+
+ /**
+ * Set Hadoop conf keys on the underlying mutable configuration and restore
their previous values
+ * (or unset them) after the block. `sessionState.newHadoopConf()` copies
from
+ * `sparkContext.hadoopConfiguration`, so this is the correct mutation point.
+ */
+ private def withHadoopConf(pairs: (String, String)*)(body: => Unit): Unit = {
+ // scalastyle:off hadoopconfiguration
+ val hadoopConf = spark.sparkContext.hadoopConfiguration
+ // scalastyle:on hadoopconfiguration
+ val prev: Seq[(String, Option[String])] = pairs.map {
+ case (k, _) => k -> Option(hadoopConf.get(k))
+ }
+ pairs.foreach { case (k, v) => hadoopConf.set(k, v) }
+ try body
+ finally prev.foreach {
+ case (k, Some(old)) => hadoopConf.set(k, old)
+ case (k, None) => hadoopConf.unset(k)
+ }
+ }
+
+ test("genPartitions embeds fs.azure.* keys from Hadoop conf into GlutenPartition.fsConf") {
+ withHadoopConf(
+ "fs.azure.account.auth.type.myaccount.dfs.core.windows.net" -> "OAuth",
+ "fs.azure.account.oauth.provider.type" -> "ClientCredentials"
+ ) {
+ val partitions = api.genPartitions(emptyWsCtx, Seq(Seq.empty), Seq.empty)
Review Comment:
Issue #10113 and the new code comments mention configs set via
`spark.conf.set("fs.*", ...)` / SQLConf, but this suite only mutates
`sparkContext.hadoopConfiguration`. That means the tests don’t currently
validate the primary regression scenario (session-level `spark.conf.set` keys
making it into `GlutenPartition.fsConf`). Consider adding at least one test
that sets an `fs.*` key via `spark.conf.set` and asserts it is captured.