Re: [PR] [HUDI-7566] Add schema evolution to spark file readers [hudi]

2024-04-15 Thread via GitHub


jonvex merged PR #10956:
URL: https://github.com/apache/hudi/pull/10956


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [HUDI-7566] Add schema evolution to spark file readers [hudi]

2024-04-15 Thread via GitHub


jonvex commented on PR #10956:
URL: https://github.com/apache/hudi/pull/10956#issuecomment-2057767464

   Azure CI passing (screenshot: https://github.com/apache/hudi/assets/26940621/fba674a1-82fc-4b21-ab90-40623835d9f0)





Re: [PR] [HUDI-7566] Add schema evolution to spark file readers [hudi]

2024-04-15 Thread via GitHub


yihua commented on code in PR #10956:
URL: https://github.com/apache/hudi/pull/10956#discussion_r1563556374


##
hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/execution/datasources/Spark3ParquetSchemaEvolutionUtils.scala:
##
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.client.utils.SparkInternalSchemaConverter
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.util
+import org.apache.hudi.common.util.InternalSchemaCache
+import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
+import org.apache.hudi.common.util.collection.Pair
+import org.apache.hudi.internal.schema.InternalSchema
+import org.apache.hudi.internal.schema.action.InternalSchemaMerger
+import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
+import org.apache.parquet.hadoop.metadata.FileMetaData
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Cast, UnsafeProjection}
+import org.apache.spark.sql.execution.datasources.Spark3ParquetSchemaEvolutionUtils.pruneInternalSchema
+import org.apache.spark.sql.execution.datasources.parquet.{HoodieParquetFileFormatHelper, ParquetReadSupport}
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types.{AtomicType, DataType, StructField, StructType}
+
+import scala.collection.convert.ImplicitConversions.`collection AsScalaIterable`
+
+abstract class Spark3ParquetSchemaEvolutionUtils(sharedConf: Configuration,

Review Comment:
   Thoughts for follow-ups in separate PRs: the schema-evolution logic is
invoked per reader, i.e., per file, but part of it depends only on table-level
information such as the internal schema of the table. Is it possible to pass
such information into the reader instead of deriving it for each file group?
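
The suggestion above can be illustrated with a minimal sketch. It is not Hudi code: `TableSchemaInfo`, `loadTableSchemaInfo`, `PerFileDerivingReader`, and `InjectedReader` are hypothetical names, and the counter only stands in for whatever a table-level lookup costs in practice (the real reader may already cache part of it). The sketch contrasts a reader that derives table-level information itself with one that receives it from its caller.

```scala
// Hypothetical stand-in for table-level schema information; not a Hudi type.
final case class TableSchemaInfo(querySchemaJson: String, validCommits: Set[String])

object TableLevelInfoSketch {
  // Counts how often the (assumed expensive) table-level lookup runs.
  var lookups = 0

  // Hypothetical lookup; a real one would read the table's metadata.
  def loadTableSchemaInfo(tablePath: String): TableSchemaInfo = {
    lookups += 1
    TableSchemaInfo(s"""{"table":"$tablePath"}""", Set("001", "002"))
  }

  // Shape 1: every per-file reader derives the table-level info on its own.
  final class PerFileDerivingReader(tablePath: String, fileName: String) {
    private lazy val info = loadTableSchemaInfo(tablePath)
    def describe: String = s"$fileName sees ${info.validCommits.size} commits"
  }

  // Shape 2: the caller derives the info once and passes it in.
  final class InjectedReader(info: TableSchemaInfo, fileName: String) {
    def describe: String = s"$fileName sees ${info.validCommits.size} commits"
  }

  def main(args: Array[String]): Unit = {
    val files = Seq("a.parquet", "b.parquet", "c.parquet")

    files.foreach(f => new PerFileDerivingReader("/tmp/tbl", f).describe)
    val perFile = lookups // one lookup per file

    lookups = 0
    val shared = loadTableSchemaInfo("/tmp/tbl")
    files.foreach(f => new InjectedReader(shared, f).describe)
    val injected = lookups // a single lookup for all files

    println(s"per-file lookups: $perFile, injected lookups: $injected")
  }
}
```

With the injected shape, the number of lookups no longer grows with the number of files; whether that matters for the real reader depends on how much of the derivation is already cached.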




Re: [PR] [HUDI-7566] Add schema evolution to spark file readers [hudi]

2024-04-15 Thread via GitHub


hudi-bot commented on PR #10956:
URL: https://github.com/apache/hudi/pull/10956#issuecomment-2057613284

   
   ## CI report:
   
   * 8943bb4eaf741096203bed688905977d4bf59160 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23265)
   * 4c3242159414786c927f13b83013b045c517ff65 UNKNOWN
   * eb58a1a3af3bda46cd0db910ac39e37efd744cdd Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23267)
   
   
   Bot commands
 @hudi-bot supports the following commands:
   
- `@hudi-bot run azure` re-run the last Azure build
   





Re: [PR] [HUDI-7566] Add schema evolution to spark file readers [hudi]

2024-04-15 Thread via GitHub


hudi-bot commented on PR #10956:
URL: https://github.com/apache/hudi/pull/10956#issuecomment-2057600621

   
   ## CI report:
   
   * 8943bb4eaf741096203bed688905977d4bf59160 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23265)
   * 4c3242159414786c927f13b83013b045c517ff65 UNKNOWN
   * eb58a1a3af3bda46cd0db910ac39e37efd744cdd UNKNOWN
   
   



Re: [PR] [HUDI-7566] Add schema evolution to spark file readers [hudi]

2024-04-15 Thread via GitHub


hudi-bot commented on PR #10956:
URL: https://github.com/apache/hudi/pull/10956#issuecomment-2057588207

   
   ## CI report:
   
   * 8943bb4eaf741096203bed688905977d4bf59160 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23265)
   * 4c3242159414786c927f13b83013b045c517ff65 UNKNOWN
   
   



Re: [PR] [HUDI-7566] Add schema evolution to spark file readers [hudi]

2024-04-15 Thread via GitHub


jonvex commented on code in PR #10956:
URL: https://github.com/apache/hudi/pull/10956#discussion_r1566258283


##
hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark24ParquetReader.scala:
##
@@ -156,30 +177,51 @@ class Spark24ParquetReader(enableVectorizedReader: Boolean,
       iter.asInstanceOf[Iterator[InternalRow]]
     } else {
       // ParquetRecordReader returns UnsafeRow
+      val readSupport = new ParquetReadSupport(convertTz)
       val reader = if (pushed.isDefined && enableRecordFilter) {
         val parquetFilter = FilterCompat.get(pushed.get, null)
-        new ParquetRecordReader[UnsafeRow](new ParquetReadSupport(convertTz), parquetFilter)
+        new ParquetRecordReader[UnsafeRow](readSupport, parquetFilter)
       } else {
-        new ParquetRecordReader[UnsafeRow](new ParquetReadSupport(convertTz))
+        new ParquetRecordReader[UnsafeRow](readSupport)
       }
       val iter = new RecordReaderIterator(reader)
       // SPARK-23457 Register a task completion lister before `initialization`.
       taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
       reader.initialize(split, hadoopAttemptContext)
 
       val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
-      val joinedRow = new JoinedRow()
-      val appendPartitionColumns = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
+      val unsafeProjection = if (implicitTypeChangeInfos.isEmpty) {
+        GenerateUnsafeProjection.generate(fullSchema, fullSchema)
+      } else {
+        val newFullSchema = new StructType(requiredSchema.fields.zipWithIndex.map { case (f, i) =>
+          if (implicitTypeChangeInfos.containsKey(i)) {
+            StructField(f.name, implicitTypeChangeInfos.get(i).getRight, f.nullable, f.metadata)
+          } else f
+        }).toAttributes ++ partitionSchema.toAttributes
+        val castSchema = newFullSchema.zipWithIndex.map { case (attr, i) =>
+          if (implicitTypeChangeInfos.containsKey(i)) {
+            val srcType = implicitTypeChangeInfos.get(i).getRight
+            val dstType = implicitTypeChangeInfos.get(i).getLeft
+            val needTimeZone = Cast.needsTimeZone(srcType, dstType)
+            Cast(attr, dstType, if (needTimeZone) timeZoneId else None)
+          } else attr
+        }
+        GenerateUnsafeProjection.generate(castSchema, newFullSchema)
+      }

Review Comment:
   Schema evolution in Spark 2 diverges, so here we just do what the legacy
format does. We can follow up and discuss with the original authors of schema
on read.
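
To make the legacy behavior in this hunk easier to follow, here is a simplified, Spark-free model of the implicit type change projection. All names (`ColType`, `readSchema`, `project`, `cast`) are hypothetical; the only thing taken from the diff is the convention that each entry pairs the target type (left) with the type stored in the file (right), so the file is read with the stored type and each affected column is then cast to the target type.

```scala
object ImplicitTypeChangeSketch {
  // Hypothetical, minimal stand-in for Spark's DataType hierarchy.
  sealed trait ColType
  case object IntCol extends ColType
  case object LongCol extends ColType
  case object StringCol extends ColType

  // Column index -> (target type, type stored in the file).
  // Mirrors the left/right usage of the Pair in the quoted diff.
  type TypeChanges = Map[Int, (ColType, ColType)]

  // Schema used to read the file: changed columns keep the stored type.
  def readSchema(required: Vector[(String, ColType)],
                 changes: TypeChanges): Vector[(String, ColType)] =
    required.zipWithIndex.map { case ((name, tpe), i) =>
      changes.get(i).map { case (_, stored) => (name, stored) }.getOrElse((name, tpe))
    }

  // Toy cast covering only the widenings used in this sketch.
  def cast(value: Any, target: ColType): Any = (value, target) match {
    case (i: Int, LongCol)    => i.toLong
    case (i: Int, StringCol)  => i.toString
    case (l: Long, StringCol) => l.toString
    case (v, _)               => v
  }

  // Projection applied to each row: cast changed columns, pass others through.
  def project(row: Vector[Any], changes: TypeChanges): Vector[Any] =
    row.zipWithIndex.map { case (v, i) =>
      changes.get(i).map { case (target, _) => cast(v, target) }.getOrElse(v)
    }

  def main(args: Array[String]): Unit = {
    val required = Vector("id" -> (LongCol: ColType), "name" -> (StringCol: ColType))
    val changes: TypeChanges = Map(0 -> ((LongCol, IntCol)))
    println(readSchema(required, changes)) // "id" is read with the stored Int type
    println(project(Vector(7, "x"), changes)) // "id" is widened to Long afterwards
  }
}
```

When the map of changes is empty, `project` is the identity, which corresponds to the branch in the diff that generates the projection from the unchanged full schema.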






Re: [PR] [HUDI-7566] Add schema evolution to spark file readers [hudi]

2024-04-15 Thread via GitHub


jonvex commented on code in PR #10956:
URL: https://github.com/apache/hudi/pull/10956#discussion_r1566257427


##
hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/execution/datasources/Spark3ParquetSchemaEvolutionUtils.scala:
##
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.client.utils.SparkInternalSchemaConverter
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.util
+import org.apache.hudi.common.util.InternalSchemaCache
+import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
+import org.apache.hudi.common.util.collection.Pair
+import org.apache.hudi.internal.schema.InternalSchema
+import org.apache.hudi.internal.schema.action.InternalSchemaMerger
+import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
+import org.apache.parquet.hadoop.metadata.FileMetaData
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Cast, UnsafeProjection}
+import org.apache.spark.sql.execution.datasources.Spark3ParquetSchemaEvolutionUtils.pruneInternalSchema
+import org.apache.spark.sql.execution.datasources.parquet.{HoodieParquetFileFormatHelper, ParquetReadSupport}
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types.{AtomicType, DataType, StructField, StructType}
+
+import scala.collection.convert.ImplicitConversions.`collection AsScalaIterable`
+
+abstract class Spark3ParquetSchemaEvolutionUtils(sharedConf: Configuration,
+                                                 filePath: Path,
+                                                 requiredSchema: StructType,
+                                                 partitionSchema: StructType) {
+  // Fetch internal schema
+  private lazy val internalSchemaStr: String = sharedConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA)
+
+  private lazy val querySchemaOption: util.Option[InternalSchema] = pruneInternalSchema(internalSchemaStr, requiredSchema)
+
+  var shouldUseInternalSchema: Boolean = !isNullOrEmpty(internalSchemaStr) && querySchemaOption.isPresent
+
+  private lazy val tablePath: String = sharedConf.get(SparkInternalSchemaConverter.HOODIE_TABLE_PATH)
+  private lazy val fileSchema: InternalSchema = if (shouldUseInternalSchema) {
+    val commitInstantTime = FSUtils.getCommitTime(filePath.getName).toLong;
+    val validCommits = sharedConf.get(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST)
+    InternalSchemaCache.getInternalSchemaByVersionId(commitInstantTime, tablePath, sharedConf, if (validCommits == null) "" else validCommits)
+  } else {
+    null
+  }
+
+  def rebuildFilterFromParquet(filter: Filter): Filter = {
+    rebuildFilterFromParquetHelper(filter, fileSchema, querySchemaOption.orElse(null))
+  }
+
+  private def rebuildFilterFromParquetHelper(oldFilter: Filter, fileSchema: InternalSchema, querySchema: InternalSchema): Filter = {
+    if (fileSchema == null || querySchema == null) {
+      oldFilter
+    } else {
+      oldFilter match {
+        case eq: EqualTo =>
+          val newAttribute = InternalSchemaUtils.reBuildFilterName(eq.attribute, fileSchema, querySchema)
+          if (newAttribute.isEmpty) AlwaysTrue else eq.copy(attribute = newAttribute)
+        case eqs: EqualNullSafe =>
+          val newAttribute = InternalSchemaUtils.reBuildFilterName(eqs.attribute, fileSchema, querySchema)
+          if (newAttribute.isEmpty) AlwaysTrue else eqs.copy(attribute = newAttribute)
+        case gt: GreaterThan =>
+          val newAttribute = InternalSchemaUtils.reBuildFilterName(gt.attribute, fileSchema, querySchema)
+          if (newAttribute.isEmpty) AlwaysTrue else gt.copy(attribute = newAttribute)
+        case gtr: GreaterThanOrEqual =>
+          val newAttribute = InternalSchemaUtils.reBuildFilterName(gtr.attribute, fileSchema, querySchema)
+          if (newAttribute.isEmpty) AlwaysTrue else gtr.copy(attribute = newAttribute)
+        case lt: LessThan 

Re: [PR] [HUDI-7566] Add schema evolution to spark file readers [hudi]

2024-04-15 Thread via GitHub


jonvex commented on code in PR #10956:
URL: https://github.com/apache/hudi/pull/10956#discussion_r1566256863


##
hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/execution/datasources/Spark3ParquetSchemaEvolutionUtils.scala:
##

Re: [PR] [HUDI-7566] Add schema evolution to spark file readers [hudi]

2024-04-15 Thread via GitHub


jonvex commented on code in PR #10956:
URL: https://github.com/apache/hudi/pull/10956#discussion_r1566256714


##
hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark30ParquetReader.scala:
##
@@ -174,7 +190,7 @@ class Spark30ParquetReader(enableVectorizedReader: Boolean,
       reader.initialize(split, hadoopAttemptContext)
 
       val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
-      val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
+      val unsafeProjection = schemaEvolutionUtils.generateUnsafeProjection(fullSchema, timeZoneId)

Review Comment:
   No longer needed






Re: [PR] [HUDI-7566] Add schema evolution to spark file readers [hudi]

2024-04-15 Thread via GitHub


hudi-bot commented on PR #10956:
URL: https://github.com/apache/hudi/pull/10956#issuecomment-2057514363

   
   ## CI report:
   
   * be7795021e2cffe600a109448ed02e5860385b9f Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23262)
   * 8943bb4eaf741096203bed688905977d4bf59160 Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23265)
   
   



Re: [PR] [HUDI-7566] Add schema evolution to spark file readers [hudi]

2024-04-15 Thread via GitHub


hudi-bot commented on PR #10956:
URL: https://github.com/apache/hudi/pull/10956#issuecomment-2057501959

   
   ## CI report:
   
   * be7795021e2cffe600a109448ed02e5860385b9f Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23262)
   * 8943bb4eaf741096203bed688905977d4bf59160 UNKNOWN
   
   



Re: [PR] [HUDI-7566] Add schema evolution to spark file readers [hudi]

2024-04-15 Thread via GitHub


hudi-bot commented on PR #10956:
URL: https://github.com/apache/hudi/pull/10956#issuecomment-2057374664

   
   ## CI report:
   
   * be7795021e2cffe600a109448ed02e5860385b9f Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23262)
   
   



Re: [PR] [HUDI-7566] Add schema evolution to spark file readers [hudi]

2024-04-15 Thread via GitHub


hudi-bot commented on PR #10956:
URL: https://github.com/apache/hudi/pull/10956#issuecomment-2057219823

   
   ## CI report:
   
   * c8f507bcac03c7183893400487a1885400c46853 Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23230)
   * be7795021e2cffe600a109448ed02e5860385b9f UNKNOWN
   
   



Re: [PR] [HUDI-7566] Add schema evolution to spark file readers [hudi]

2024-04-15 Thread via GitHub


jonvex commented on code in PR #10956:
URL: https://github.com/apache/hudi/pull/10956#discussion_r1565961863


##
hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/execution/datasources/Spark3ParquetSchemaEvolutionUtils.scala:
##

Re: [PR] [HUDI-7566] Add schema evolution to spark file readers [hudi]

2024-04-15 Thread via GitHub


jonvex commented on code in PR #10956:
URL: https://github.com/apache/hudi/pull/10956#discussion_r1565959794


##
hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark24ParquetReader.scala:
##
@@ -141,8 +150,20 @@ class Spark24ParquetReader(enableVectorizedReader: Boolean,
     }
     val taskContext = Option(TaskContext.get())
     if (enableVectorizedReader) {
-      val vectorizedReader = new VectorizedParquetRecordReader(
-        convertTz.orNull, enableOffHeapColumnVector && taskContext.isDefined, capacity)
+      val vectorizedReader = if (!implicitTypeChangeInfos.isEmpty) {

Review Comment:
   The next PR wires these readers into the file group (fg) reader and removes
the setting that disables the fg reader from the schema evolution tests.






Re: [PR] [HUDI-7566] Add schema evolution to spark file readers [hudi]

2024-04-12 Thread via GitHub


yihua commented on code in PR #10956:
URL: https://github.com/apache/hudi/pull/10956#discussion_r1563527600


##
hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/execution/datasources/Spark3ParquetSchemaEvolutionUtils.scala:
##
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.client.utils.SparkInternalSchemaConverter
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.util
+import org.apache.hudi.common.util.InternalSchemaCache
+import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
+import org.apache.hudi.common.util.collection.Pair
+import org.apache.hudi.internal.schema.InternalSchema
+import org.apache.hudi.internal.schema.action.InternalSchemaMerger
+import org.apache.hudi.internal.schema.utils.{InternalSchemaUtils, SerDeHelper}
+import org.apache.parquet.hadoop.metadata.FileMetaData
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Cast, UnsafeProjection}
+import org.apache.spark.sql.execution.datasources.Spark3ParquetSchemaEvolutionUtils.pruneInternalSchema
+import org.apache.spark.sql.execution.datasources.parquet.{HoodieParquetFileFormatHelper, ParquetReadSupport}
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types.{AtomicType, DataType, StructField, StructType}
+
+import scala.collection.convert.ImplicitConversions.`collection AsScalaIterable`
+
+abstract class Spark3ParquetSchemaEvolutionUtils(sharedConf: Configuration,
+                                                 filePath: Path,
+                                                 requiredSchema: StructType,
+                                                 partitionSchema: StructType) {
+  // Fetch internal schema
+  private lazy val internalSchemaStr: String = sharedConf.get(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA)
+
+  private lazy val querySchemaOption: util.Option[InternalSchema] = pruneInternalSchema(internalSchemaStr, requiredSchema)
+
+  var shouldUseInternalSchema: Boolean = !isNullOrEmpty(internalSchemaStr) && querySchemaOption.isPresent
+
+  private lazy val tablePath: String = sharedConf.get(SparkInternalSchemaConverter.HOODIE_TABLE_PATH)
+  private lazy val fileSchema: InternalSchema = if (shouldUseInternalSchema) {
+    val commitInstantTime = FSUtils.getCommitTime(filePath.getName).toLong;
+    val validCommits = sharedConf.get(SparkInternalSchemaConverter.HOODIE_VALID_COMMITS_LIST)
+    InternalSchemaCache.getInternalSchemaByVersionId(commitInstantTime, tablePath, sharedConf, if (validCommits == null) "" else validCommits)
+  } else {
+    null
+  }
+
+  def rebuildFilterFromParquet(filter: Filter): Filter = {
+    rebuildFilterFromParquetHelper(filter, fileSchema, querySchemaOption.orElse(null))
+  }
+
+  private def rebuildFilterFromParquetHelper(oldFilter: Filter, fileSchema: InternalSchema, querySchema: InternalSchema): Filter = {
+    if (fileSchema == null || querySchema == null) {
+      oldFilter
+    } else {
+      oldFilter match {
+        case eq: EqualTo =>
+          val newAttribute = InternalSchemaUtils.reBuildFilterName(eq.attribute, fileSchema, querySchema)
+          if (newAttribute.isEmpty) AlwaysTrue else eq.copy(attribute = newAttribute)
+        case eqs: EqualNullSafe =>
+          val newAttribute = InternalSchemaUtils.reBuildFilterName(eqs.attribute, fileSchema, querySchema)
+          if (newAttribute.isEmpty) AlwaysTrue else eqs.copy(attribute = newAttribute)
+        case gt: GreaterThan =>
+          val newAttribute = InternalSchemaUtils.reBuildFilterName(gt.attribute, fileSchema, querySchema)
+          if (newAttribute.isEmpty) AlwaysTrue else gt.copy(attribute = newAttribute)
+        case gtr: GreaterThanOrEqual =>
+          val newAttribute = InternalSchemaUtils.reBuildFilterName(gtr.attribute, fileSchema, querySchema)
+          if (newAttribute.isEmpty) AlwaysTrue else gtr.copy(attribute = newAttribute)
+        case lt: LessThan =
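
For readers of this thread, the rename-or-drop pattern in the quoted `rebuildFilterFromParquetHelper` can be sketched without Spark or Hudi on the classpath. Everything below is a hypothetical stand-in: the `Filter` case classes are simplified copies of Spark's source filters, the `Schema` type (column id to column name) is an assumed model, and `rebuildName` only approximates what `InternalSchemaUtils.reBuildFilterName` is assumed to do (resolve the column by id and return its name in the file schema, or an empty string when the file has no such column). The only behavior taken from the quoted diff is that an empty rebuilt name turns the filter into `AlwaysTrue`.

```scala
object FilterRewriteSketch {
  // Hypothetical, simplified stand-ins for Spark's source filters.
  sealed trait Filter
  case object AlwaysTrue extends Filter
  final case class EqualTo(attribute: String, value: Any) extends Filter
  final case class GreaterThan(attribute: String, value: Any) extends Filter

  // Assumed schema model: column id -> column name.
  type Schema = Map[Int, String]

  // Hypothetical counterpart of reBuildFilterName: look up the column id in
  // the query schema, then return the name that id carries in the file
  // schema, or "" when the file schema has no such column.
  def rebuildName(name: String, fileSchema: Schema, querySchema: Schema): String =
    querySchema
      .collectFirst { case (id, n) if n == name => id }
      .flatMap(fileSchema.get)
      .getOrElse("")

  def rebuild(filter: Filter, fileSchema: Schema, querySchema: Schema): Filter = {
    // Shared rename-or-drop step; each case only supplies its own copy call.
    def renamed(attribute: String)(copy: String => Filter): Filter = {
      val newName = rebuildName(attribute, fileSchema, querySchema)
      if (newName.isEmpty) AlwaysTrue else copy(newName)
    }
    filter match {
      case f: EqualTo     => renamed(f.attribute)(n => f.copy(attribute = n))
      case f: GreaterThan => renamed(f.attribute)(n => f.copy(attribute = n))
      case other          => other // filters without a single attribute pass through
    }
  }

  def main(args: Array[String]): Unit = {
    val fileSchema: Schema  = Map(1 -> "id", 2 -> "fare")
    val querySchema: Schema = Map(1 -> "id", 2 -> "price", 3 -> "tip")
    println(rebuild(EqualTo("price", 10), fileSchema, querySchema))
    println(rebuild(GreaterThan("tip", 1), fileSchema, querySchema))
  }
}
```

In this sketch a filter on `price` (renamed from `fare` after the file was written) is rewritten to use `fare`, while a filter on `tip` (a column the file never had) becomes `AlwaysTrue`, so it is not pushed down against a column the file cannot evaluate.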

Re: [PR] [HUDI-7566] Add schema evolution to spark file readers [hudi]

2024-04-12 Thread via GitHub


yihua commented on code in PR #10956:
URL: https://github.com/apache/hudi/pull/10956#discussion_r1563514428


##
hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/execution/datasources/Spark3ParquetSchemaEvolutionUtils.scala:
##
@@ -0,0 +1,194 @@

Re: [PR] [HUDI-7566] Add schema evolution to spark file readers [hudi]

2024-04-12 Thread via GitHub


yihua commented on code in PR #10956:
URL: https://github.com/apache/hudi/pull/10956#discussion_r1563399841


##
hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark30ParquetReader.scala:
##
@@ -142,11 +149,20 @@ class Spark30ParquetReader(enableVectorizedReader: Boolean,
     }
     val taskContext = Option(TaskContext.get())
     if (enableVectorizedReader) {
-      val vectorizedReader = new VectorizedParquetRecordReader(
-        convertTz.orNull,
-        datetimeRebaseMode.toString,
-        enableOffHeapColumnVector && taskContext.isDefined,
-        capacity)
+      val vectorizedReader = if (schemaEvolutionUtils.shouldUseInternalSchema) {
+        schemaEvolutionUtils.buildVectorizedReader(
+          convertTz,
+          datetimeRebaseMode,
+          enableOffHeapColumnVector,
+          taskContext,
+          capacity)
+      } else {
+        new VectorizedParquetRecordReader(
+          convertTz.orNull,
+          datetimeRebaseMode.toString,
+          enableOffHeapColumnVector && taskContext.isDefined,
+          capacity)
+      }

Review Comment:
   Fold the `schemaEvolutionUtils.shouldUseInternalSchema` check into
   `schemaEvolutionUtils.buildVectorizedReader`, so that it returns a plain
   `new VectorizedParquetRecordReader` itself when the internal schema is
   disabled, and simplify the logic here to:
   ```suggestion
         val vectorizedReader = schemaEvolutionUtils.buildVectorizedReader(
             convertTz,
             datetimeRebaseMode,
             enableOffHeapColumnVector,
             taskContext,
             capacity)
   ```
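
A minimal sketch of the shape this suggestion asks for, assuming nothing about the real Hudi or Spark classes: `RecordReader`, `PlainVectorizedReader`, `EvolvingVectorizedReader` and `SchemaEvolutionHelper` are hypothetical stand-ins, and the reader parameters are reduced to a single `capacity`. It only illustrates moving the flag check inside the builder so every call site becomes one unconditional call.

```scala
object FoldedCheckSketch {
  // Hypothetical stand-ins; not the real Spark or Hudi reader classes.
  sealed trait RecordReader
  final case class PlainVectorizedReader(capacity: Int) extends RecordReader
  final case class EvolvingVectorizedReader(capacity: Int, typeChanges: Map[Int, String])
    extends RecordReader

  class SchemaEvolutionHelper(useInternalSchema: Boolean, typeChanges: Map[Int, String]) {
    // The branch lives here, so callers never inspect the flag themselves.
    def buildVectorizedReader(capacity: Int): RecordReader =
      if (useInternalSchema) EvolvingVectorizedReader(capacity, typeChanges)
      else PlainVectorizedReader(capacity)
  }

  def main(args: Array[String]): Unit = {
    val disabled = new SchemaEvolutionHelper(false, Map.empty)
    val enabled  = new SchemaEvolutionHelper(true, Map(0 -> "int -> long"))
    // Call sites are identical whether or not schema evolution is active.
    println(disabled.buildVectorizedReader(4096))
    println(enabled.buildVectorizedReader(4096))
  }
}
```

The design point is that the per-Spark-version readers then carry no schema evolution branching of their own, which is what makes the same simplification repeatable across versions.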



##
hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark24ParquetReader.scala:
##
@@ -141,8 +150,20 @@ class Spark24ParquetReader(enableVectorizedReader: Boolean,
     }
     val taskContext = Option(TaskContext.get())
     if (enableVectorizedReader) {
-      val vectorizedReader = new VectorizedParquetRecordReader(
-        convertTz.orNull, enableOffHeapColumnVector && taskContext.isDefined, capacity)
+      val vectorizedReader = if (!implicitTypeChangeInfos.isEmpty) {

Review Comment:
   Do we already have tests that cover schema evolution with the new Spark
   file readers?



##
hudi-spark-datasource/hudi-spark3.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark30ParquetReader.scala:
##
@@ -142,11 +149,20 @@ class Spark30ParquetReader(enableVectorizedReader: Boolean,
     }
     val taskContext = Option(TaskContext.get())
     if (enableVectorizedReader) {
-      val vectorizedReader = new VectorizedParquetRecordReader(
-        convertTz.orNull,
-        datetimeRebaseMode.toString,
-        enableOffHeapColumnVector && taskContext.isDefined,
-        capacity)
+      val vectorizedReader = if (schemaEvolutionUtils.shouldUseInternalSchema) {
+        schemaEvolutionUtils.buildVectorizedReader(
+          convertTz,
+          datetimeRebaseMode,
+          enableOffHeapColumnVector,
+          taskContext,
+          capacity)
+      } else {
+        new VectorizedParquetRecordReader(
+          convertTz.orNull,
+          datetimeRebaseMode.toString,
+          enableOffHeapColumnVector && taskContext.isDefined,
+          capacity)
+      }

Review Comment:
   The same applies to the readers for the other Spark versions.






Re: [PR] [HUDI-7566] Add schema evolution to spark file readers [hudi]

2024-04-12 Thread via GitHub


hudi-bot commented on PR #10956:
URL: https://github.com/apache/hudi/pull/10956#issuecomment-2052706711

   
   ## CI report:
   
   * c8f507bcac03c7183893400487a1885400c46853 Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23230)
   
   
   Bot commands
   @hudi-bot supports the following commands:

   - `@hudi-bot run azure` re-run the last Azure build
   





Re: [PR] [HUDI-7566] Add schema evolution to spark file readers [hudi]

2024-04-12 Thread via GitHub


hudi-bot commented on PR #10956:
URL: https://github.com/apache/hudi/pull/10956#issuecomment-2052654190

   
   ## CI report:
   
   * a73f9559fc8626342b767085cf7a56f743a425fc Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23227)
   * c8f507bcac03c7183893400487a1885400c46853 Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23230)
   
   
   





Re: [PR] [HUDI-7566] Add schema evolution to spark file readers [hudi]

2024-04-12 Thread via GitHub


hudi-bot commented on PR #10956:
URL: https://github.com/apache/hudi/pull/10956#issuecomment-2052647727

   
   ## CI report:
   
   * a73f9559fc8626342b767085cf7a56f743a425fc Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23227)
   * c8f507bcac03c7183893400487a1885400c46853 UNKNOWN
   
   
   





Re: [PR] [HUDI-7566] Add schema evolution to spark file readers [hudi]

2024-04-12 Thread via GitHub


hudi-bot commented on PR #10956:
URL: https://github.com/apache/hudi/pull/10956#issuecomment-2052596422

   
   ## CI report:
   
   * a73f9559fc8626342b767085cf7a56f743a425fc Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23227)
   
   
   





Re: [PR] [HUDI-7566] Add schema evolution to spark file readers [hudi]

2024-04-12 Thread via GitHub


hudi-bot commented on PR #10956:
URL: https://github.com/apache/hudi/pull/10956#issuecomment-2052437070

   
   ## CI report:
   
   * 37bc97b3e080cb3664405a446c0174655720d41c Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23221)
   * a73f9559fc8626342b767085cf7a56f743a425fc Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23227)
   
   
   





Re: [PR] [HUDI-7566] Add schema evolution to spark file readers [hudi]

2024-04-12 Thread via GitHub


hudi-bot commented on PR #10956:
URL: https://github.com/apache/hudi/pull/10956#issuecomment-2052428308

   
   ## CI report:
   
   * 37bc97b3e080cb3664405a446c0174655720d41c Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=23221)
   * a73f9559fc8626342b767085cf7a56f743a425fc UNKNOWN
   
   
   

