vinothchandar commented on a change in pull request #1834:
URL: https://github.com/apache/hudi/pull/1834#discussion_r468311076



##########
File path: hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
##########
@@ -108,262 +106,280 @@ private[hudi] object HoodieSparkSqlWriter {
          throw new HoodieException(s"hoodie table with name $existingTableName already exists at $basePath")
        }
      }
-      val (writeStatuses, writeClient: HoodieWriteClient[HoodieRecordPayload[Nothing]]) =
-        if (!operation.equalsIgnoreCase(DELETE_OPERATION_OPT_VAL)) {
-          // register classes & schemas
-          val structName = s"${tblName}_record"
-          val nameSpace = s"hoodie.${tblName}"
-          sparkContext.getConf.registerKryoClasses(
-            Array(classOf[org.apache.avro.generic.GenericData],
-              classOf[org.apache.avro.Schema]))
-          val schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace)
-          sparkContext.getConf.registerAvroSchemas(schema)
-          log.info(s"Registered avro schema : ${schema.toString(true)}")
-
-          // Convert to RDD[HoodieRecord]
-          val keyGenerator = DataSourceUtils.createKeyGenerator(toProperties(parameters))
-          val genericRecords: RDD[GenericRecord] = AvroConversionUtils.createRdd(df, structName, nameSpace)
-          val hoodieAllIncomingRecords = genericRecords.map(gr => {
-            val orderingVal = HoodieAvroUtils.getNestedFieldVal(gr, parameters(PRECOMBINE_FIELD_OPT_KEY), false)
-              .asInstanceOf[Comparable[_]]
-            DataSourceUtils.createHoodieRecord(gr,
-              orderingVal, keyGenerator.getKey(gr),
-              parameters(PAYLOAD_CLASS_OPT_KEY))
-          }).toJavaRDD()
-
-          // Handle various save modes
-          if (mode == SaveMode.ErrorIfExists && exists) {
-            throw new HoodieException(s"hoodie table at $basePath already exists.")
-          }
 
-          if (mode == SaveMode.Overwrite && exists) {
-            log.warn(s"hoodie table at $basePath already exists. Deleting existing data & overwriting with new data.")
-            fs.delete(basePath, true)
-            exists = false
-          }
+      val (writeSuccessfulRetVal: Boolean, commitTimeRetVal: common.util.Option[String], compactionInstantRetVal: common.util.Option[String],
+      writeClientRetVal: HoodieWriteClient[HoodieRecordPayload[Nothing]], tableConfigRetVal: HoodieTableConfig) =
+         if (operation.equalsIgnoreCase(BULK_INSERT_DATASET_OPERATION_OPT_VAL)) {
+        // register classes & schemas
+        val structName = s"${tblName}_record"
+        val nameSpace = s"hoodie.${tblName}"
 
-          // Create the table if not present
-          if (!exists) {
-            //FIXME(bootstrap): bootstrapIndexClass needs to be set when bootstrap index class is integrated.
-            val tableMetaClient = HoodieTableMetaClient.initTableTypeWithBootstrap(sparkContext.hadoopConfiguration,
-              path.get, HoodieTableType.valueOf(tableType),
-              tblName, "archived", parameters(PAYLOAD_CLASS_OPT_KEY), null, null, null)
-            tableConfig = tableMetaClient.getTableConfig
-          }
+        // Handle various save modes
+        if (mode == SaveMode.ErrorIfExists && exists) {
+          throw new HoodieException(s"hoodie table at $basePath already exists.")
+        }
 
-          // Create a HoodieWriteClient & issue the write.
-          val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc, schema.toString, path.get,
-            tblName, mapAsJavaMap(parameters)
-          )).asInstanceOf[HoodieWriteClient[HoodieRecordPayload[Nothing]]]
+        val (success, commitTime: common.util.Option[String]) =
+          if (mode == SaveMode.Ignore && exists) {
+            log.warn(s"hoodie table at $basePath already exists. Ignoring & not performing actual writes.")
+            (false, common.util.Option.ofNullable(instantTime))
+          } else {
+            if (mode == SaveMode.Overwrite && exists) {
+              log.warn(s"hoodie table at $basePath already exists. Deleting existing data & overwriting with new data.")
+              fs.delete(basePath, true)
+              exists = false
+            }
 
-          if (asyncCompactionTriggerFn.isDefined &&
-            isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())) {
-            asyncCompactionTriggerFn.get.apply(client)
-          }
+            // Create the table if not present
+            if (!exists) {
+              //FIXME(bootstrap): bootstrapIndexClass needs to be set when bootstrap index class is integrated.
+              val tableMetaClient = HoodieTableMetaClient.initTableTypeWithBootstrap(sparkContext.hadoopConfiguration,
+                path.get, HoodieTableType.valueOf(tableType),
+                tblName, "archived", parameters(PAYLOAD_CLASS_OPT_KEY), null, null, null)
+              tableConfig = tableMetaClient.getTableConfig
+            }
 
-          val hoodieRecords =
-            if (parameters(INSERT_DROP_DUPS_OPT_KEY).toBoolean) {
-              DataSourceUtils.dropDuplicates(jsc, hoodieAllIncomingRecords, mapAsJavaMap(parameters))
+            val writeConfig = DataSourceUtils.createHoodieConfig(null, path.get, tblName,
+              mapAsJavaMap(parameters))
+
+            val hoodieDF = HoodieDatasetBulkInsertHelper.prepareHoodieDatasetForBulkInsert(sqlContext, writeConfig, df, structName, nameSpace)
+            hoodieDF.write.format("org.apache.hudi.internal").option(INSTANT_TIME, instantTime)
+              .options(parameters).save()
+            val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
+            val syncHiveSuccess = if (hiveSyncEnabled) {
+              log.info("Syncing to Hive Metastore (URL: " + parameters(HIVE_URL_OPT_KEY) + ")")
+              val fs = FSUtils.getFs(basePath.toString, jsc.hadoopConfiguration)
+              syncHive(basePath, fs, parameters)
             } else {
-              hoodieAllIncomingRecords
+              true
             }
-
-          if (hoodieRecords.isEmpty()) {
-            log.info("new batch has no new records, skipping...")
-            (true, common.util.Option.empty())
+            (syncHiveSuccess, common.util.Option.ofNullable(instantTime))
           }
-          client.startCommitWithTime(instantTime)
-          val writeStatuses = DataSourceUtils.doWriteOperation(client, hoodieRecords, instantTime, operation)
-          (writeStatuses, client)
-        } else {
+        (success, commitTime, common.util.Option.of(""), hoodieWriteClient.orNull, tableConfig)
+       } else {
+        val (writeStatuses, writeClient: HoodieWriteClient[HoodieRecordPayload[Nothing]]) =

Review comment:
       In general, we should do these rebase/prep PRs upfront so that reviews are easier.
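
       For readers following the new branch above: when the operation is
       BULK_INSERT_DATASET_OPERATION_OPT_VAL, the writer skips the Avro /
       RDD[HoodieRecord] conversion, prepares the rows with
       HoodieDatasetBulkInsertHelper, and writes them through the
       org.apache.hudi.internal datasource. A minimal caller-side sketch of
       exercising that path (table name, field names, and path are
       placeholders; BULK_INSERT_DATASET_OPERATION_OPT_VAL is assumed to be
       exposed via DataSourceWriteOptions, as this PR wires it into
       HoodieSparkSqlWriter):

           import org.apache.hudi.DataSourceWriteOptions._
           import org.apache.spark.sql.SaveMode

           // df is any DataFrame whose schema contains the record key,
           // partition path, and precombine fields named below.
           df.write.format("org.apache.hudi")
             .option(OPERATION_OPT_KEY, BULK_INSERT_DATASET_OPERATION_OPT_VAL)
             .option(RECORDKEY_FIELD_OPT_KEY, "uuid")
             .option(PARTITIONPATH_FIELD_OPT_KEY, "partition")
             .option(PRECOMBINE_FIELD_OPT_KEY, "ts")
             .option("hoodie.table.name", "test_table") // HoodieWriteConfig.TABLE_NAME
             .mode(SaveMode.Append)
             .save("/tmp/hudi/test_table")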



