yihua commented on code in PR #6352:
URL: https://github.com/apache/hudi/pull/6352#discussion_r952255031
##
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/SQLConfInjectingRDD.scala:
##
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.{Partition, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.internal.SQLConf
+
+import scala.reflect.ClassTag
+
+/**
+ * NOTE: This is a generalized version of Spark's [[SQLExecutionRDD]]
+ *
+ * It is just a wrapper over [[sqlRDD]] which sets and makes effective all the
configs from the
+ * captured [[SQLConf]]
+ *
+ * @param sqlRDD the `RDD` generated by the SQL plan
+ * @param conf the `SQLConf` to apply to the execution of the SQL plan
+ */
+class SQLConfInjectingRDD[T: ClassTag](var sqlRDD: RDD[T], @transient conf:
SQLConf) extends RDD[T](sqlRDD) {
+ private val sqlConfigs = conf.getAllConfs
+ private lazy val sqlConfExecutorSide = {
+val newConf = new SQLConf()
+sqlConfigs.foreach { case (k, v) => newConf.setConfString(k, v) }
+newConf
+ }
+
+ override val partitioner = firstParent[InternalRow].partitioner
+
+ override def getPartitions: Array[Partition] =
firstParent[InternalRow].partitions
+
+ override def compute(split: Partition, context: TaskContext): Iterator[T] = {
+// If we are in the context of a tracked SQL operation,
`SQLExecution.EXECUTION_ID_KEY` is set
+// and we have nothing to do here. Otherwise, we use the `SQLConf`
captured at the creation of
+// this RDD.
+if (context.getLocalProperty(SQLExecution.EXECUTION_ID_KEY) == null) {
+ SQLConf.withExistingConf(sqlConfExecutorSide) {
Review Comment:
Is there any performance hit by doing this? Based on the function
signature, this is done per Spark partition.
##
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/SQLConfInjectingRDD.scala:
##
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.{Partition, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.internal.SQLConf
+
+import scala.reflect.ClassTag
+
+/**
+ * NOTE: This is a generalized version of Spark's [[SQLExecutionRDD]]
+ *
+ * It is just a wrapper over [[sqlRDD]] which sets and makes effective all the
configs from the
+ * captured [[SQLConf]]
+ *
+ * @param sqlRDD the `RDD` generated by the SQL plan
+ * @param conf the `SQLConf` to apply to the execution of the SQL plan
+ */
+class SQLConfInjectingRDD[T: ClassTag](var sqlRDD: RDD[T], @transient conf:
SQLConf) extends RDD[T](sqlRDD) {
+ private val sqlConfigs = conf.getAllConfs
+ private lazy val sqlConfExecutorSide = {
+val newConf = new SQLConf()
+sqlConfigs.foreach { case (k, v) => newConf.setConfString(k, v) }
+newConf
+ }
+
+ override val partitioner = firstParent[InternalRow].partitioner
+
+ override def getPartitions: Array[Partition] =
firstParent[InternalRow].partitions
+
+ override def compute(split: Partition, context: TaskContext): Iterator[T] = {
+// If we are in the context of a tracked SQL operation,
`SQLExecution.EXECUTION_ID_KEY` is set
+// and we have nothing to do here.