wzx140 commented on code in PR #6745:
URL: https://github.com/apache/hudi/pull/6745#discussion_r981159766


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -867,18 +867,20 @@ object HoodieSparkSqlWriter {
           hoodieRecord
         }).toJavaRDD()
       case HoodieRecord.HoodieRecordType.SPARK =>
+        log.info("Use spark record")
         // ut will use AvroKeyGenerator, so we need to cast it in spark record
         val sparkKeyGenerator = keyGenerator.asInstanceOf[SparkKeyGeneratorInterface]
+        val schemaWithMetaField = HoodieAvroUtils.addMetadataFields(schema, config.allowOperationMetadataField)
         val structType = HoodieInternalRowUtils.getCachedSchema(schema)
-        val structTypeBC = SparkContext.getOrCreate().broadcast(structType)
-        HoodieInternalRowUtils.addCompressedSchema(structType)
+        val structTypeWithMetaField = HoodieInternalRowUtils.getCachedSchema(schemaWithMetaField)
+        val structTypeBC = sparkContext.broadcast(structType)
+        HoodieInternalRowUtils.broadcastCompressedSchema(List(structType, structTypeWithMetaField), sparkContext)

Review Comment:
   The broadcast ensures that every executor has the same fingerprint->schema cache. We can't rely on every executor initializing the fingerprint->schema mapping locally.
   
   For example, if we register the fingerprint->schema mapping inside the map function in `HoodieSparkSqlWriter`, it is possible that **the number of DataFrame partitions is smaller than the number of executors**, so some executors never run the map function and never populate their cache.
   
   ```scala
           HoodieInternalRowUtils.addCompressedSchema(structType)
           df.queryExecution.toRdd.map(row => {
             val internalRow = row.copy()
             val (processedRow, writeSchema) = getSparkProcessedRecord(partitionCols, internalRow, dropPartitionColumns, structTypeBC.value)
             val recordKey = sparkKeyGenerator.getRecordKey(internalRow, structTypeBC.value)
             val partitionPath = sparkKeyGenerator.getPartitionPath(internalRow, structTypeBC.value)
             val key = new HoodieKey(recordKey.toString, partitionPath.toString)
             // This runs inside the map task, so we can't guarantee it executes on every executor
             HoodieInternalRowUtils.addCompressedSchema(structType)
   
             new HoodieSparkRecord(key, processedRow, writeSchema)
           }).toJavaRDD().asInstanceOf[JavaRDD[HoodieRecord[_]]]
   ```
   As shown in the figure below, executor3 does not have the fingerprint cache and cannot resolve the fingerprint back to a schema.
   
![未命名文件](https://user-images.githubusercontent.com/25837418/192523203-d32dd632-f5b9-4b10-9f5f-ad733ce787d1.png)
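   Conceptually, the failure mode is a per-executor cache miss. The following is a minimal stand-in sketch (plain Java, not Hudi's actual API; the class name `SchemaCache` and the `hashCode`-based fingerprint are illustrative only): an executor that never ran the registering map task has an empty local cache, so a fingerprint lookup fails there.
   
   ```java
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;
   
   // Illustrative stand-in for a per-executor fingerprint -> schema cache.
   // Not Hudi's API: a real implementation would use a proper schema
   // fingerprint (e.g. a 64-bit Avro fingerprint) rather than hashCode.
   class SchemaCache {
       private final Map<Long, String> byFingerprint = new ConcurrentHashMap<>();
   
       // Register a schema in this local cache and return its fingerprint.
       long register(String schema) {
           long fp = schema.hashCode();
           byFingerprint.put(fp, schema);
           return fp;
       }
   
       // Resolve a fingerprint; returns null if this cache never saw the schema.
       String resolve(long fp) {
           return byFingerprint.get(fp);
       }
   }
   
   public class Demo {
       public static void main(String[] args) {
           SchemaCache executor1 = new SchemaCache();
           SchemaCache executor3 = new SchemaCache();
   
           // executor1 ran a map task, so the in-task registration populated its cache.
           long fp = executor1.register("structType");
   
           // executor3 received no partition, so its cache was never populated
           // and it cannot map the fingerprint back to a schema.
           System.out.println(executor1.resolve(fp) != null); // true
           System.out.println(executor3.resolve(fp) == null); // true: cache miss
       }
   }
   ```
   
   Broadcasting the fingerprint->schema map from the driver sidesteps this, because Spark delivers the broadcast value to every executor regardless of how many partitions it processes.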
   
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
