bithw1 edited a comment on issue #2276: URL: https://github.com/apache/hudi/issues/2276#issuecomment-733441100
The code that creates/upserts the table is as follows; I have explicitly specified the following two lines to disable compaction. ``` .option(HoodieCompactionConfig.INLINE_COMPACT_PROP, "false") .option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE_OPT_KEY, "false") ``` I am not sure how I could exercise the compaction feature in code. Could you please help, @bvaradar? Thanks! ``` package org.example.hudi import org.apache.hudi.DataSourceWriteOptions import org.apache.hudi.config.{HoodieCompactionConfig, HoodieIndexConfig, HoodieWriteConfig} import org.apache.hudi.index.HoodieIndex import org.apache.spark.sql.{SaveMode, SparkSession} case class MyOrder( name: String, price: String, creation_date: String, dt: String) object MORWorkTest { val overwrite1Data = Seq( MyOrder("A", "1", "2020-11-18 14:43:32", "2020-11-19"), MyOrder("B", "1", "2020-11-18 14:42:21", "2020-11-19"), MyOrder("C", "1", "2020-11-18 14:47:19", "2020-11-19"), MyOrder("D", "1", "2020-11-18 14:46:50", "2020-11-19") ) val insertUpdate1Data = Seq( MyOrder("A", "2", "2020-11-18 14:50:32", "2020-11-19"), MyOrder("B", "2", "2020-11-18 14:50:21", "2020-11-19"), MyOrder("C", "2", "2020-11-18 14:50:19", "2020-11-19"), MyOrder("D", "2", "2020-11-18 14:50:50", "2020-11-19") ) val insertUpdate2Data = Seq( MyOrder("A", "3", "2020-11-18 14:53:32", "2020-11-19"), MyOrder("B", "3", "2020-11-18 14:52:21", "2020-11-19"), MyOrder("C", "3", "2020-11-18 14:57:19", "2020-11-19"), MyOrder("D", "3", "2020-11-18 14:56:50", "2020-11-19") ) val spark = SparkSession.builder.appName("MORTest") .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .config("spark.sql.warehouse.dir", "hdfs:///user/hive/warehouse") .enableHiveSupport().getOrCreate() val hudi_table = "hudi_hive_read_write_mor_5" val base_path = s"/data/hudi_demo/$hudi_table" def run(op: Int) = { val (data, saveMode) = op match { case 1 => (overwrite1Data, SaveMode.Overwrite) case 2 => (insertUpdate1Data, SaveMode.Append) case 3 => 
(insertUpdate2Data, SaveMode.Append) } import spark.implicits._ val insertData = spark.createDataset(data) insertData.write.format("hudi") .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "name") .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "creation_date") .option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY, "xyz") .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, hudi_table) .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true") .option(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH, "true") .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "dt") //table type: MOR .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL) //disable async compact .option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE_OPT_KEY, "false") .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS_PROP, 100) //disable inline compact .option(HoodieCompactionConfig.INLINE_COMPACT_PROP, "false") .option(DataSourceWriteOptions.HIVE_URL_OPT_KEY, "jdbc:hive2://10.41.90.208:10000") .option(HoodieWriteConfig.TABLE_NAME, hudi_table) .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, "org.apache.hudi.hive.MultiPartKeysValueExtractor") .option(HoodieIndexConfig.INDEX_TYPE_PROP, HoodieIndex.IndexType.GLOBAL_BLOOM.name()) .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "dt") .option("hoodie.insert.shuffle.parallelism", "2") .option("hoodie.upsert.shuffle.parallelism", "2") .mode(saveMode) .save(base_path); } def main(args: Array[String]): Unit = { //do overwrite run(1) //do upsert run(2) //do upsert run(3) println("===MOR is done=====") } } ``` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org