boneanxs commented on code in PR #8076:
URL: https://github.com/apache/hudi/pull/8076#discussion_r1177516056


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -770,66 +770,70 @@ object HoodieSparkSqlWriter {
     }
   }
 
-  def bulkInsertAsRow(sqlContext: SQLContext,
+  def bulkInsertAsRow(writeClient: SparkRDDWriteClient[_],
+                      parameters: Map[String, String],
                       hoodieConfig: HoodieConfig,
                       df: DataFrame,
+                      mode: SaveMode,
                       tblName: String,
                       basePath: Path,
-                      path: String,
                       instantTime: String,
                       writerSchema: Schema,
-                      isTablePartitioned: Boolean): (Boolean, common.util.Option[String]) = {
+                      tableConfig: HoodieTableConfig):
+  (Boolean, HOption[String], HOption[String], HOption[String], SparkRDDWriteClient[_], HoodieTableConfig) = {
     if (hoodieConfig.getBoolean(INSERT_DROP_DUPS)) {
       throw new HoodieException("Dropping duplicates with bulk_insert in row writer path is not supported yet")
     }
+    val sqlContext = writeClient.getEngineContext.asInstanceOf[HoodieSparkEngineContext].getSqlContext
+    val jsc = writeClient.getEngineContext.asInstanceOf[HoodieSparkEngineContext].getJavaSparkContext
 
     val writerSchemaStr = writerSchema.toString
 
-    val opts = hoodieConfig.getProps.toMap ++
+    // Make opts mutable since it could be modified by tryOverrideParquetWriteLegacyFormatProperty
+    val opts = mutable.Map() ++ hoodieConfig.getProps.toMap ++
       Map(HoodieWriteConfig.AVRO_SCHEMA_STRING.key -> writerSchemaStr)
 
-    val writeConfig = DataSourceUtils.createHoodieConfig(writerSchemaStr, path, tblName, mapAsJavaMap(opts))
-    val populateMetaFields = hoodieConfig.getBoolean(HoodieTableConfig.POPULATE_META_FIELDS)
-
-    val bulkInsertPartitionerRows: BulkInsertPartitioner[Dataset[Row]] = if (populateMetaFields) {
-      val userDefinedBulkInsertPartitionerOpt = DataSourceUtils.createUserDefinedBulkInsertPartitionerWithRows(writeConfig)
-      if (userDefinedBulkInsertPartitionerOpt.isPresent) {
-        userDefinedBulkInsertPartitionerOpt.get
-      } else {
-        BulkInsertInternalPartitionerWithRowsFactory.get(writeConfig, isTablePartitioned)
-      }
-    } else {
-      // Sort modes are not yet supported when meta fields are disabled
-      new NonSortPartitionerWithRows()
+    // Auto set the value of "hoodie.parquet.writelegacyformat.enabled"
+    tryOverrideParquetWriteLegacyFormatProperty(opts, convertAvroSchemaToStructType(writerSchema))
+    val writeConfig = DataSourceUtils.createHoodieConfig(writerSchemaStr, basePath.toString, tblName, opts)
+    val executor = mode match {
+      case SaveMode.Append =>
+        new DatasetBulkInsertActionExecutor(writeConfig, writeClient, instantTime)

Review Comment:
   `writeClient` is specifically for `RDD[HoodieRecord]`; since all the `xxxActionExecutor`s here are `Dataset[Row]`-based, I didn't put this logic there before.
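   
   To make that type split concrete, here is a minimal Scala sketch of the two write paths. All of the types below are simplified stand-ins introduced purely for illustration, not the actual Hudi or Spark classes:
   
   ```scala
   // A minimal sketch of the type split described above; every class/trait
   // here is a hypothetical stand-in, NOT the real Hudi or Spark API.
   object RowWriterTypeSketch {
     class HoodieRecord[T] // stand-in for Hudi's HoodieRecord
     class WriteStatus     // stand-in for Hudi's WriteStatus
     class JavaRDD[A]      // stand-in for Spark's JavaRDD
     class Dataset[A]      // stand-in for Spark's Dataset
     class Row             // stand-in for Spark's Row
   
     // The client path is keyed to RDD[HoodieRecord]: rows must first be
     // converted into HoodieRecords before the client can write them.
     trait RddWriteClientLike[T] {
       def bulkInsert(records: JavaRDD[HoodieRecord[T]],
                      instantTime: String): JavaRDD[WriteStatus]
     }
   
     // The bulk-insert row-writer executors consume Dataset[Row] directly,
     // skipping that conversion, which is why their dispatch logic does not
     // fit naturally inside the RDD-based client.
     trait DatasetBulkInsertExecutorLike {
       def execute(dataset: Dataset[Row]): Unit
     }
   }
   ```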


