soumilshah1995 opened a new issue, #11436:
URL: https://github.com/apache/hudi/issues/11436

   Issue Description:
   I'm encountering an issue with Hudi's commit retention and cleaning configuration. Despite explicitly setting hoodie.cleaner.commits.retained to 5, the warning below reports its effective value as 20 (and hoodie.keep.min.commits as 7, not the 5 I configured). It looks like the system is not honoring my provided values and is substituting its own defaults.
   
   ```
   hudi_options = {
       "hoodie.keep.min.commits": "5",
       "hoodie.cleaner.commits.retained": "5",
       "hoodie.keep.max.commits": "6"
   }
   
   ```
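   For reference, the warning text implies Hudi expects hoodie.cleaner.commits.retained < hoodie.keep.min.commits < hoodie.keep.max.commits; my settings above have retained equal to min, which may be what triggers the archiver's auto-adjustment. A minimal sketch of values that satisfy that ordering (the numbers are illustrative only, not a recommendation):
   ```
   # Illustrative only: values chosen so that
   # cleaner.commits.retained < keep.min.commits < keep.max.commits
   hudi_options = {
       "hoodie.cleaner.commits.retained": "5",  # commits kept by the cleaner
       "hoodie.keep.min.commits": "6",          # must exceed commits.retained
       "hoodie.keep.max.commits": "7",          # must exceed keep.min.commits
   }
   ```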
   
   Warning Message:
   ```
   24/06/11 08:56:29 WARN HoodieWriteConfig: Increase hoodie.keep.min.commits=7 to be greater than hoodie.cleaner.commits.retained=20 (there is risk of incremental pull missing data from few instants based on the current configuration). The Hudi archiver will automatically adjust the configuration regardless.
   ```
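   To verify what retention is actually applied, one check is to count the completed commit instants left on the active timeline after the run; Hudi 0.14 keeps them as files directly under the table's .hoodie directory (archived instants move to .hoodie/archived). A small sketch, assuming the placeholder table path from the code sample below:
   ```
   import os

   # Placeholder path from the reproduction below; substitute the real one.
   table_path = "/path/to/hudi/table/default/messages"
   timeline_dir = os.path.join(table_path, ".hoodie")

   # Completed commits on a COPY_ON_WRITE table end in ".commit"; the archiver
   # should trim the active timeline to roughly keep.min/max commits.
   commits = sorted(f for f in os.listdir(timeline_dir) if f.endswith(".commit"))
   print(f"{len(commits)} completed commits on the active timeline:")
   for c in commits:
       print(" ", c)
   ```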
   Steps to Reproduce:
   1. Use the above Hudi configuration.
   2. Run the ingestion process using the code sample below.
   Expected Behavior:
   The explicitly set hoodie.cleaner.commits.retained value of 5 should be respected, with no warning implying its value is 20.
   
   Actual Behavior:
   The warning reports hoodie.cleaner.commits.retained as 20 (and hoodie.keep.min.commits as 7), even though both were explicitly set to 5 in the write options.
   
   Environment:
   Hudi Version: 0.14.0
   Spark Version: 3.4
   OS: macOS
   Code Sample:
   ```
   
   import os
   import sys
   import time

   from pyspark.sql import SparkSession
   from pyspark.sql.types import StructType, StructField, StringType
   
   HUDI_VERSION = '0.14.0'
   SPARK_VERSION = '3.4'
   
   os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@11"
   SUBMIT_ARGS = f"--packages 
org.apache.hudi:hudi-spark{SPARK_VERSION}-bundle_2.12:{HUDI_VERSION} 
pyspark-shell"
   os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
   os.environ['PYSPARK_PYTHON'] = sys.executable
   
   # Spark session
   spark = SparkSession.builder \
       .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
       .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
       .config('className', 'org.apache.hudi') \
       .config('spark.sql.hive.convertMetastoreParquet', 'false') \
       .getOrCreate()
   
   def write_to_hudi(spark_df,
                     table_name,
                     db_name,
                     method='upsert',
                     table_type='COPY_ON_WRITE',
                     recordkey='',
                     precombine='',
                     partition_fields='',
                     index_type='BLOOM',
                     curr_region='us-east-1'
                     ):
       path = f"file:///path/to/hudi/table/{db_name}/{table_name}"
   
       hudi_options = {
           'hoodie.table.name': table_name,
           'hoodie.datasource.write.table.type': table_type,
           'hoodie.datasource.write.table.name': table_name,
           'hoodie.datasource.write.operation': method,
           'hoodie.datasource.write.recordkey.field': recordkey,
           'hoodie.datasource.write.precombine.field': precombine,
           "hoodie.datasource.write.partitionpath.field": partition_fields,
           
           "hoodie.keep.min.commits": "5",
           "hoodie.cleaner.commits.retained": "5",
           "hoodie.keep.max.commits": "6",
   
           "hoodie.write.concurrency.mode": "optimistic_concurrency_control",
           "hoodie.cleaner.policy.failed.writes": "LAZY",
           "hoodie.write.lock.provider": 
"org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider",
       }
       print(hudi_options)
       print(path)
   
       spark_df.write.format("hudi"). \
           options(**hudi_options). \
           mode("append"). \
           save(path)
   
   schema = StructType([
       StructField("id", StringType(), True),
       StructField("message", StringType(), True)
   ])
   
   # Loop to generate data and write to Hudi
   for i in range(1, 10):
       # Create the data
       updated_data = [(str(i), "Batch : {} ".format(i))]
   
       # Create the DataFrame with the new data
       df = spark.createDataFrame(updated_data, schema)
   
       # Show the DataFrame with the updated "message" column
       df.show()
   
       # Write to Hudi
       write_to_hudi(
           spark_df=df,
           method="upsert",
           db_name="default",
           table_name="messages",
           recordkey="id",
           precombine="message"
       )
   
       # Brief pause so each batch lands in a distinct commit
       time.sleep(1)
   
   ```
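   One diagnostic worth trying, in case the multi-writer settings are what trigger the adjustment: rerun the same ingestion with the OCC-related options removed and check whether the warning still reports 20. A minimal sketch of the variant options (an isolation test under that assumption, not a fix):
   ```
   # Same retention values as above, but single-writer: the OCC mode, lock
   # provider, and LAZY failed-writes policy are removed. If the warning
   # disappears, the override likely comes from concurrency auto-tuning.
   hudi_options_single_writer = {
       "hoodie.table.name": "messages",
       "hoodie.datasource.write.table.type": "COPY_ON_WRITE",
       "hoodie.datasource.write.operation": "upsert",
       "hoodie.datasource.write.recordkey.field": "id",
       "hoodie.datasource.write.precombine.field": "message",
       "hoodie.keep.min.commits": "5",
       "hoodie.cleaner.commits.retained": "5",
       "hoodie.keep.max.commits": "6",
   }
   ```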
   
   Additional Context:
   This behavior suggests an internal adjustment or default that silently overrides user-supplied configuration, which is confusing and can lead to misconfiguration. Any insight into how to ensure the provided values are respected would be greatly appreciated.
   
   
   
![image](https://github.com/apache/hudi/assets/39345855/111b1694-7089-4cc7-bab6-2ff7a2fa0cfd)
   
   

