bithw1 edited a comment on issue #2276:
URL: https://github.com/apache/hudi/issues/2276#issuecomment-733441100


   The code that creates/upserts the table is shown below. I have 
explicitly set the following two options to disable compaction:
   
   ```
         .option(HoodieCompactionConfig.INLINE_COMPACT_PROP, "false")
         .option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE_OPT_KEY, "false")
   ```
   
   I am not sure how to exercise the compaction feature from code. Could 
you please help? @bvaradar Thanks!
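   For what it's worth, my guess is that something like the sketch below 
would turn inline compaction back on (assuming these two configs behave as 
documented: INLINE_COMPACT_NUM_DELTA_COMMITS_PROP sets how many delta 
commits accumulate before a compaction is triggered), but I am not sure 
this is the intended way:
   
   ```
         // Sketch (my assumption, not verified): enable inline compaction
         // and trigger it after every delta commit so it is easy to observe.
         .option(HoodieCompactionConfig.INLINE_COMPACT_PROP, "true")
         .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS_PROP, "1")
   ```
   
   The complete program is below: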
   
   ```
   package org.example.hudi
   
   import org.apache.hudi.DataSourceWriteOptions
   import org.apache.hudi.config.{HoodieCompactionConfig, HoodieIndexConfig, HoodieWriteConfig}
   import org.apache.hudi.index.HoodieIndex
   import org.apache.spark.sql.{SaveMode, SparkSession}
   
   case class MyOrder(
                       name: String,
                       price: String,
                       creation_date: String,
                       dt: String)
   
   object MORWorkTest {
   
     val overwrite1Data = Seq(
       MyOrder("A", "1", "2020-11-18 14:43:32", "2020-11-19"),
       MyOrder("B", "1", "2020-11-18 14:42:21", "2020-11-19"),
       MyOrder("C", "1", "2020-11-18 14:47:19", "2020-11-19"),
       MyOrder("D", "1", "2020-11-18 14:46:50", "2020-11-19")
     )
   
     val insertUpdate1Data = Seq(
       MyOrder("A", "2", "2020-11-18 14:50:32", "2020-11-19"),
       MyOrder("B", "2", "2020-11-18 14:50:21", "2020-11-19"),
       MyOrder("C", "2", "2020-11-18 14:50:19", "2020-11-19"),
       MyOrder("D", "2", "2020-11-18 14:50:50", "2020-11-19")
     )
   
     val insertUpdate2Data = Seq(
       MyOrder("A", "3", "2020-11-18 14:53:32", "2020-11-19"),
       MyOrder("B", "3", "2020-11-18 14:52:21", "2020-11-19"),
       MyOrder("C", "3", "2020-11-18 14:57:19", "2020-11-19"),
       MyOrder("D", "3", "2020-11-18 14:56:50", "2020-11-19")
     )
   
     val spark = SparkSession.builder.appName("MORTest")
       .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
       .config("spark.sql.warehouse.dir", "hdfs:///user/hive/warehouse")
       .enableHiveSupport().getOrCreate()
   
     val hudi_table = "hudi_hive_read_write_mor_5"
   
     val base_path = s"/data/hudi_demo/$hudi_table"
   
     def run(op: Int) = {
       val (data, saveMode) = op match {
         case 1 => (overwrite1Data, SaveMode.Overwrite)
         case 2 => (insertUpdate1Data, SaveMode.Append)
         case 3 => (insertUpdate2Data, SaveMode.Append)
       }
       import spark.implicits._
       val insertData = spark.createDataset(data)
       insertData.write.format("hudi")
         .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "name")
         .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "creation_date")
         .option(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY, "xyz")
         .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, hudi_table)
         .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
         .option(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH, "true")
         .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "dt")
   
         // table type: MOR
         .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
   
         // disable async compaction
         .option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE_OPT_KEY, "false")
         // disable inline compaction (and set a high delta-commit threshold)
         .option(HoodieCompactionConfig.INLINE_COMPACT_PROP, "false")
         .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS_PROP, "100")
   
   
         .option(DataSourceWriteOptions.HIVE_URL_OPT_KEY, "jdbc:hive2://10.41.90.208:10000")
         .option(HoodieWriteConfig.TABLE_NAME, hudi_table)
         .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, "org.apache.hudi.hive.MultiPartKeysValueExtractor")
         .option(HoodieIndexConfig.INDEX_TYPE_PROP, HoodieIndex.IndexType.GLOBAL_BLOOM.name())
         .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "dt")
         .option("hoodie.insert.shuffle.parallelism", "2")
         .option("hoodie.upsert.shuffle.parallelism", "2")
         .mode(saveMode)
         .save(base_path)
     }
   
   
     def main(args: Array[String]): Unit = {
       //do overwrite
       run(1)
   
       //do upsert
       run(2)
   
       //do upsert
       run(3)
   
       println("===MOR is done=====")
     }
   
   }
   ```