yihua commented on code in PR #10956:
URL: https://github.com/apache/hudi/pull/10956#discussion_r1563399841
##########
hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark30ParquetReader.scala:
##########

```diff
@@ -142,11 +149,20 @@ class Spark30ParquetReader(enableVectorizedReader: Boolean,
     }
     val taskContext = Option(TaskContext.get())
     if (enableVectorizedReader) {
-      val vectorizedReader = new VectorizedParquetRecordReader(
-        convertTz.orNull,
-        datetimeRebaseMode.toString,
-        enableOffHeapColumnVector && taskContext.isDefined,
-        capacity)
+      val vectorizedReader = if (schemaEvolutionUtils.shouldUseInternalSchema) {
+        schemaEvolutionUtils.buildVectorizedReader(
+          convertTz,
+          datetimeRebaseMode,
+          enableOffHeapColumnVector,
+          taskContext,
+          capacity)
+      } else {
+        new VectorizedParquetRecordReader(
+          convertTz.orNull,
+          datetimeRebaseMode.toString,
+          enableOffHeapColumnVector && taskContext.isDefined,
+          capacity)
+      }
```

Review Comment:
   Fold the check `schemaEvolutionUtils.shouldUseInternalSchema` into `schemaEvolutionUtils.buildVectorizedReader` (returning a new `VectorizedParquetRecordReader` inside when the internal schema is disabled) and simplify the logic here as:

   ```suggestion
         val vectorizedReader = schemaEvolutionUtils.buildVectorizedReader(
           convertTz,
           datetimeRebaseMode,
           enableOffHeapColumnVector,
           taskContext,
           capacity)
   ```

##########
hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark24ParquetReader.scala:
##########

```diff
@@ -141,8 +150,20 @@ class Spark24ParquetReader(enableVectorizedReader: Boolean,
     }
     val taskContext = Option(TaskContext.get())
     if (enableVectorizedReader) {
-      val vectorizedReader = new VectorizedParquetRecordReader(
-        convertTz.orNull, enableOffHeapColumnVector && taskContext.isDefined, capacity)
+      val vectorizedReader = if (!implicitTypeChangeInfos.isEmpty) {
```

Review Comment:
   Do we already have tests around schema evolution using the new Spark file readers?
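The suggested refactor can be sketched in self-contained Scala. This is a sketch only: the stub classes below stand in for the real `VectorizedParquetRecordReader` and the schema-evolution reader, whose constructors take more arguments, and `InternalSchemaVectorizedReader` is a hypothetical name. The point is that the `shouldUseInternalSchema` branch moves inside `buildVectorizedReader`, so each per-Spark-version call site collapses to one unconditional call:

```scala
// Stubs standing in for the real reader types (simplified signatures).
trait ParquetReader
class VectorizedParquetRecordReader(useOffHeap: Boolean, capacity: Int) extends ParquetReader
class InternalSchemaVectorizedReader(useOffHeap: Boolean, capacity: Int) extends ParquetReader

class SchemaEvolutionUtils(val shouldUseInternalSchema: Boolean) {
  // The branch previously duplicated at every call site now lives here:
  // callers no longer check shouldUseInternalSchema themselves.
  def buildVectorizedReader(useOffHeap: Boolean, capacity: Int): ParquetReader =
    if (shouldUseInternalSchema) new InternalSchemaVectorizedReader(useOffHeap, capacity)
    else new VectorizedParquetRecordReader(useOffHeap, capacity)
}

// Call site, mirroring the review suggestion: a single unconditional call
// regardless of whether the internal schema is enabled.
val utils = new SchemaEvolutionUtils(shouldUseInternalSchema = false)
val reader = utils.buildVectorizedReader(useOffHeap = true, capacity = 4096)
```

The same shape would apply to the readers for the other Spark versions, keeping the branching logic in one place per utils class rather than at every call site.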
##########
hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark30ParquetReader.scala:
##########

```diff
@@ -142,11 +149,20 @@ class Spark30ParquetReader(enableVectorizedReader: Boolean,
     }
     val taskContext = Option(TaskContext.get())
     if (enableVectorizedReader) {
-      val vectorizedReader = new VectorizedParquetRecordReader(
-        convertTz.orNull,
-        datetimeRebaseMode.toString,
-        enableOffHeapColumnVector && taskContext.isDefined,
-        capacity)
+      val vectorizedReader = if (schemaEvolutionUtils.shouldUseInternalSchema) {
+        schemaEvolutionUtils.buildVectorizedReader(
+          convertTz,
+          datetimeRebaseMode,
+          enableOffHeapColumnVector,
+          taskContext,
+          capacity)
+      } else {
+        new VectorizedParquetRecordReader(
+          convertTz.orNull,
+          datetimeRebaseMode.toString,
+          enableOffHeapColumnVector && taskContext.isDefined,
+          capacity)
+      }
```

Review Comment:
   Same for the readers for other Spark versions.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org