vinothchandar commented on a change in pull request #1834:
URL: https://github.com/apache/hudi/pull/1834#discussion_r468311412



##########
File path: hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
##########
@@ -108,262 +106,280 @@ private[hudi] object HoodieSparkSqlWriter {
          throw new HoodieException(s"hoodie table with name $existingTableName already exist at $basePath")
         }
       }
-      val (writeStatuses, writeClient: HoodieWriteClient[HoodieRecordPayload[Nothing]]) =
-        if (!operation.equalsIgnoreCase(DELETE_OPERATION_OPT_VAL)) {
-          // register classes & schemas
-          val structName = s"${tblName}_record"
-          val nameSpace = s"hoodie.${tblName}"
-          sparkContext.getConf.registerKryoClasses(
-            Array(classOf[org.apache.avro.generic.GenericData],
-              classOf[org.apache.avro.Schema]))
-          val schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace)
-          sparkContext.getConf.registerAvroSchemas(schema)
-          log.info(s"Registered avro schema : ${schema.toString(true)}")
-
-          // Convert to RDD[HoodieRecord]
-          val keyGenerator = DataSourceUtils.createKeyGenerator(toProperties(parameters))
-          val genericRecords: RDD[GenericRecord] = AvroConversionUtils.createRdd(df, structName, nameSpace)
-          val hoodieAllIncomingRecords = genericRecords.map(gr => {
-            val orderingVal = HoodieAvroUtils.getNestedFieldVal(gr, parameters(PRECOMBINE_FIELD_OPT_KEY), false)
-              .asInstanceOf[Comparable[_]]
-            DataSourceUtils.createHoodieRecord(gr,
-              orderingVal, keyGenerator.getKey(gr),
-              parameters(PAYLOAD_CLASS_OPT_KEY))
-          }).toJavaRDD()
-
-          // Handle various save modes
-          if (mode == SaveMode.ErrorIfExists && exists) {
-            throw new HoodieException(s"hoodie table at $basePath already exists.")
-          }
 
-          if (mode == SaveMode.Overwrite && exists) {
-            log.warn(s"hoodie table at $basePath already exists. Deleting existing data & overwriting with new data.")
-            fs.delete(basePath, true)
-            exists = false
-          }
+      val (writeSuccessfulRetVal: Boolean, commitTimeRetVal: common.util.Option[String], compactionInstantRetVal: common.util.Option[String],
+      writeClientRetVal: HoodieWriteClient[HoodieRecordPayload[Nothing]], tableConfigRetVal: HoodieTableConfig) =
+         if (operation.equalsIgnoreCase(BULK_INSERT_DATASET_OPERATION_OPT_VAL)) {
+        // register classes & schemas
+        val structName = s"${tblName}_record"
+        val nameSpace = s"hoodie.${tblName}"
 
-          // Create the table if not present
-          if (!exists) {
-            //FIXME(bootstrap): bootstrapIndexClass needs to be set when bootstrap index class is integrated.
-            val tableMetaClient = HoodieTableMetaClient.initTableTypeWithBootstrap(sparkContext.hadoopConfiguration,
-              path.get, HoodieTableType.valueOf(tableType),
-              tblName, "archived", parameters(PAYLOAD_CLASS_OPT_KEY), null, null, null)
-            tableConfig = tableMetaClient.getTableConfig
-          }
+        // Handle various save modes
+        if (mode == SaveMode.ErrorIfExists && exists) {
+          throw new HoodieException(s"hoodie table at $basePath already exists.")
+        }
 
-          // Create a HoodieWriteClient & issue the write.
-          val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc, schema.toString, path.get,
-            tblName, mapAsJavaMap(parameters)
-          )).asInstanceOf[HoodieWriteClient[HoodieRecordPayload[Nothing]]]
+        val (success, commitTime: common.util.Option[String]) =
+          if (mode == SaveMode.Ignore && exists) {
+            log.warn(s"hoodie table at $basePath already exists. Ignoring & not performing actual writes.")
+            (false, common.util.Option.ofNullable(instantTime))
+          } else {
+            if (mode == SaveMode.Overwrite && exists) {
+              log.warn(s"hoodie table at $basePath already exists. Deleting existing data & overwriting with new data.")
+              fs.delete(basePath, true)
+              exists = false
+            }
 
-          if (asyncCompactionTriggerFn.isDefined &&
-            isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())) {
-            asyncCompactionTriggerFn.get.apply(client)
-          }
+            // Create the table if not present
+            if (!exists) {
+              //FIXME(bootstrap): bootstrapIndexClass needs to be set when bootstrap index class is integrated.
+              val tableMetaClient = HoodieTableMetaClient.initTableTypeWithBootstrap(sparkContext.hadoopConfiguration,
+                path.get, HoodieTableType.valueOf(tableType),
+                tblName, "archived", parameters(PAYLOAD_CLASS_OPT_KEY), null, null, null)
+              tableConfig = tableMetaClient.getTableConfig
+            }
 
-          val hoodieRecords =
-            if (parameters(INSERT_DROP_DUPS_OPT_KEY).toBoolean) {
-              DataSourceUtils.dropDuplicates(jsc, hoodieAllIncomingRecords, mapAsJavaMap(parameters))
+            val writeConfig = DataSourceUtils.createHoodieConfig(null, path.get, tblName,
+              mapAsJavaMap(parameters))
+
+            val hoodieDF = HoodieDatasetBulkInsertHelper.prepareHoodieDatasetForBulkInsert(sqlContext, writeConfig, df, structName, nameSpace)
+            hoodieDF.write.format("org.apache.hudi.internal").option(INSTANT_TIME, instantTime)
+              .options(parameters).save()
+            val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
+            val syncHiveSucess = if (hiveSyncEnabled) {
+              log.info("Syncing to Hive Metastore (URL: " + parameters(HIVE_URL_OPT_KEY) + ")")
+              val fs = FSUtils.getFs(basePath.toString, jsc.hadoopConfiguration)
+              syncHive(basePath, fs, parameters)
+              syncHive(basePath, fs, parameters)
             } else {
-              hoodieAllIncomingRecords
+              true
             }
-
-          if (hoodieRecords.isEmpty()) {
-            log.info("new batch has no new records, skipping...")
-            (true, common.util.Option.empty())
+            (syncHiveSucess, common.util.Option.ofNullable(instantTime))
           }
-          client.startCommitWithTime(instantTime)
-          val writeStatuses = DataSourceUtils.doWriteOperation(client, hoodieRecords, instantTime, operation)
-          (writeStatuses, client)
-        } else {
+        (success, commitTime, common.util.Option.of(""), hoodieWriteClient.orNull, tableConfig)
+       } else {
+        val (writeStatuses, writeClient: HoodieWriteClient[HoodieRecordPayload[Nothing]]) =
+          if (!operation.equalsIgnoreCase(DELETE_OPERATION_OPT_VAL)) {
+            // register classes & schemas
+            val structName = s"${tblName}_record"
+            val nameSpace = s"hoodie.${tblName}"
+            sparkContext.getConf.registerKryoClasses(
+              Array(classOf[org.apache.avro.generic.GenericData],
+                classOf[org.apache.avro.Schema]))
+            val schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace)
+            sparkContext.getConf.registerAvroSchemas(schema)
+            log.info(s"Registered avro schema : ${schema.toString(true)}")
+
+            // Convert to RDD[HoodieRecord]
+            val keyGenerator = DataSourceUtils.createKeyGenerator(HoodieWriterUtils.toProperties(parameters))
+            val genericRecords: RDD[GenericRecord] = AvroConversionUtils.createRdd(df, structName, nameSpace)
+            val hoodieAllIncomingRecords = genericRecords.map(gr => {
+              val orderingVal = DataSourceUtils.getNestedFieldVal(gr, parameters(PRECOMBINE_FIELD_OPT_KEY), false)
+                .asInstanceOf[Comparable[_]]
+              DataSourceUtils.createHoodieRecord(gr,
+                orderingVal, keyGenerator.getKey(gr), parameters(PAYLOAD_CLASS_OPT_KEY))
+            }).toJavaRDD()
+
+            // Handle various save modes
+            if (mode == SaveMode.ErrorIfExists && exists) {
+              throw new HoodieException(s"hoodie table at $basePath already exists.")
+            }
+            if (mode == SaveMode.Ignore && exists) {
+              log.warn(s"hoodie table at $basePath already exists. Ignoring & not performing actual writes.")
+              (false, common.util.Option.empty())
+            }
+            if (mode == SaveMode.Overwrite && exists) {
+              log.warn(s"hoodie table at $basePath already exists. Deleting existing data & overwriting with new data.")
+              fs.delete(basePath, true)
+              exists = false
+            }
 
-          // Handle save modes
-          if (mode != SaveMode.Append) {
-            throw new HoodieException(s"Append is the only save mode applicable for $operation operation")
-          }
+            // Create the table if not present
+            if (!exists) {
+              //FIXME(bootstrap): bootstrapIndexClass needs to be set when bootstrap index class is integrated.
+              val tableMetaClient = HoodieTableMetaClient.initTableTypeWithBootstrap(sparkContext.hadoopConfiguration,
+                path.get, HoodieTableType.valueOf(tableType),
+                tblName, "archived", parameters(PAYLOAD_CLASS_OPT_KEY), null, null, null)
+              tableConfig = tableMetaClient.getTableConfig
+            }
 
-          val structName = s"${tblName}_record"
-          val nameSpace = s"hoodie.${tblName}"
-          sparkContext.getConf.registerKryoClasses(
-            Array(classOf[org.apache.avro.generic.GenericData],
-              classOf[org.apache.avro.Schema]))
+            // Create a HoodieWriteClient & issue the write.
+            val client = DataSourceUtils.createHoodieClient(jsc, schema.toString, path.get, tblName,
+              mapAsJavaMap(parameters)
+            )
+
+            val hoodieRecords =
+              if (parameters(INSERT_DROP_DUPS_OPT_KEY).toBoolean) {
+                DataSourceUtils.dropDuplicates(jsc, hoodieAllIncomingRecords, mapAsJavaMap(parameters))
+              } else {
+                hoodieAllIncomingRecords
+              }
+
+            if (hoodieRecords.isEmpty()) {
+              log.info("new batch has no new records, skipping...")
+              (true, common.util.Option.empty())
+            }
+            client.startCommitWithTime(instantTime)
+            val writeStatuses = DataSourceUtils.doWriteOperation(client, hoodieRecords, instantTime, operation)
+            (writeStatuses, client)
+          } else {
+
+            // Handle save modes
+            if (mode != SaveMode.Append) {
+              throw new HoodieException(s"Append is the only save mode applicable for $operation operation")
+            }
 
-          // Convert to RDD[HoodieKey]
-          val keyGenerator = DataSourceUtils.createKeyGenerator(toProperties(parameters))
-          val genericRecords: RDD[GenericRecord] = AvroConversionUtils.createRdd(df, structName, nameSpace)
-          val hoodieKeysToDelete = genericRecords.map(gr => keyGenerator.getKey(gr)).toJavaRDD()
+            val structName = s"${tblName}_record"
+            val nameSpace = s"hoodie.${tblName}"
+            sparkContext.getConf.registerKryoClasses(
+              Array(classOf[org.apache.avro.generic.GenericData],
+                classOf[org.apache.avro.Schema]))
 
-          if (!exists) {
-            throw new HoodieException(s"hoodie table at $basePath does not exist")
-          }
+            // Convert to RDD[HoodieKey]
+            val keyGenerator = DataSourceUtils.createKeyGenerator(HoodieWriterUtils.toProperties(parameters))
+            val genericRecords: RDD[GenericRecord] = AvroConversionUtils.createRdd(df, structName, nameSpace)
+            val hoodieKeysToDelete = genericRecords.map(gr => keyGenerator.getKey(gr)).toJavaRDD()
+
+            if (!exists) {
+              throw new HoodieException(s"hoodie table at $basePath does not exist")
+            }
 
-          // Create a HoodieWriteClient & issue the delete.
-          val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
-            Schema.create(Schema.Type.NULL).toString, path.get, tblName,
-            mapAsJavaMap(parameters))).asInstanceOf[HoodieWriteClient[HoodieRecordPayload[Nothing]]]
+            // Create a HoodieWriteClient & issue the delete.
+            val client = DataSourceUtils.createHoodieClient(jsc,
+              Schema.create(Schema.Type.NULL).toString, path.get, tblName,
+              mapAsJavaMap(parameters)
+            )
 
-          if (asyncCompactionTriggerFn.isDefined &&
-            isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())) {
-            asyncCompactionTriggerFn.get.apply(client)
+            // Issue deletes
+            client.startCommitWithTime(instantTime)
+            val writeStatuses = DataSourceUtils.doDeleteOperation(client, hoodieKeysToDelete, instantTime)
+            (writeStatuses, client)
           }
 
-          // Issue deletes
-          client.startCommitWithTime(instantTime)
-          val writeStatuses = DataSourceUtils.doDeleteOperation(client, hoodieKeysToDelete, instantTime)
-          (writeStatuses, client)
+        // Check for errors and commit the write.
+        val (writeSuccessful, compactionInstant) =
+          commitAndPerformPostOperations(writeStatuses, parameters, writeClient, tableConfig, instantTime, basePath,
+            operation, jsc)
+        (writeSuccessful, common.util.Option.ofNullable(instantTime), compactionInstant, writeClient, tableConfig)
         }
-
-      // Check for errors and commit the write.
-      val (writeSuccessful, compactionInstant) =
-        commitAndPerformPostOperations(writeStatuses, parameters, writeClient, tableConfig, instantTime, basePath,
-          operation, jsc)
-      (writeSuccessful, common.util.Option.ofNullable(instantTime), compactionInstant, writeClient, tableConfig)
+      (writeSuccessfulRetVal, commitTimeRetVal, compactionInstantRetVal, writeClientRetVal, tableConfigRetVal)
     }
   }
 
-  /**
-    * Add default options for unspecified write options keys.
-    *
-    * @param parameters
-    * @return
-    */
-  def parametersWithWriteDefaults(parameters: Map[String, String]): Map[String, String] = {
-    Map(OPERATION_OPT_KEY -> DEFAULT_OPERATION_OPT_VAL,
-      TABLE_TYPE_OPT_KEY -> DEFAULT_TABLE_TYPE_OPT_VAL,
-      PRECOMBINE_FIELD_OPT_KEY -> DEFAULT_PRECOMBINE_FIELD_OPT_VAL,
-      PAYLOAD_CLASS_OPT_KEY -> DEFAULT_PAYLOAD_OPT_VAL,
-      RECORDKEY_FIELD_OPT_KEY -> DEFAULT_RECORDKEY_FIELD_OPT_VAL,
-      PARTITIONPATH_FIELD_OPT_KEY -> DEFAULT_PARTITIONPATH_FIELD_OPT_VAL,
-      KEYGENERATOR_CLASS_OPT_KEY -> DEFAULT_KEYGENERATOR_CLASS_OPT_VAL,
-      COMMIT_METADATA_KEYPREFIX_OPT_KEY -> DEFAULT_COMMIT_METADATA_KEYPREFIX_OPT_VAL,
-      INSERT_DROP_DUPS_OPT_KEY -> DEFAULT_INSERT_DROP_DUPS_OPT_VAL,
-      STREAMING_RETRY_CNT_OPT_KEY -> DEFAULT_STREAMING_RETRY_CNT_OPT_VAL,
-      STREAMING_RETRY_INTERVAL_MS_OPT_KEY -> DEFAULT_STREAMING_RETRY_INTERVAL_MS_OPT_VAL,
-      STREAMING_IGNORE_FAILED_BATCH_OPT_KEY -> DEFAULT_STREAMING_IGNORE_FAILED_BATCH_OPT_VAL,
-      HIVE_SYNC_ENABLED_OPT_KEY -> DEFAULT_HIVE_SYNC_ENABLED_OPT_VAL,
-      HIVE_DATABASE_OPT_KEY -> DEFAULT_HIVE_DATABASE_OPT_VAL,
-      HIVE_TABLE_OPT_KEY -> DEFAULT_HIVE_TABLE_OPT_VAL,
-      HIVE_BASE_FILE_FORMAT_OPT_KEY -> DEFAULT_HIVE_BASE_FILE_FORMAT_OPT_VAL,
-      HIVE_USER_OPT_KEY -> DEFAULT_HIVE_USER_OPT_VAL,
-      HIVE_PASS_OPT_KEY -> DEFAULT_HIVE_PASS_OPT_VAL,
-      HIVE_URL_OPT_KEY -> DEFAULT_HIVE_URL_OPT_VAL,
-      HIVE_PARTITION_FIELDS_OPT_KEY -> DEFAULT_HIVE_PARTITION_FIELDS_OPT_VAL,
-      HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> DEFAULT_HIVE_PARTITION_EXTRACTOR_CLASS_OPT_VAL,
-      HIVE_STYLE_PARTITIONING_OPT_KEY -> DEFAULT_HIVE_STYLE_PARTITIONING_OPT_VAL,
-      HIVE_USE_JDBC_OPT_KEY -> DEFAULT_HIVE_USE_JDBC_OPT_VAL,
-      ASYNC_COMPACT_ENABLE_KEY -> DEFAULT_ASYNC_COMPACT_ENABLE_VAL
-    ) ++ translateStorageTypeToTableType(parameters)
-  }
+    private def syncHive(basePath: Path, fs: FileSystem, parameters: Map[String, String]): Boolean
 
-  def toProperties(params: Map[String, String]): TypedProperties = {
-    val props = new TypedProperties()
-    params.foreach(kv => props.setProperty(kv._1, kv._2))
-    props
-  }
+    =
+    {
+      val hiveSyncConfig: HiveSyncConfig = buildSyncConfig(basePath, 
parameters)
+      val hiveConf: HiveConf = new HiveConf()
+      hiveConf.addResource(fs.getConf)
+      new HiveSyncTool(hiveSyncConfig, hiveConf, fs).syncHoodieTable()
+      true
+    }
 
-  private def syncHive(basePath: Path, fs: FileSystem, parameters: Map[String, String]): Boolean = {
-    val hiveSyncConfig: HiveSyncConfig = buildSyncConfig(basePath, parameters)
-    val hiveConf: HiveConf = new HiveConf()
-    hiveConf.addResource(fs.getConf)
-    new HiveSyncTool(hiveSyncConfig, hiveConf, fs).syncHoodieTable()
-    true
-  }
+    private def buildSyncConfig(basePath: Path, parameters: Map[String, String]): HiveSyncConfig
+
+    =
+    {
+      val hiveSyncConfig: HiveSyncConfig = new HiveSyncConfig()
+      hiveSyncConfig.basePath = basePath.toString
+      hiveSyncConfig.baseFileFormat = parameters(HIVE_BASE_FILE_FORMAT_OPT_KEY);
+      hiveSyncConfig.usePreApacheInputFormat =
+        parameters.get(HIVE_USE_PRE_APACHE_INPUT_FORMAT_OPT_KEY).exists(r => r.toBoolean)
+      hiveSyncConfig.databaseName = parameters(HIVE_DATABASE_OPT_KEY)
+      hiveSyncConfig.tableName = parameters(HIVE_TABLE_OPT_KEY)
+      hiveSyncConfig.hiveUser = parameters(HIVE_USER_OPT_KEY)
+      hiveSyncConfig.hivePass = parameters(HIVE_PASS_OPT_KEY)
+      hiveSyncConfig.jdbcUrl = parameters(HIVE_URL_OPT_KEY)
+      hiveSyncConfig.partitionFields =
+        ListBuffer(parameters(HIVE_PARTITION_FIELDS_OPT_KEY).split(",").map(_.trim).filter(!_.isEmpty).toList: _*)
+      hiveSyncConfig.partitionValueExtractorClass = parameters(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY)
+      hiveSyncConfig.useJdbc = parameters(HIVE_USE_JDBC_OPT_KEY).toBoolean
+      hiveSyncConfig
+    }
 
-  private def buildSyncConfig(basePath: Path, parameters: Map[String, String]): HiveSyncConfig = {
-    val hiveSyncConfig: HiveSyncConfig = new HiveSyncConfig()
-    hiveSyncConfig.basePath = basePath.toString
-    hiveSyncConfig.baseFileFormat = parameters(HIVE_BASE_FILE_FORMAT_OPT_KEY);
-    hiveSyncConfig.usePreApacheInputFormat =
-      parameters.get(HIVE_USE_PRE_APACHE_INPUT_FORMAT_OPT_KEY).exists(r => r.toBoolean)
-    hiveSyncConfig.databaseName = parameters(HIVE_DATABASE_OPT_KEY)
-    hiveSyncConfig.tableName = parameters(HIVE_TABLE_OPT_KEY)
-    hiveSyncConfig.hiveUser = parameters(HIVE_USER_OPT_KEY)
-    hiveSyncConfig.hivePass = parameters(HIVE_PASS_OPT_KEY)
-    hiveSyncConfig.jdbcUrl = parameters(HIVE_URL_OPT_KEY)
-    hiveSyncConfig.partitionFields =
-      ListBuffer(parameters(HIVE_PARTITION_FIELDS_OPT_KEY).split(",").map(_.trim).filter(!_.isEmpty).toList: _*)
-    hiveSyncConfig.partitionValueExtractorClass = parameters(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY)
-    hiveSyncConfig.useJdbc = parameters(HIVE_USE_JDBC_OPT_KEY).toBoolean
-    hiveSyncConfig
-  }
+    private def commitAndPerformPostOperations(writeStatuses: JavaRDD[WriteStatus],
+                                               parameters: Map[String, String],
+                                               client: HoodieWriteClient[HoodieRecordPayload[Nothing]],
+                                               tableConfig: HoodieTableConfig,
+                                               instantTime: String,
+                                               basePath: Path,
+                                               operation: String,
+                                               jsc: JavaSparkContext): (Boolean, common.util.Option[java.lang.String])
+
+    =
+    {
+      val errorCount = writeStatuses.rdd.filter(ws => ws.hasErrors).count()
+      if (errorCount == 0) {
+        log.info("No errors. Proceeding to commit the write.")
+        val metaMap = parameters.filter(kv =>
+          kv._1.startsWith(parameters(COMMIT_METADATA_KEYPREFIX_OPT_KEY)))
+        val commitSuccess = if (metaMap.isEmpty) {
+          client.commit(instantTime, writeStatuses)
+        } else {
+          val extraMetadata: util.Map[String, String] = new util.HashMap[String, String](mapAsJavaMap(metaMap));
+          client.commit(instantTime, writeStatuses, common.util.Option.of(extraMetadata))
+        }
 
-  private def commitAndPerformPostOperations(writeStatuses: JavaRDD[WriteStatus],
-                                             parameters: Map[String, String],
-                                             client: HoodieWriteClient[HoodieRecordPayload[Nothing]],
-                                             tableConfig: HoodieTableConfig,
-                                             instantTime: String,
-                                             basePath: Path,
-                                             operation: String,
-                                             jsc: JavaSparkContext): (Boolean, common.util.Option[java.lang.String]) = {
-    val errorCount = writeStatuses.rdd.filter(ws => ws.hasErrors).count()
-    if (errorCount == 0) {
-      log.info("No errors. Proceeding to commit the write.")
-      val metaMap = parameters.filter(kv =>
-        kv._1.startsWith(parameters(COMMIT_METADATA_KEYPREFIX_OPT_KEY)))
-      val commitSuccess = if (metaMap.isEmpty) {
-        client.commit(instantTime, writeStatuses)
-      } else {
-        client.commit(instantTime, writeStatuses,
-          common.util.Option.of(new util.HashMap[String, String](mapAsJavaMap(metaMap))))
-      }
+        if (commitSuccess) {
+          log.info("Commit " + instantTime + " successful!")
+        }
+        else {
+          log.info("Commit " + instantTime + " failed!")
+        }
 
-      if (commitSuccess) {
-        log.info("Commit " + instantTime + " successful!")
-      }
-      else {
-        log.info("Commit " + instantTime + " failed!")
-      }
+        val asyncCompactionEnabled = isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())
+        val compactionInstant: common.util.Option[java.lang.String] =
+          if (asyncCompactionEnabled) {
+            client.scheduleCompaction(common.util.Option.of(new util.HashMap[String, String](mapAsJavaMap(metaMap))))
+          } else {
+            common.util.Option.empty()
+          }
 
-      val asyncCompactionEnabled = isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())
-      val compactionInstant : common.util.Option[java.lang.String] =
-      if (asyncCompactionEnabled) {
-        client.scheduleCompaction(common.util.Option.of(new util.HashMap[String, String](mapAsJavaMap(metaMap))))
-      } else {
-        common.util.Option.empty()
-      }
+        log.info(s"Compaction Scheduled is $compactionInstant")
+        val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
+        val syncHiveSucess = if (hiveSyncEnabled) {
+          log.info("Syncing to Hive Metastore (URL: " + parameters(HIVE_URL_OPT_KEY) + ")")
+          val fs = FSUtils.getFs(basePath.toString, jsc.hadoopConfiguration)
+          syncHive(basePath, fs, parameters)
+        } else {
+          true
+        }
 
-      log.info(s"Compaction Scheduled is $compactionInstant")
-      val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
-      val syncHiveSucess = if (hiveSyncEnabled) {
-        log.info("Syncing to Hive Metastore (URL: " + 
parameters(HIVE_URL_OPT_KEY) + ")")
-        val fs = FSUtils.getFs(basePath.toString, jsc.hadoopConfiguration)
-        syncHive(basePath, fs, parameters)
+        log.info(s"Is Async Compaction Enabled ? $asyncCompactionEnabled")
+        if (!asyncCompactionEnabled) {
+          client.close()
+        }
+        (commitSuccess && syncHiveSucess, compactionInstant)
       } else {
-        true
-      }
-
-      log.info(s"Is Async Compaction Enabled ? $asyncCompactionEnabled")
-      if (!asyncCompactionEnabled) {
-        client.close()
-      }
-      (commitSuccess && syncHiveSucess, compactionInstant)
-    } else {
-      log.error(s"$operation failed with $errorCount errors :")
-      if (log.isTraceEnabled) {
-        log.trace("Printing out the top 100 errors")
-        writeStatuses.rdd.filter(ws => ws.hasErrors)
-          .take(100)
-          .foreach(ws => {
-            log.trace("Global error :", ws.getGlobalError)
-            if (ws.getErrors.size() > 0) {
-              ws.getErrors.foreach(kt =>
-                log.trace(s"Error for key: ${kt._1}", kt._2))
-            }
-          })
+        log.error(s"$operation failed with $errorCount errors :")
+        if (log.isTraceEnabled) {
+          log.trace("Printing out the top 100 errors")
+          writeStatuses.rdd.filter(ws => ws.hasErrors)
+            .take(100)
+            .foreach(ws => {
+              log.trace("Global error :", ws.getGlobalError)
+              if (ws.getErrors.size() > 0) {
+                ws.getErrors.foreach(kt =>
+                  log.trace(s"Error for key: ${kt._1}", kt._2))
+              }
+            })
+        }
+        (false, common.util.Option.empty())
       }
-      (false, common.util.Option.empty())
     }
-  }
 
-  private def isAsyncCompactionEnabled(client: HoodieWriteClient[HoodieRecordPayload[Nothing]],
-                                       tableConfig: HoodieTableConfig,
-                                       parameters: Map[String, String], configuration: Configuration) : Boolean = {
-    log.info(s"Config.isInlineCompaction ? ${client.getConfig.isInlineCompaction}")
-    if (!client.getConfig.isInlineCompaction
-      && parameters.get(ASYNC_COMPACT_ENABLE_KEY).exists(r => r.toBoolean)) {
-      tableConfig.getTableType == HoodieTableType.MERGE_ON_READ
-    } else {
-      false
+    private def isAsyncCompactionEnabled(client: HoodieWriteClient[HoodieRecordPayload[Nothing]],

Review comment:
       these seem like formatting-only changes.
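
       For reference, a minimal illustration of the formatting-only difference being flagged (taken from the hunk above, not a new suggestion): the new code splits the method signature, the =, and the opening brace of helpers such as syncHive across separate lines. Keeping the file's existing one-line declaration style would leave the body untouched and shrink the diff, e.g.:

           // same body as in the hunk above; only the declaration layout differs
           private def syncHive(basePath: Path, fs: FileSystem, parameters: Map[String, String]): Boolean = {
             val hiveSyncConfig: HiveSyncConfig = buildSyncConfig(basePath, parameters)
             val hiveConf: HiveConf = new HiveConf()
             hiveConf.addResource(fs.getConf)
             new HiveSyncTool(hiveSyncConfig, hiveConf, fs).syncHoodieTable()
             true
           }

       The same applies to buildSyncConfig, commitAndPerformPostOperations and isAsyncCompactionEnabled in this hunk.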




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
