[ 
https://issues.apache.org/jira/browse/HUDI-2947?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sivabalan narayanan updated HUDI-2947:
--------------------------------------
    Labels: sev:high  (was: )

> HoodieDeltaStreamer/DeltaSync can improperly pick up the checkpoint config 
> from CLI in continuous mode
> ------------------------------------------------------------------------------------------------------
>
>                 Key: HUDI-2947
>                 URL: https://issues.apache.org/jira/browse/HUDI-2947
>             Project: Apache Hudi
>          Issue Type: Bug
>            Reporter: Ethan Guo
>            Assignee: sivabalan narayanan
>            Priority: Critical
>              Labels: sev:high
>             Fix For: 0.11.0
>
>
> *Problem:*
> When deltastreamer is started with a given checkpoint, e.g. `--checkpoint 0`, 
> in continuous mode, the job may later resume from the wrong checkpoint. Here, 
> the 20211206203551080 commit records checkpoint "1" (it resumed from the 
> configured "0") instead of "5" (which it would record had it resumed from 
> "4", the checkpoint of 20211206202728233.commit). The wrong pickup happens 
> right after the replacecommit and clean. More details below.
>  
> The bug is due to the check here: 
> [https://github.com/apache/hudi/blob/master/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java#L335]
> {code:java}
> if (cfg.checkpoint != null
>     && (StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY))
>         || !cfg.checkpoint.equals(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY)))) {
>   resumeCheckpointStr = Option.of(cfg.checkpoint);
> }
> {code}
> When resuming after a clustering commit, "cfg.checkpoint != null" and 
> "StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY))" 
> are both true, since "--checkpoint 0" is configured and the last commit is a 
> replacecommit that carries no checkpoint keys. As a result, the resume 
> checkpoint string is reset to the configured checkpoint, incorrectly skipping 
> the timeline walk-back logic that follows.
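The faulty branch can be illustrated with a minimal, self-contained sketch. This is a simplified model, not the real Hudi API: the class name, the plain `Map` standing in for `HoodieCommitMetadata`, and the `String` standing in for `cfg.checkpoint` are all hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

public class CheckpointResolution {
    // Hypothetical stand-in for the CHECKPOINT_RESET_KEY metadata key.
    static final String CHECKPOINT_RESET_KEY = "deltastreamer.checkpoint.reset_key";

    // Models the buggy condition: the CLI --checkpoint wins whenever the latest
    // commit carries no reset key, even when that commit is a clustering
    // replacecommit, which never writes checkpoint metadata.
    static boolean buggyUsesCliCheckpoint(String cliCheckpoint, Map<String, String> lastCommitMetadata) {
        String resetKey = lastCommitMetadata.get(CHECKPOINT_RESET_KEY);
        return cliCheckpoint != null
                && (resetKey == null || resetKey.isEmpty()
                        || !cliCheckpoint.equals(resetKey));
    }

    public static void main(String[] args) {
        // A replacecommit from clustering has no checkpoint metadata at all,
        // so with "--checkpoint 0" the condition fires and "0" is reused.
        Map<String, String> replaceCommitMetadata = new HashMap<>();
        System.out.println(buggyUsesCliCheckpoint("0", replaceCommitMetadata)); // prints "true"
    }
}
```

A fix would presumably need an additional guard so that the absence of a reset key on a non-deltastreamer instant falls through to the timeline walk-back instead of reapplying the CLI checkpoint.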
>  
> Timeline:
>  
> {code:java}
>  189069 Dec  6 12:19 20211206201238649.commit
>       0 Dec  6 12:12 20211206201238649.commit.requested
>       0 Dec  6 12:12 20211206201238649.inflight
>  189069 Dec  6 12:27 20211206201959151.commit
>       0 Dec  6 12:20 20211206201959151.commit.requested
>       0 Dec  6 12:20 20211206201959151.inflight
>  189069 Dec  6 12:34 20211206202728233.commit
>       0 Dec  6 12:27 20211206202728233.commit.requested
>       0 Dec  6 12:27 20211206202728233.inflight
>   36662 Dec  6 12:35 20211206203449899.replacecommit
>       0 Dec  6 12:35 20211206203449899.replacecommit.inflight
>   34656 Dec  6 12:35 20211206203449899.replacecommit.requested
>   28013 Dec  6 12:35 20211206203503574.clean
>   19024 Dec  6 12:35 20211206203503574.clean.inflight
>   19024 Dec  6 12:35 20211206203503574.clean.requested
>  189069 Dec  6 12:43 20211206203551080.commit
>       0 Dec  6 12:35 20211206203551080.commit.requested
>       0 Dec  6 12:35 20211206203551080.inflight
>  189069 Dec  6 12:50 20211206204311612.commit
>       0 Dec  6 12:43 20211206204311612.commit.requested
>       0 Dec  6 12:43 20211206204311612.inflight
>       0 Dec  6 12:50 20211206205044595.commit.requested
>       0 Dec  6 12:50 20211206205044595.inflight
>     128 Dec  6 12:56 archived
>     483 Dec  6 11:52 hoodie.properties
>  {code}
>  
> Checkpoints in commits:
>  
> {code:java}
> grep "deltastreamer.checkpoint.key" *
> 20211206201238649.commit:    "deltastreamer.checkpoint.key" : "2"
> 20211206201959151.commit:    "deltastreamer.checkpoint.key" : "3"
> 20211206202728233.commit:    "deltastreamer.checkpoint.key" : "4"
> 20211206203551080.commit:    "deltastreamer.checkpoint.key" : "1"
> 20211206204311612.commit:    "deltastreamer.checkpoint.key" : "2" {code}
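For reference, the intended walk-back over the timeline above can be sketched as follows. This is a hypothetical, simplified model: instants are plain records rather than Hudi's `HoodieInstant`, and commit metadata is a plain `Map`.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class WalkBackSketch {
    static final String CHECKPOINT_KEY = "deltastreamer.checkpoint.key";

    // Hypothetical simplified instant: timestamp, action type, commit metadata.
    record Instant(String timestamp, String action, Map<String, String> metadata) {}

    // Walk the timeline from the newest instant backwards and return the first
    // checkpoint found, skipping instants (e.g. clustering replacecommits and
    // cleans) that carry no checkpoint metadata.
    static Optional<String> latestCheckpoint(List<Instant> timeline) {
        for (int i = timeline.size() - 1; i >= 0; i--) {
            String cp = timeline.get(i).metadata().get(CHECKPOINT_KEY);
            if (cp != null && !cp.isEmpty()) {
                return Optional.of(cp);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        List<Instant> timeline = List.of(
            new Instant("20211206202728233", "commit", Map.of(CHECKPOINT_KEY, "4")),
            new Instant("20211206203449899", "replacecommit", Map.of()),
            new Instant("20211206203503574", "clean", Map.of()));
        // The correct resume point after the replacecommit and clean is "4"
        // (so the next commit would record "5"), not the CLI-configured "0".
        System.out.println(latestCheckpoint(timeline).orElse("none")); // prints "4"
    }
}
```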
>  
> *Steps to reproduce:*
> Run HoodieDeltaStreamer in continuous mode by providing both 
> "--checkpoint 0" and "--continuous", with inline clustering and async 
> cleaning enabled (some configs are masked).
>  
> {code:java}
> spark-submit \
>   --master yarn \
>   --driver-memory 8g --executor-memory 8g --num-executors 3 --executor-cores 4 \
>   --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
>   --conf spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.DefaultAWSCredentialsProviderChain \
>   --conf spark.speculation=true \
>   --conf spark.speculation.multiplier=1.0 \
>   --conf spark.speculation.quantile=0.5 \
>   --packages org.apache.spark:spark-avro_2.12:3.2.0 \
>   --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
>   file:/home/hadoop/ethan/hudi-utilities-bundle_2.12-0.10.0-rc3.jar \
>   --props file:/home/hadoop/ethan/test.properties \
>   --source-class ... \
>   --source-ordering-field ts \
>   --target-base-path s3a://hudi-testing/test_hoodie_table_11/ \
>   --target-table test_table \
>   --table-type COPY_ON_WRITE \
>   --op BULK_INSERT \
>   --checkpoint 0 \
>   --continuous {code}
> test.properties:
>  
>  
> {code:java}
> hoodie.cleaner.commits.retained=4
> hoodie.keep.min.commits=5
> hoodie.keep.max.commits=7
> hoodie.clean.async=true
> hoodie.clustering.inline=true
> hoodie.clustering.async.max.commits=3
> hoodie.compact.inline.max.delta.commits=3
> hoodie.insert.shuffle.parallelism=10
> hoodie.upsert.shuffle.parallelism=10
> hoodie.bulk_insert.shuffle.parallelism=10
> hoodie.delete.shuffle.parallelism=10
> hoodie.bulkinsert.shuffle.parallelism=10
> hoodie.datasource.write.recordkey.field=key
> hoodie.datasource.write.partitionpath.field=partition
> # turn off any small file handling, for ease of testing
> hoodie.parquet.small.file.limit=1
> benchmark.input.source.path=...{code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
