nsivabalan commented on a change in pull request #2927:
URL: https://github.com/apache/hudi/pull/2927#discussion_r655052508



##########
File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
##########
@@ -89,22 +89,45 @@ object HoodieSparkUtils extends SparkAdapterSupport {
     new InMemoryFileIndex(sparkSession, globbedPaths, Map(), Option.empty, fileStatusCache)
   }
 
-  def createRdd(df: DataFrame, structName: String, recordNamespace: String): RDD[GenericRecord] = {
+  def createRdd(df: DataFrame, structName: String, recordNamespace: String, upgradeToLatestSchemaIfNeeded: Boolean): RDD[GenericRecord] = {
+    createRdd(df, null, structName, recordNamespace, upgradeToLatestSchemaIfNeeded)
+  }
+
+  def createRdd(df: DataFrame, latestSchema: Schema, structName: String, recordNamespace: String, upgradeToLatestSchemaIfNeeded: Boolean): RDD[GenericRecord] = {
     val avroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, recordNamespace)
-    createRdd(df, avroSchema, structName, recordNamespace)
+    // if upgradeToLatestSchemaIfNeeded is set to true and latestSchema is not null, then try to leverage latestSchema
+    // this code path will handle situations where records are serialized in schema1, but callers wish to convert to
+    // Rdd[GenericRecord] using different schema(could be evolved schema or could be latest table schema)
+    if (upgradeToLatestSchemaIfNeeded && latestSchema != null) {

Review comment:
       Quite a few callers can reach this if condition.
   Cases where latestSchema != null:
   1. HoodieSparkSqlWriter regular operations (part of this patch), line 165.
      latestSchema could be the incoming batch's writer schema or the latest
      table schema.
   2. DeltaSync.read
        a. Non-null transformer: SchemaProvider.getTargetSchema is passed in
           as latestSchema.
        b. Null transformer: latestSchema could be the incoming batch's writer
           schema or the latest table schema.
   3. SourceFormatAdaptor.fetchNewDataInAvroFormat with a RowSource: with a
      FileBasedSchemaProvider, schemaProvider.getSourceSchema is passed in as
      latestSchema, but upgradeToLatestSchemaIfNeeded is explicitly set to
      false.
   
   Note: cases 1 and 2 pass along whatever value the user configured for
   upgradeToLatestSchemaIfNeeded.
   
   So in these cases, if upgradeToLatestSchemaIfNeeded is set to true, we want
   to go into line 102. If it is set to false but latestSchema is still set,
   we might want to use latestSchema in line 106.
   In all other scenarios (SparkBaseReader, SparkParquetBootstrapDataProvider,
   HoodieSparkSqlWriter delete operation, etc.), we want to use the df's
   schema (avroSchema) in line 106.
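
   The three outcomes described above can be summarized as a small decision
   function. This is only a minimal sketch: SchemaChoice, its three cases, and
   SchemaSelectionSketch.choose are hypothetical names that are not part of
   Hudi, the schema is modeled as a plain String instead of an Avro Schema,
   and a null latestSchema is represented as None.

```scala
// Hypothetical sketch of the schema-selection rules discussed in this comment.
// None of these names exist in Hudi; the schema is a String stand-in for Avro's Schema.
sealed trait SchemaChoice
case object UpgradeToLatest extends SchemaChoice    // take the upgrade branch ("line 102")
case object UseLatestAsIs extends SchemaChoice      // use latestSchema directly ("line 106")
case object UseDataFrameSchema extends SchemaChoice // use the df-derived avroSchema ("line 106")

object SchemaSelectionSketch {
  def choose(latestSchema: Option[String],
             upgradeToLatestSchemaIfNeeded: Boolean): SchemaChoice =
    latestSchema match {
      // latestSchema supplied and the user opted into upgrading
      case Some(_) if upgradeToLatestSchemaIfNeeded => UpgradeToLatest
      // latestSchema supplied but upgrade disabled (e.g. case 3 above)
      case Some(_) => UseLatestAsIs
      // no latestSchema at all: fall back to the DataFrame's own schema
      case None => UseDataFrameSchema
    }
}
```

   Under this sketch, case 3 (upgradeToLatestSchemaIfNeeded explicitly false)
   maps to UseLatestAsIs, and callers that pass no latestSchema always end up
   with UseDataFrameSchema regardless of the flag.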
   
   I am up for simplifying this code block by all means. Open to suggestions. 





