Copilot commented on code in PR #12367:
URL: https://github.com/apache/gluten/pull/12367#discussion_r3475543313


##########
backends-velox/src/test/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApiFsConfSuite.scala:
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.backendsapi.velox
+
+import org.apache.gluten.execution.{GlutenPartition, 
WholeStageTransformContext}
+import org.apache.gluten.substrait.plan.PlanBuilder
+
+import org.apache.spark.sql.test.SharedSparkSession
+
+/**
+ * Tests that [[VeloxIteratorApi.genPartitions]] captures fs.azure.*, 
fs.s3a.*, and fs.gs.* keys
+ * from the driver-side Hadoop configuration and embeds them in 
[[GlutenPartition.fsConf]], so they
+ * are available on executors where Spark's SQLConf propagation does not reach 
"fs.*" keys.
+ */
+class VeloxIteratorApiFsConfSuite extends SharedSparkSession {
+
+  private val api = new VeloxIteratorApi
+
+  /**
+   * Build a minimal WholeStageTransformContext backed by an empty Substrait 
plan. genPartitions
+   * only calls wsCtx.root.toProtobuf.toByteArray, so a plan with no relations 
is sufficient for the
+   * purpose of this test.
+   */
+  private def emptyWsCtx: WholeStageTransformContext =
+    WholeStageTransformContext(PlanBuilder.empty())
+
+  test("genPartitions embeds fs.azure.* keys from Hadoop conf into 
GlutenPartition.fsConf") {
+    val hadoopConf = spark.sessionState.newHadoopConf()

Review Comment:
   This test mutates `spark.sessionState.newHadoopConf()`, but 
`newHadoopConf()` returns a new `Configuration` copy; 
`VeloxIteratorApi.genPartitions` will not see mutations made on that temporary 
object. Set keys on `spark.sparkContext.hadoopConfiguration` (or via the same 
mechanism the production code reads) so the test actually validates propagation.



##########
backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala:
##########
@@ -125,12 +125,41 @@ class VeloxIteratorApi extends IteratorApi with Logging {
     // Only serialize plan once, save lots time when plan is complex.
     val planByteArray = wsCtx.root.toProtobuf.toByteArray
 
+    // Capture fs.azure.* / fs.s3a.* / fs.gs.* keys from the driver-side
+    // Hadoop configuration NOW, while we are still on the driver, and embed
+    // them in every GlutenPartition.  These keys are set by the user via
+    //   spark.conf.set("fs.azure.account.auth.type", ...)   or
+    //   sparkContext.hadoopConfiguration.set(...)
+    // Spark's withSQLConfPropagated only forwards keys starting with "spark"
+    // as task-local-properties, so "fs.*" keys never reach the executor's
+    // SQLConf.  Serialising them inside the partition is the only safe way
+    // to make them available to the native runtime on the executor.
+    // Capture fs.azure.* / fs.s3a.* / fs.gs.* keys while on the driver.
+    // SparkPlan.sqlContext is available on the driver -- using the first leaf
+    // gives us access to sessionState.newHadoopConf() which includes all keys
+    // set via spark.conf.set(), sparkContext.hadoopConfiguration, and
+    // DataFrameReader.option().  These are NOT propagated to executors by
+    // Spark's withSQLConfPropagated (it only forwards keys starting with
+    // "spark"), so embedding them in the serialised GlutenPartition is the
+    // only reliable transport mechanism.
+    val fsPrefixes = Seq("fs.azure.", "fs.s3a.", "fs.gs.")
+    val hadoopConf = leaves.headOption
+      .map(_ => 
org.apache.spark.sql.SparkSession.active.sessionState.newHadoopConf())
+      
.getOrElse(org.apache.spark.SparkContext.getOrCreate().hadoopConfiguration)

Review Comment:
   `genPartitions` currently uses `SparkSession.active` to build the Hadoop 
конф and ignores the `leaves` session entirely (`map(_ => ...)`). This can 
capture the wrong session (or fail if no active session is set on the thread). 
Prefer using the first leaf’s `sqlContext.sparkSession` when available, and 
fall back to the active session / SparkContext only when needed.



##########
backends-velox/src/test/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApiFsConfSuite.scala:
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.backendsapi.velox
+
+import org.apache.gluten.execution.{GlutenPartition, 
WholeStageTransformContext}
+import org.apache.gluten.substrait.plan.PlanBuilder
+
+import org.apache.spark.sql.test.SharedSparkSession
+
+/**
+ * Tests that [[VeloxIteratorApi.genPartitions]] captures fs.azure.*, 
fs.s3a.*, and fs.gs.* keys
+ * from the driver-side Hadoop configuration and embeds them in 
[[GlutenPartition.fsConf]], so they
+ * are available on executors where Spark's SQLConf propagation does not reach 
"fs.*" keys.
+ */
+class VeloxIteratorApiFsConfSuite extends SharedSparkSession {
+
+  private val api = new VeloxIteratorApi
+
+  /**
+   * Build a minimal WholeStageTransformContext backed by an empty Substrait 
plan. genPartitions
+   * only calls wsCtx.root.toProtobuf.toByteArray, so a plan with no relations 
is sufficient for the
+   * purpose of this test.
+   */
+  private def emptyWsCtx: WholeStageTransformContext =
+    WholeStageTransformContext(PlanBuilder.empty())
+
+  test("genPartitions embeds fs.azure.* keys from Hadoop conf into 
GlutenPartition.fsConf") {
+    val hadoopConf = spark.sessionState.newHadoopConf()
+    
hadoopConf.set("fs.azure.account.auth.type.myaccount.dfs.core.windows.net", 
"OAuth")
+    hadoopConf.set("fs.azure.account.oauth.provider.type", "ClientCredentials")
+    try {
+      val partitions = api.genPartitions(emptyWsCtx, Seq(Seq.empty), Seq.empty)
+      assert(partitions.size == 1)
+      val fsConf = partitions.head.asInstanceOf[GlutenPartition].fsConf
+      assert(
+        
fsConf.contains("fs.azure.account.auth.type.myaccount.dfs.core.windows.net"),
+        s"Expected fs.azure key not found; got: $fsConf")
+      assert(
+        fsConf("fs.azure.account.auth.type.myaccount.dfs.core.windows.net") == 
"OAuth")
+      assert(
+        fsConf.contains("fs.azure.account.oauth.provider.type"),
+        s"Expected fs.azure key not found; got: $fsConf")
+    } finally {
+      
hadoopConf.unset("fs.azure.account.auth.type.myaccount.dfs.core.windows.net")
+      hadoopConf.unset("fs.azure.account.oauth.provider.type")
+    }
+  }
+
+  test("genPartitions embeds fs.s3a.* keys from Hadoop conf into 
GlutenPartition.fsConf") {
+    val hadoopConf = spark.sessionState.newHadoopConf()
+    hadoopConf.set("fs.s3a.access.key", "AKIAIOSFODNN7EXAMPLE")
+    hadoopConf.set("fs.s3a.secret.key", "wJalrXUtnFEMI")

Review Comment:
   The test data uses AWS-looking access/secret key strings (e.g., `AKIA...`) 
which can trigger secret scanners and is easy to misinterpret as a real 
credential. Use obviously fake non-matching placeholders instead.



##########
backends-velox/src/test/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApiFsConfSuite.scala:
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.backendsapi.velox
+
+import org.apache.gluten.execution.{GlutenPartition, 
WholeStageTransformContext}
+import org.apache.gluten.substrait.plan.PlanBuilder
+
+import org.apache.spark.sql.test.SharedSparkSession
+
+/**
+ * Tests that [[VeloxIteratorApi.genPartitions]] captures fs.azure.*, 
fs.s3a.*, and fs.gs.* keys
+ * from the driver-side Hadoop configuration and embeds them in 
[[GlutenPartition.fsConf]], so they
+ * are available on executors where Spark's SQLConf propagation does not reach 
"fs.*" keys.
+ */
+class VeloxIteratorApiFsConfSuite extends SharedSparkSession {
+
+  private val api = new VeloxIteratorApi
+
+  /**
+   * Build a minimal WholeStageTransformContext backed by an empty Substrait 
plan. genPartitions
+   * only calls wsCtx.root.toProtobuf.toByteArray, so a plan with no relations 
is sufficient for the
+   * purpose of this test.
+   */
+  private def emptyWsCtx: WholeStageTransformContext =
+    WholeStageTransformContext(PlanBuilder.empty())
+
+  test("genPartitions embeds fs.azure.* keys from Hadoop conf into 
GlutenPartition.fsConf") {
+    val hadoopConf = spark.sessionState.newHadoopConf()
+    
hadoopConf.set("fs.azure.account.auth.type.myaccount.dfs.core.windows.net", 
"OAuth")
+    hadoopConf.set("fs.azure.account.oauth.provider.type", "ClientCredentials")
+    try {
+      val partitions = api.genPartitions(emptyWsCtx, Seq(Seq.empty), Seq.empty)
+      assert(partitions.size == 1)
+      val fsConf = partitions.head.asInstanceOf[GlutenPartition].fsConf
+      assert(
+        
fsConf.contains("fs.azure.account.auth.type.myaccount.dfs.core.windows.net"),
+        s"Expected fs.azure key not found; got: $fsConf")
+      assert(
+        fsConf("fs.azure.account.auth.type.myaccount.dfs.core.windows.net") == 
"OAuth")
+      assert(
+        fsConf.contains("fs.azure.account.oauth.provider.type"),
+        s"Expected fs.azure key not found; got: $fsConf")
+    } finally {
+      
hadoopConf.unset("fs.azure.account.auth.type.myaccount.dfs.core.windows.net")
+      hadoopConf.unset("fs.azure.account.oauth.provider.type")
+    }
+  }
+
+  test("genPartitions embeds fs.s3a.* keys from Hadoop conf into 
GlutenPartition.fsConf") {
+    val hadoopConf = spark.sessionState.newHadoopConf()

Review Comment:
   This test mutates `spark.sessionState.newHadoopConf()`, but 
`newHadoopConf()` returns a new `Configuration` copy; 
`VeloxIteratorApi.genPartitions` will not see mutations made on that temporary 
object. Set keys on `spark.sparkContext.hadoopConfiguration` (or via the same 
mechanism the production code reads) so the test actually validates propagation.



##########
backends-velox/src/test/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApiFsConfSuite.scala:
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.backendsapi.velox
+
+import org.apache.gluten.execution.{GlutenPartition, 
WholeStageTransformContext}
+import org.apache.gluten.substrait.plan.PlanBuilder
+
+import org.apache.spark.sql.test.SharedSparkSession
+
+/**
+ * Tests that [[VeloxIteratorApi.genPartitions]] captures fs.azure.*, 
fs.s3a.*, and fs.gs.* keys
+ * from the driver-side Hadoop configuration and embeds them in 
[[GlutenPartition.fsConf]], so they
+ * are available on executors where Spark's SQLConf propagation does not reach 
"fs.*" keys.
+ */
+class VeloxIteratorApiFsConfSuite extends SharedSparkSession {
+
+  private val api = new VeloxIteratorApi
+
+  /**
+   * Build a minimal WholeStageTransformContext backed by an empty Substrait 
plan. genPartitions
+   * only calls wsCtx.root.toProtobuf.toByteArray, so a plan with no relations 
is sufficient for the
+   * purpose of this test.
+   */
+  private def emptyWsCtx: WholeStageTransformContext =
+    WholeStageTransformContext(PlanBuilder.empty())
+
+  test("genPartitions embeds fs.azure.* keys from Hadoop conf into 
GlutenPartition.fsConf") {
+    val hadoopConf = spark.sessionState.newHadoopConf()
+    
hadoopConf.set("fs.azure.account.auth.type.myaccount.dfs.core.windows.net", 
"OAuth")
+    hadoopConf.set("fs.azure.account.oauth.provider.type", "ClientCredentials")
+    try {
+      val partitions = api.genPartitions(emptyWsCtx, Seq(Seq.empty), Seq.empty)
+      assert(partitions.size == 1)
+      val fsConf = partitions.head.asInstanceOf[GlutenPartition].fsConf
+      assert(
+        
fsConf.contains("fs.azure.account.auth.type.myaccount.dfs.core.windows.net"),
+        s"Expected fs.azure key not found; got: $fsConf")
+      assert(
+        fsConf("fs.azure.account.auth.type.myaccount.dfs.core.windows.net") == 
"OAuth")
+      assert(
+        fsConf.contains("fs.azure.account.oauth.provider.type"),
+        s"Expected fs.azure key not found; got: $fsConf")
+    } finally {
+      
hadoopConf.unset("fs.azure.account.auth.type.myaccount.dfs.core.windows.net")
+      hadoopConf.unset("fs.azure.account.oauth.provider.type")
+    }
+  }
+
+  test("genPartitions embeds fs.s3a.* keys from Hadoop conf into 
GlutenPartition.fsConf") {
+    val hadoopConf = spark.sessionState.newHadoopConf()
+    hadoopConf.set("fs.s3a.access.key", "AKIAIOSFODNN7EXAMPLE")
+    hadoopConf.set("fs.s3a.secret.key", "wJalrXUtnFEMI")
+    try {
+      val partitions = api.genPartitions(emptyWsCtx, Seq(Seq.empty), Seq.empty)
+      assert(partitions.size == 1)
+      val fsConf = partitions.head.asInstanceOf[GlutenPartition].fsConf
+      assert(fsConf.contains("fs.s3a.access.key"), s"Expected fs.s3a key not 
found; got: $fsConf")
+      assert(fsConf("fs.s3a.access.key") == "AKIAIOSFODNN7EXAMPLE")
+      assert(fsConf.contains("fs.s3a.secret.key"))
+    } finally {
+      hadoopConf.unset("fs.s3a.access.key")
+      hadoopConf.unset("fs.s3a.secret.key")
+    }
+  }
+
+  test("genPartitions embeds fs.gs.* keys from Hadoop conf into 
GlutenPartition.fsConf") {
+    val hadoopConf = spark.sessionState.newHadoopConf()

Review Comment:
   This test mutates `spark.sessionState.newHadoopConf()`, but 
`newHadoopConf()` returns a new `Configuration` copy; 
`VeloxIteratorApi.genPartitions` will not see mutations made on that temporary 
object. Set keys on `spark.sparkContext.hadoopConfiguration` (or via the same 
mechanism the production code reads) so the test actually validates propagation.



##########
backends-velox/src/test/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApiFsConfSuite.scala:
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.backendsapi.velox
+
+import org.apache.gluten.execution.{GlutenPartition, 
WholeStageTransformContext}
+import org.apache.gluten.substrait.plan.PlanBuilder
+
+import org.apache.spark.sql.test.SharedSparkSession
+
+/**
+ * Tests that [[VeloxIteratorApi.genPartitions]] captures fs.azure.*, 
fs.s3a.*, and fs.gs.* keys
+ * from the driver-side Hadoop configuration and embeds them in 
[[GlutenPartition.fsConf]], so they
+ * are available on executors where Spark's SQLConf propagation does not reach 
"fs.*" keys.
+ */
+class VeloxIteratorApiFsConfSuite extends SharedSparkSession {
+
+  private val api = new VeloxIteratorApi
+
+  /**
+   * Build a minimal WholeStageTransformContext backed by an empty Substrait 
plan. genPartitions
+   * only calls wsCtx.root.toProtobuf.toByteArray, so a plan with no relations 
is sufficient for the
+   * purpose of this test.
+   */
+  private def emptyWsCtx: WholeStageTransformContext =
+    WholeStageTransformContext(PlanBuilder.empty())
+
+  test("genPartitions embeds fs.azure.* keys from Hadoop conf into 
GlutenPartition.fsConf") {
+    val hadoopConf = spark.sessionState.newHadoopConf()
+    
hadoopConf.set("fs.azure.account.auth.type.myaccount.dfs.core.windows.net", 
"OAuth")
+    hadoopConf.set("fs.azure.account.oauth.provider.type", "ClientCredentials")
+    try {
+      val partitions = api.genPartitions(emptyWsCtx, Seq(Seq.empty), Seq.empty)
+      assert(partitions.size == 1)
+      val fsConf = partitions.head.asInstanceOf[GlutenPartition].fsConf
+      assert(
+        
fsConf.contains("fs.azure.account.auth.type.myaccount.dfs.core.windows.net"),
+        s"Expected fs.azure key not found; got: $fsConf")
+      assert(
+        fsConf("fs.azure.account.auth.type.myaccount.dfs.core.windows.net") == 
"OAuth")
+      assert(
+        fsConf.contains("fs.azure.account.oauth.provider.type"),
+        s"Expected fs.azure key not found; got: $fsConf")
+    } finally {
+      
hadoopConf.unset("fs.azure.account.auth.type.myaccount.dfs.core.windows.net")
+      hadoopConf.unset("fs.azure.account.oauth.provider.type")
+    }
+  }
+
+  test("genPartitions embeds fs.s3a.* keys from Hadoop conf into 
GlutenPartition.fsConf") {
+    val hadoopConf = spark.sessionState.newHadoopConf()
+    hadoopConf.set("fs.s3a.access.key", "AKIAIOSFODNN7EXAMPLE")
+    hadoopConf.set("fs.s3a.secret.key", "wJalrXUtnFEMI")
+    try {
+      val partitions = api.genPartitions(emptyWsCtx, Seq(Seq.empty), Seq.empty)
+      assert(partitions.size == 1)
+      val fsConf = partitions.head.asInstanceOf[GlutenPartition].fsConf
+      assert(fsConf.contains("fs.s3a.access.key"), s"Expected fs.s3a key not 
found; got: $fsConf")
+      assert(fsConf("fs.s3a.access.key") == "AKIAIOSFODNN7EXAMPLE")
+      assert(fsConf.contains("fs.s3a.secret.key"))
+    } finally {
+      hadoopConf.unset("fs.s3a.access.key")
+      hadoopConf.unset("fs.s3a.secret.key")
+    }
+  }
+
+  test("genPartitions embeds fs.gs.* keys from Hadoop conf into 
GlutenPartition.fsConf") {
+    val hadoopConf = spark.sessionState.newHadoopConf()
+    hadoopConf.set("fs.gs.auth.service.account.json.keyfile", "/tmp/sa.json")
+    try {
+      val partitions = api.genPartitions(emptyWsCtx, Seq(Seq.empty), Seq.empty)
+      assert(partitions.size == 1)
+      val fsConf = partitions.head.asInstanceOf[GlutenPartition].fsConf
+      assert(
+        fsConf.contains("fs.gs.auth.service.account.json.keyfile"),
+        s"Expected fs.gs key not found; got: $fsConf")
+      assert(fsConf("fs.gs.auth.service.account.json.keyfile") == 
"/tmp/sa.json")
+    } finally {
+      hadoopConf.unset("fs.gs.auth.service.account.json.keyfile")
+    }
+  }
+
+  test("genPartitions does not include non-fs.* keys in 
GlutenPartition.fsConf") {
+    val hadoopConf = spark.sessionState.newHadoopConf()

Review Comment:
   This test mutates `spark.sessionState.newHadoopConf()`, but 
`newHadoopConf()` returns a new `Configuration` copy; 
`VeloxIteratorApi.genPartitions` will not see mutations made on that temporary 
object. Set keys on `spark.sparkContext.hadoopConfiguration` (or via the same 
mechanism the production code reads) so the test actually validates propagation.



##########
backends-velox/src/test/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApiFsConfSuite.scala:
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.backendsapi.velox
+
+import org.apache.gluten.execution.{GlutenPartition, 
WholeStageTransformContext}
+import org.apache.gluten.substrait.plan.PlanBuilder
+
+import org.apache.spark.sql.test.SharedSparkSession
+
+/**
+ * Tests that [[VeloxIteratorApi.genPartitions]] captures fs.azure.*, 
fs.s3a.*, and fs.gs.* keys
+ * from the driver-side Hadoop configuration and embeds them in 
[[GlutenPartition.fsConf]], so they
+ * are available on executors where Spark's SQLConf propagation does not reach 
"fs.*" keys.
+ */
+class VeloxIteratorApiFsConfSuite extends SharedSparkSession {
+
+  private val api = new VeloxIteratorApi
+
+  /**
+   * Build a minimal WholeStageTransformContext backed by an empty Substrait 
plan. genPartitions
+   * only calls wsCtx.root.toProtobuf.toByteArray, so a plan with no relations 
is sufficient for the
+   * purpose of this test.
+   */
+  private def emptyWsCtx: WholeStageTransformContext =
+    WholeStageTransformContext(PlanBuilder.empty())
+
+  test("genPartitions embeds fs.azure.* keys from Hadoop conf into 
GlutenPartition.fsConf") {
+    val hadoopConf = spark.sessionState.newHadoopConf()
+    
hadoopConf.set("fs.azure.account.auth.type.myaccount.dfs.core.windows.net", 
"OAuth")
+    hadoopConf.set("fs.azure.account.oauth.provider.type", "ClientCredentials")
+    try {
+      val partitions = api.genPartitions(emptyWsCtx, Seq(Seq.empty), Seq.empty)
+      assert(partitions.size == 1)
+      val fsConf = partitions.head.asInstanceOf[GlutenPartition].fsConf
+      assert(
+        
fsConf.contains("fs.azure.account.auth.type.myaccount.dfs.core.windows.net"),
+        s"Expected fs.azure key not found; got: $fsConf")
+      assert(
+        fsConf("fs.azure.account.auth.type.myaccount.dfs.core.windows.net") == 
"OAuth")
+      assert(
+        fsConf.contains("fs.azure.account.oauth.provider.type"),
+        s"Expected fs.azure key not found; got: $fsConf")
+    } finally {
+      
hadoopConf.unset("fs.azure.account.auth.type.myaccount.dfs.core.windows.net")
+      hadoopConf.unset("fs.azure.account.oauth.provider.type")
+    }
+  }
+
+  test("genPartitions embeds fs.s3a.* keys from Hadoop conf into 
GlutenPartition.fsConf") {
+    val hadoopConf = spark.sessionState.newHadoopConf()
+    hadoopConf.set("fs.s3a.access.key", "AKIAIOSFODNN7EXAMPLE")
+    hadoopConf.set("fs.s3a.secret.key", "wJalrXUtnFEMI")
+    try {
+      val partitions = api.genPartitions(emptyWsCtx, Seq(Seq.empty), Seq.empty)
+      assert(partitions.size == 1)
+      val fsConf = partitions.head.asInstanceOf[GlutenPartition].fsConf
+      assert(fsConf.contains("fs.s3a.access.key"), s"Expected fs.s3a key not 
found; got: $fsConf")
+      assert(fsConf("fs.s3a.access.key") == "AKIAIOSFODNN7EXAMPLE")
+      assert(fsConf.contains("fs.s3a.secret.key"))
+    } finally {
+      hadoopConf.unset("fs.s3a.access.key")
+      hadoopConf.unset("fs.s3a.secret.key")
+    }
+  }
+
+  test("genPartitions embeds fs.gs.* keys from Hadoop conf into 
GlutenPartition.fsConf") {
+    val hadoopConf = spark.sessionState.newHadoopConf()
+    hadoopConf.set("fs.gs.auth.service.account.json.keyfile", "/tmp/sa.json")
+    try {
+      val partitions = api.genPartitions(emptyWsCtx, Seq(Seq.empty), Seq.empty)
+      assert(partitions.size == 1)
+      val fsConf = partitions.head.asInstanceOf[GlutenPartition].fsConf
+      assert(
+        fsConf.contains("fs.gs.auth.service.account.json.keyfile"),
+        s"Expected fs.gs key not found; got: $fsConf")
+      assert(fsConf("fs.gs.auth.service.account.json.keyfile") == 
"/tmp/sa.json")
+    } finally {
+      hadoopConf.unset("fs.gs.auth.service.account.json.keyfile")
+    }
+  }
+
+  test("genPartitions does not include non-fs.* keys in 
GlutenPartition.fsConf") {
+    val hadoopConf = spark.sessionState.newHadoopConf()
+    hadoopConf.set("fs.s3a.access.key", "KEY")
+    hadoopConf.set("spark.some.conf", "value")
+    hadoopConf.set("mapreduce.input.fileinputformat.split.maxsize", 
"128000000")
+    try {
+      val partitions = api.genPartitions(emptyWsCtx, Seq(Seq.empty), Seq.empty)
+      val fsConf = partitions.head.asInstanceOf[GlutenPartition].fsConf
+      assert(!fsConf.contains("spark.some.conf"), "Non-fs key must not appear 
in fsConf")
+      assert(
+        !fsConf.contains("mapreduce.input.fileinputformat.split.maxsize"),
+        "Non-fs key must not appear in fsConf")
+      assert(fsConf.contains("fs.s3a.access.key"))
+    } finally {
+      hadoopConf.unset("fs.s3a.access.key")
+      hadoopConf.unset("spark.some.conf")
+      hadoopConf.unset("mapreduce.input.fileinputformat.split.maxsize")
+    }
+  }
+
+  test("genPartitions produces empty fsConf when no fs.* keys are set") {
+    // Use a key guaranteed not to exist in the Hadoop conf under any test 
profile.
+    val hadoopConf = spark.sessionState.newHadoopConf()

Review Comment:
   This test mutates `spark.sessionState.newHadoopConf()`, but 
`newHadoopConf()` returns a new `Configuration` copy; 
`VeloxIteratorApi.genPartitions` will not see mutations made on that temporary 
object. Set keys on `spark.sparkContext.hadoopConfiguration` (or via the same 
mechanism the production code reads) so the test actually validates propagation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to