boneanxs commented on code in PR #8076: URL: https://github.com/apache/hudi/pull/8076#discussion_r1177516056
########## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala: ########## @@ -770,66 +770,70 @@ object HoodieSparkSqlWriter { } } - def bulkInsertAsRow(sqlContext: SQLContext, + def bulkInsertAsRow(writeClient: SparkRDDWriteClient[_], + parameters: Map[String, String], hoodieConfig: HoodieConfig, df: DataFrame, + mode: SaveMode, tblName: String, basePath: Path, - path: String, instantTime: String, writerSchema: Schema, - isTablePartitioned: Boolean): (Boolean, common.util.Option[String]) = { + tableConfig: HoodieTableConfig): + (Boolean, HOption[String], HOption[String], HOption[String], SparkRDDWriteClient[_], HoodieTableConfig) = { if (hoodieConfig.getBoolean(INSERT_DROP_DUPS)) { throw new HoodieException("Dropping duplicates with bulk_insert in row writer path is not supported yet") } + val sqlContext = writeClient.getEngineContext.asInstanceOf[HoodieSparkEngineContext].getSqlContext + val jsc = writeClient.getEngineContext.asInstanceOf[HoodieSparkEngineContext].getJavaSparkContext val writerSchemaStr = writerSchema.toString - val opts = hoodieConfig.getProps.toMap ++ + // Make opts mutable since it could be modified by tryOverrideParquetWriteLegacyFormatProperty + val opts = mutable.Map() ++ hoodieConfig.getProps.toMap ++ Map(HoodieWriteConfig.AVRO_SCHEMA_STRING.key -> writerSchemaStr) - val writeConfig = DataSourceUtils.createHoodieConfig(writerSchemaStr, path, tblName, mapAsJavaMap(opts)) - val populateMetaFields = hoodieConfig.getBoolean(HoodieTableConfig.POPULATE_META_FIELDS) - - val bulkInsertPartitionerRows: BulkInsertPartitioner[Dataset[Row]] = if (populateMetaFields) { - val userDefinedBulkInsertPartitionerOpt = DataSourceUtils.createUserDefinedBulkInsertPartitionerWithRows(writeConfig) - if (userDefinedBulkInsertPartitionerOpt.isPresent) { - userDefinedBulkInsertPartitionerOpt.get - } else { - BulkInsertInternalPartitionerWithRowsFactory.get(writeConfig, isTablePartitioned) - } - } else { - // Sort modes 
are not yet supported when meta fields are disabled - new NonSortPartitionerWithRows() + // Auto set the value of "hoodie.parquet.writelegacyformat.enabled" + tryOverrideParquetWriteLegacyFormatProperty(opts, convertAvroSchemaToStructType(writerSchema)) + val writeConfig = DataSourceUtils.createHoodieConfig(writerSchemaStr, basePath.toString, tblName, opts) + val executor = mode match { + case SaveMode.Append => + new DatasetBulkInsertActionExecutor(writeConfig, writeClient, instantTime) Review Comment: writeClient is specifically for `RDD[HoodieRecord]`; since all the `xxxActionExecutor`s here are `Dataset[Row]` based, I didn't put this logic there before. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org