yihua commented on code in PR #10956:
URL: https://github.com/apache/hudi/pull/10956#discussion_r1563399841


##########
hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark30ParquetReader.scala:
##########
@@ -142,11 +149,20 @@ class Spark30ParquetReader(enableVectorizedReader: Boolean,
     }
     val taskContext = Option(TaskContext.get())
     if (enableVectorizedReader) {
-      val vectorizedReader = new VectorizedParquetRecordReader(
-        convertTz.orNull,
-        datetimeRebaseMode.toString,
-        enableOffHeapColumnVector && taskContext.isDefined,
-        capacity)
+      val vectorizedReader = if (schemaEvolutionUtils.shouldUseInternalSchema) {
+        schemaEvolutionUtils.buildVectorizedReader(
+          convertTz,
+          datetimeRebaseMode,
+          enableOffHeapColumnVector,
+          taskContext,
+          capacity)
+      } else {
+        new VectorizedParquetRecordReader(
+          convertTz.orNull,
+          datetimeRebaseMode.toString,
+          enableOffHeapColumnVector && taskContext.isDefined,
+          capacity)
+      }

Review Comment:
   Fold the check `schemaEvolutionUtils.shouldUseInternalSchema` into `schemaEvolutionUtils.buildVectorizedReader` (returning a plain `VectorizedParquetRecordReader` inside it when the internal schema is disabled) and simplify the logic here to:
   ```suggestion
         val vectorizedReader = schemaEvolutionUtils.buildVectorizedReader(
             convertTz,
             datetimeRebaseMode,
             enableOffHeapColumnVector,
             taskContext,
             capacity)
   ```
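   For illustration, a rough sketch of the folded helper. `shouldUseInternalSchema` and `buildVectorizedReader` are visible in the diff; the `buildInternalSchemaReader` name, the elided construction, and the exact signature are assumptions, not the PR's code:
   ```scala
   import java.time.ZoneId
   import org.apache.spark.TaskContext
   import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy

   // Sketch: the helper owns the shouldUseInternalSchema branch, so every
   // call site can invoke buildVectorizedReader unconditionally.
   def buildVectorizedReader(convertTz: Option[ZoneId],
                             datetimeRebaseMode: LegacyBehaviorPolicy.Value,
                             enableOffHeapColumnVector: Boolean,
                             taskContext: Option[TaskContext],
                             capacity: Int): VectorizedParquetRecordReader = {
     if (shouldUseInternalSchema) {
       // internal-schema-aware construction, as in the current PR branch
       buildInternalSchemaReader(convertTz, datetimeRebaseMode,
         enableOffHeapColumnVector, taskContext, capacity) // hypothetical helper
     } else {
       // fallback: the stock Spark reader, exactly as the call site builds today
       new VectorizedParquetRecordReader(
         convertTz.orNull,
         datetimeRebaseMode.toString,
         enableOffHeapColumnVector && taskContext.isDefined,
         capacity)
     }
   }
   ```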



##########
hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark24ParquetReader.scala:
##########
@@ -141,8 +150,20 @@ class Spark24ParquetReader(enableVectorizedReader: Boolean,
     }
     val taskContext = Option(TaskContext.get())
     if (enableVectorizedReader) {
-      val vectorizedReader = new VectorizedParquetRecordReader(
-        convertTz.orNull, enableOffHeapColumnVector && taskContext.isDefined, capacity)
+      val vectorizedReader = if (!implicitTypeChangeInfos.isEmpty) {

Review Comment:
   Do we already have tests around schema evolution using the new Spark file readers?
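   If not, a minimal round-trip sketch of the kind of test that would exercise this path (the path, table options, and the int-to-long promotion scenario are all assumptions, not existing test code):
   ```scala
   import org.apache.spark.sql.SparkSession

   val spark = SparkSession.builder().master("local[2]").getOrCreate()
   import spark.implicits._

   val path = "/tmp/hudi_schema_evolution_test" // hypothetical location
   val writeOpts = Map(
     "hoodie.table.name" -> "schema_evolution_test",
     "hoodie.datasource.write.recordkey.field" -> "id",
     "hoodie.datasource.write.precombine.field" -> "ts")

   // Initial commit: `value` is an Int.
   Seq((1, 1, 10)).toDF("id", "ts", "value")
     .write.format("hudi").options(writeOpts).mode("overwrite").save(path)

   // Second commit: `value` promoted to Long, so the reader has to
   // reconcile an implicit type change across file groups.
   Seq((1, 2, 20L)).toDF("id", "ts", "value")
     .write.format("hudi").options(writeOpts).mode("append").save(path)

   // Read back through the new file readers; broader evolution (rename/drop)
   // would presumably also need hoodie.schema.on.read.enable.
   spark.read.format("hudi").load(path).show()
   ```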



##########
hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark30ParquetReader.scala:
##########
@@ -142,11 +149,20 @@ class Spark30ParquetReader(enableVectorizedReader: Boolean,
     }
     val taskContext = Option(TaskContext.get())
     if (enableVectorizedReader) {
-      val vectorizedReader = new VectorizedParquetRecordReader(
-        convertTz.orNull,
-        datetimeRebaseMode.toString,
-        enableOffHeapColumnVector && taskContext.isDefined,
-        capacity)
+      val vectorizedReader = if (schemaEvolutionUtils.shouldUseInternalSchema) {
+        schemaEvolutionUtils.buildVectorizedReader(
+          convertTz,
+          datetimeRebaseMode,
+          enableOffHeapColumnVector,
+          taskContext,
+          capacity)
+      } else {
+        new VectorizedParquetRecordReader(
+          convertTz.orNull,
+          datetimeRebaseMode.toString,
+          enableOffHeapColumnVector && taskContext.isDefined,
+          capacity)
+      }

Review Comment:
   The same applies to the readers for the other Spark versions.


