lynn updated SPARK-35252:
-------------------------
    Attachment:     (was: znbase-sqlconf-isnull.png)

PartitionReaderFactory's implementation class for DataSourceV2: sqlConf parameter is null
------------------------------------------------------------------------------------------

                 Key: SPARK-35252
                 URL: https://issues.apache.org/jira/browse/SPARK-35252
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 3.0.2
            Reporter: lynn
            Priority: Major
         Attachments: spark-sqlconf-isnull.png

The code of MyPartitionReaderFactory:

{code:scala}
// Implementation class
package com.lynn.spark.sql.v2

import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import com.lynn.spark.sql.v2.MyPartitionReaderFactory.{MY_VECTORIZED_READER_BATCH_SIZE, MY_VECTORIZED_READER_ENABLED}
import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.buildConf
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.vectorized.ColumnarBatch

case class MyPartitionReaderFactory(sqlConf: SQLConf,
                                    dataSchema: StructType,
                                    readSchema: StructType)
  extends PartitionReaderFactory with Logging {

  val enableVectorized = sqlConf.getConf(MY_VECTORIZED_READER_ENABLED, false)
  val batchSize = sqlConf.getConf(MY_VECTORIZED_READER_BATCH_SIZE, 4096)

  override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
    MyRowReader(batchSize, dataSchema, readSchema)
  }

  override def createColumnarReader(partition: InputPartition): PartitionReader[ColumnarBatch] = {
    if (!supportColumnarReads(partition))
      throw new UnsupportedOperationException("Cannot create columnar reader.")
    MyColumnReader(batchSize, dataSchema, readSchema)
  }

  override def supportColumnarReads(partition: InputPartition) = enableVectorized
}

object MyPartitionReaderFactory {

  val MY_VECTORIZED_READER_ENABLED =
    buildConf("spark.sql.my.enableVectorizedReader")
      .doc("Enables vectorized my source scan.")
      .version("1.0.0")
      .booleanConf
      .createWithDefault(false)

  val MY_VECTORIZED_READER_BATCH_SIZE =
    buildConf("spark.sql.my.columnarReaderBatchSize")
      .doc("The number of rows to include in a my source vectorized reader batch. The number should " +
        "be carefully chosen to minimize overhead and avoid OOMs in reading data.")
      .version("1.0.0")
      .intConf
      .createWithDefault(4096)
}
{code}

When the driver constructs the RDD instance (DataSourceRDD), the sqlConf parameter passed to MyPartitionReaderFactory is not null. But after the executor deserializes the RDD, the sqlConf parameter is null.
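A likely explanation (an assumption from reading the Spark source, not confirmed in this ticket): SQLConf itself is Serializable, but its internal settings map is declared @transient, so a SQLConf instance embedded in a reader factory is not meant to survive the driver-to-executor serialization round trip. The self-contained sketch below (no Spark classes involved; Holder and roundTrip are made-up names for illustration) shows the underlying Java-serialization behavior:

{code:scala}
import java.io._

// Sketch: Java serialization skips @transient fields, so they come back
// as null (the field's default) after a round trip.
class Holder extends Serializable {
  // Stand-in for SQLConf's internal settings map (assumption: that map
  // is marked @transient in Spark 3.0.x).
  @transient val settings: java.util.Map[String, String] =
    new java.util.HashMap[String, String]()
}

object TransientRoundTrip {
  // Serialize to a byte array and immediately deserialize, mimicking the
  // driver -> executor hop.
  def roundTrip[T <: AnyRef](value: T): T = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    in.readObject().asInstanceOf[T]
  }

  def main(args: Array[String]): Unit = {
    val original = new Holder
    original.settings.put("spark.sql.my.enableVectorizedReader", "true")
    println(original.settings)            // {spark.sql.my.enableVectorizedReader=true}
    println(roundTrip(original).settings) // null: the transient field was dropped
  }
}
{code}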
The executor-side deserialization path (ResultTask.runTask in Spark core) is as follows:

{code:scala}
// ResultTask.scala
override def runTask(context: TaskContext): U = {
  // Deserialize the RDD and the func using the broadcast variables.
  val threadMXBean = ManagementFactory.getThreadMXBean
  val deserializeStartTimeNs = System.nanoTime()
  val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
    threadMXBean.getCurrentThreadCpuTime
  } else 0L
  val ser = SparkEnv.get.closureSerializer.newInstance()
  val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
  _executorDeserializeTimeNs = System.nanoTime() - deserializeStartTimeNs
  _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
    threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
  } else 0L
  func(context, rdd.iterator(partition, context))
}
{code}
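A possible workaround (a sketch, assuming the factory only needs these two settings; it mirrors how Spark's built-in file-source factories resolve conf-derived values on the driver): read the values out of SQLConf while still on the driver and ship only plain, serializable fields. MyRowReader and MyColumnReader are the same classes as in the snippet above.

{code:scala}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.vectorized.ColumnarBatch

// Sketch: the factory no longer holds a SQLConf; both settings are resolved
// driver-side and travel to the executors as ordinary serializable fields.
case class MyPartitionReaderFactory(enableVectorized: Boolean,
                                    batchSize: Int,
                                    dataSchema: StructType,
                                    readSchema: StructType)
  extends PartitionReaderFactory with Logging {

  override def createReader(partition: InputPartition): PartitionReader[InternalRow] =
    MyRowReader(batchSize, dataSchema, readSchema)

  override def createColumnarReader(partition: InputPartition): PartitionReader[ColumnarBatch] = {
    if (!supportColumnarReads(partition))
      throw new UnsupportedOperationException("Cannot create columnar reader.")
    MyColumnReader(batchSize, dataSchema, readSchema)
  }

  override def supportColumnarReads(partition: InputPartition): Boolean = enableVectorized
}

// Driver side, e.g. in the Scan that builds the factory, while sqlConf is valid:
// MyPartitionReaderFactory(
//   sqlConf.getConf(MY_VECTORIZED_READER_ENABLED),
//   sqlConf.getConf(MY_VECTORIZED_READER_BATCH_SIZE),
//   dataSchema,
//   readSchema)
{code}

If executor-side access to SQL configs is genuinely needed, calling SQLConf.get inside the task may be an alternative; as far as I can tell, in Spark 3.x it resolves against configs propagated through the task's local properties rather than a shipped SQLConf instance.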