[jira] [Updated] (HUDI-2947) HoodieDeltaStreamer/DeltaSync can improperly pick up the checkpoint config from CLI in continuous mode

2022-01-05 Thread sivabalan narayanan (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-2947?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sivabalan narayanan updated HUDI-2947:
--
Story Points: 1

> HoodieDeltaStreamer/DeltaSync can improperly pick up the checkpoint config 
> from CLI in continuous mode
> --
>
>         Key: HUDI-2947
>         URL: https://issues.apache.org/jira/browse/HUDI-2947
>     Project: Apache Hudi
>  Issue Type: Bug
>    Reporter: Ethan Guo
>    Assignee: sivabalan narayanan
>    Priority: Critical
>      Labels: pull-request-available, sev:critical
>     Fix For: 0.11.0, 0.10.1
>
>
> *Problem:*
> When DeltaStreamer is started in continuous mode with a given checkpoint, 
> e.g., `--checkpoint 0`, the job may later pick up the wrong checkpoint. In 
> the run below, the wrong checkpoint appears in the 20211206203551080 commit, 
> right after the replacecommit and clean: the resume checkpoint is reset to 
> "0", so that commit records "1" instead of the expected "5" that should 
> follow the "4" of 20211206202728233.commit. More details below.
>  
> The bug is due to the check here: 
> [https://github.com/apache/hudi/blob/master/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java#L335]
> {code:java}
> if (cfg.checkpoint != null
>     && (StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY))
>         || !cfg.checkpoint.equals(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY)))) {
>   resumeCheckpointStr = Option.of(cfg.checkpoint);
> } {code}
> When resuming after a clustering commit, both "cfg.checkpoint != null" and 
> "StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY))" 
> are true, because "--checkpoint 0" is configured and the last commit is a 
> replacecommit without any checkpoint keys. As a result, the resume checkpoint 
> string is reset to the configured checkpoint and the timeline walk-back logic 
> that follows this check is skipped, which is wrong.
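To make the failure mode concrete, here is a minimal, self-contained sketch that evaluates the quoted condition against two kinds of commit metadata. It is not the DeltaSync code: commit metadata is modeled as a plain `Map<String, String>`, the class and method names are hypothetical, the string value of the reset key is an assumption, and the deltastreamer commit is assumed to carry the reset key "0" because it was written with `--checkpoint 0` (the grep output in the issue only shows `deltastreamer.checkpoint.key`).

```java
import java.util.Map;

public class ResumeCheckSketch {
  // Key names modeled on DeltaSync's constants; the reset-key string value is an assumption.
  static final String CHECKPOINT_KEY = "deltastreamer.checkpoint.key";
  static final String CHECKPOINT_RESET_KEY = "deltastreamer.checkpoint.reset_key";

  static boolean isNullOrEmpty(String s) {
    return s == null || s.isEmpty();
  }

  // Mirrors the quoted condition: true means "resume from cfg.checkpoint".
  static boolean usesCliCheckpoint(String cfgCheckpoint, Map<String, String> lastCommitMetadata) {
    String resetKey = lastCommitMetadata.get(CHECKPOINT_RESET_KEY);
    return cfgCheckpoint != null
        && (isNullOrEmpty(resetKey) || !cfgCheckpoint.equals(resetKey));
  }

  public static void main(String[] args) {
    // Assumed metadata of a deltastreamer commit written with "--checkpoint 0".
    Map<String, String> deltaCommit = Map.of(CHECKPOINT_KEY, "4", CHECKPOINT_RESET_KEY, "0");
    // A clustering replacecommit carries no checkpoint keys at all.
    Map<String, String> replaceCommit = Map.of();

    System.out.println(usesCliCheckpoint("0", deltaCommit));   // false: resume from "4"
    System.out.println(usesCliCheckpoint("0", replaceCommit)); // true: wrongly resets to "0"
  }
}
```

The second call shows why a replacecommit at the head of the timeline is enough to trigger the reset: a missing reset key is treated the same as a newly supplied `--checkpoint`.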
>  
> Timeline:
>  
> {code:java}
>  189069 Dec  6 12:19 20211206201238649.commit
>       0 Dec  6 12:12 20211206201238649.commit.requested
>       0 Dec  6 12:12 20211206201238649.inflight
>  189069 Dec  6 12:27 20211206201959151.commit
>       0 Dec  6 12:20 20211206201959151.commit.requested
>       0 Dec  6 12:20 20211206201959151.inflight
>  189069 Dec  6 12:34 20211206202728233.commit
>       0 Dec  6 12:27 20211206202728233.commit.requested
>       0 Dec  6 12:27 20211206202728233.inflight
>   36662 Dec  6 12:35 20211206203449899.replacecommit
>       0 Dec  6 12:35 20211206203449899.replacecommit.inflight
>   34656 Dec  6 12:35 20211206203449899.replacecommit.requested
>   28013 Dec  6 12:35 20211206203503574.clean
>   19024 Dec  6 12:35 20211206203503574.clean.inflight
>   19024 Dec  6 12:35 20211206203503574.clean.requested
>  189069 Dec  6 12:43 20211206203551080.commit
>       0 Dec  6 12:35 20211206203551080.commit.requested
>       0 Dec  6 12:35 20211206203551080.inflight
>  189069 Dec  6 12:50 20211206204311612.commit
>       0 Dec  6 12:43 20211206204311612.commit.requested
>       0 Dec  6 12:43 20211206204311612.inflight
>       0 Dec  6 12:50 20211206205044595.commit.requested
>       0 Dec  6 12:50 20211206205044595.inflight
>     128 Dec  6 12:56 archived
>     483 Dec  6 11:52 hoodie.properties
>  {code}
>  
> Checkpoints in commits:
>  
> {code:java}
> grep "deltastreamer.checkpoint.key" *
> 20211206201238649.commit:    "deltastreamer.checkpoint.key" : "2"
> 20211206201959151.commit:    "deltastreamer.checkpoint.key" : "3"
> 20211206202728233.commit:    "deltastreamer.checkpoint.key" : "4"
> 20211206203551080.commit:    "deltastreamer.checkpoint.key" : "1"
> 20211206204311612.commit:    "deltastreamer.checkpoint.key" : "2" {code}
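One way to avoid the reset is to walk back past instants that carry no checkpoint information before applying the CLI-override check. The sketch below does that over the timeline above, as it stood before the 20211206203551080 run. It is a hypothetical illustration, not the fix that was merged for this issue: the `Instant` class and `resolveResumeCheckpoint` method are invented for the example, the clean instant is omitted, and the reset key "0" on each commit is assumed.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class CheckpointWalkBackSketch {
  // Key names modeled on DeltaSync's constants; the reset-key string value is an assumption.
  static final String CHECKPOINT_KEY = "deltastreamer.checkpoint.key";
  static final String CHECKPOINT_RESET_KEY = "deltastreamer.checkpoint.reset_key";

  // Hypothetical stand-in for a completed instant and its extra commit metadata.
  static final class Instant {
    final String timestamp;
    final String action;
    final Map<String, String> metadata;

    Instant(String timestamp, String action, Map<String, String> metadata) {
      this.timestamp = timestamp;
      this.action = action;
      this.metadata = metadata;
    }
  }

  static boolean hasCheckpointInfo(Map<String, String> meta) {
    return meta.containsKey(CHECKPOINT_KEY) || meta.containsKey(CHECKPOINT_RESET_KEY);
  }

  // Hypothetical resolver: skip instants without checkpoint info (e.g. clustering
  // replacecommits), then apply the CLI-override check to the latest commit that has it.
  static Optional<String> resolveResumeCheckpoint(String cfgCheckpoint, List<Instant> oldestFirst) {
    for (int i = oldestFirst.size() - 1; i >= 0; i--) {
      Map<String, String> meta = oldestFirst.get(i).metadata;
      if (!hasCheckpointInfo(meta)) {
        continue;
      }
      String resetKey = meta.get(CHECKPOINT_RESET_KEY);
      boolean newCliCheckpoint = cfgCheckpoint != null
          && (resetKey == null || resetKey.isEmpty() || !cfgCheckpoint.equals(resetKey));
      return newCliCheckpoint ? Optional.of(cfgCheckpoint) : Optional.ofNullable(meta.get(CHECKPOINT_KEY));
    }
    // No commit carries checkpoint info: fall back to the CLI value, if any.
    return Optional.ofNullable(cfgCheckpoint);
  }

  public static void main(String[] args) {
    // Completed commits before the 20211206203551080 run (clean instant omitted).
    List<Instant> timeline = List.of(
        new Instant("20211206201238649", "commit", Map.of(CHECKPOINT_KEY, "2", CHECKPOINT_RESET_KEY, "0")),
        new Instant("20211206201959151", "commit", Map.of(CHECKPOINT_KEY, "3", CHECKPOINT_RESET_KEY, "0")),
        new Instant("20211206202728233", "commit", Map.of(CHECKPOINT_KEY, "4", CHECKPOINT_RESET_KEY, "0")),
        new Instant("20211206203449899", "replacecommit", Map.of()));
    System.out.println(resolveResumeCheckpoint("0", timeline).orElse("<none>")); // prints 4
  }
}
```

With the replacecommit skipped, the job would resume from "4" and the next commit would record "5", while passing a different `--checkpoint` value (one that does not match the stored reset key) would still take effect.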
>  
> *Steps to reproduce:*
> Run HoodieDeltaStreamer in continuous mode, providing both "--checkpoint 0" 
> and "--continuous", with inline clustering and sync clean enabled (some 
> configs are masked).
>  
> {code:java}
> spark-submit \
>   --master yarn \
>   --driver-memory 8g --executor-memory 8g --num-executors 3 --executor-cores 4 \
>   --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
>   --conf spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.DefaultAWSCredentialsProviderChain \
>   --conf spark.speculation=true \
>   --conf spark.speculation.multiplier=1.0 \
>   --conf spark.speculation.quantile=0.5 \
>   --packages org.apache.spark:spark-avro_2.12:3.2.0 \
>   --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
>   file:/home/hadoop/ethan/hudi-utilities-bundle_2.12-0.10.0-rc3.jar \
>   --props file:/home/hadoop/ethan/test.properties \
>   --source-class ... \
>   --source-ordering-field ts \
>   --target-base-path s3a://hudi-testing/test_hoodie_table_11/ \
>   --target-table test_table \
>   --table-type 

[jira] [Updated] (HUDI-2947) HoodieDeltaStreamer/DeltaSync can improperly pick up the checkpoint config from CLI in continuous mode

2022-01-05 Thread sivabalan narayanan (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-2947?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sivabalan narayanan updated HUDI-2947:
--
Status: In Progress  (was: Open)


[jira] [Updated] (HUDI-2947) HoodieDeltaStreamer/DeltaSync can improperly pick up the checkpoint config from CLI in continuous mode

2022-01-03 Thread sivabalan narayanan (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-2947?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sivabalan narayanan updated HUDI-2947:
--
Sprint: Hudi 0.10.1 -  2021/01/03


[jira] [Updated] (HUDI-2947) HoodieDeltaStreamer/DeltaSync can improperly pick up the checkpoint config from CLI in continuous mode

2022-01-03 Thread sivabalan narayanan (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-2947?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sivabalan narayanan updated HUDI-2947:
--
Fix Version/s: 0.10.1


[jira] [Updated] (HUDI-2947) HoodieDeltaStreamer/DeltaSync can improperly pick up the checkpoint config from CLI in continuous mode

2022-01-03 Thread sivabalan narayanan (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-2947?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sivabalan narayanan updated HUDI-2947:
--
Labels: pull-request-available sev:critical  (was: pull-request-available sev:high)


[jira] [Updated] (HUDI-2947) HoodieDeltaStreamer/DeltaSync can improperly pick up the checkpoint config from CLI in continuous mode

2021-12-31 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-2947?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated HUDI-2947:
-
Labels: pull-request-available sev:high  (was: sev:high)


[jira] [Updated] (HUDI-2947) HoodieDeltaStreamer/DeltaSync can improperly pick up the checkpoint config from CLI in continuous mode

2021-12-28 Thread sivabalan narayanan (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-2947?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sivabalan narayanan updated HUDI-2947:
--
Labels: sev:high  (was: )

> HoodieDeltaStreamer/DeltaSync can improperly pick up the checkpoint config 
> from CLI in continuous mode
> --
>
> Key: HUDI-2947
> URL: https://issues.apache.org/jira/browse/HUDI-2947
> Project: Apache Hudi
>  Issue Type: Bug
>Reporter: Ethan Guo
>Assignee: sivabalan narayanan
>Priority: Critical
>  Labels: sev:high
> Fix For: 0.11.0
>
>
> *Problem:*
> When deltastreamer is started with a given checkpoint, e.g., `--checkpoint 
> 0`, in the continuous mode, the deltastreamer job may pick up the wrong 
> checkpoint later on.  The wrong checkpoint (for 20211206203551080 commit) 
> happens after the replacecommit and clean, which is reset to "0", instead of 
> "5" after 20211206202728233.commit.  More details below.
>  
> The bug is due to the check here: 
> [https://github.com/apache/hudi/blob/master/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java#L335]
> {code:java}
> if (cfg.checkpoint != null && 
> (StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY))  
>   || 
> !cfg.checkpoint.equals(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY {
> resumeCheckpointStr = Option.of(cfg.checkpoint);
> } {code}
> In this case of resuming after a clustering commit, "cfg.checkpoint != null" 
> and 
> "StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY))" 
>  are both true as "--checkpoint 0" is configured and last commit is 
> replacecommit without checkpoint keys.  This leads to the resume checkpoint 
> string being reset to the configured checkpoint, skipping the timeline 
> walk-back logic below, which is wrong.  
>  
> Timeline:
>  
> {code:java}
>  189069 Dec  6 12:19 20211206201238649.commit
>       0 Dec  6 12:12 20211206201238649.commit.requested
>       0 Dec  6 12:12 20211206201238649.inflight
>  189069 Dec  6 12:27 20211206201959151.commit
>       0 Dec  6 12:20 20211206201959151.commit.requested
>       0 Dec  6 12:20 20211206201959151.inflight
>  189069 Dec  6 12:34 20211206202728233.commit
>       0 Dec  6 12:27 20211206202728233.commit.requested
>       0 Dec  6 12:27 20211206202728233.inflight
>   36662 Dec  6 12:35 20211206203449899.replacecommit
>       0 Dec  6 12:35 20211206203449899.replacecommit.inflight
>   34656 Dec  6 12:35 20211206203449899.replacecommit.requested
>   28013 Dec  6 12:35 20211206203503574.clean
>   19024 Dec  6 12:35 20211206203503574.clean.inflight
>   19024 Dec  6 12:35 20211206203503574.clean.requested
>  189069 Dec  6 12:43 20211206203551080.commit
>       0 Dec  6 12:35 20211206203551080.commit.requested
>       0 Dec  6 12:35 20211206203551080.inflight
>  189069 Dec  6 12:50 20211206204311612.commit
>       0 Dec  6 12:43 20211206204311612.commit.requested
>       0 Dec  6 12:43 20211206204311612.inflight
>       0 Dec  6 12:50 20211206205044595.commit.requested
>       0 Dec  6 12:50 20211206205044595.inflight
>     128 Dec  6 12:56 archived
>     483 Dec  6 11:52 hoodie.properties
>  {code}
>  
> Checkpoints in commits:
>  
> {code:java}
> grep "deltastreamer.checkpoint.key" *
> 20211206201238649.commit:    "deltastreamer.checkpoint.key" : "2"
> 20211206201959151.commit:    "deltastreamer.checkpoint.key" : "3"
> 20211206202728233.commit:    "deltastreamer.checkpoint.key" : "4"
> 20211206203551080.commit:    "deltastreamer.checkpoint.key" : "1"
> 20211206204311612.commit:    "deltastreamer.checkpoint.key" : "2" {code}
>  
> *Steps to reproduce:*
> Run HoodieDeltaStreamer in continuous mode by providing both "--checkpoint 0"
> and "--continuous", with inline clustering and sync clean enabled (some
> configs are masked).
>  
> {code:java}
> spark-submit \
>   --master yarn \
>   --driver-memory 8g --executor-memory 8g --num-executors 3 --executor-cores 4 \
>   --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
>   --conf spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.DefaultAWSCredentialsProviderChain \
>   --conf spark.speculation=true \
>   --conf spark.speculation.multiplier=1.0 \
>   --conf spark.speculation.quantile=0.5 \
>   --packages org.apache.spark:spark-avro_2.12:3.2.0 \
>   --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
>   file:/home/hadoop/ethan/hudi-utilities-bundle_2.12-0.10.0-rc3.jar \
>   --props file:/home/hadoop/ethan/test.properties \
>   --source-class ... \
>   --source-ordering-field ts \
>   --target-base-path s3a://hudi-testing/test_hoodie_table_11/ \
>   --target-table test_table \
>   --table-type COPY_ON_WRITE \
>   --op BULK_INSERT 

[jira] [Updated] (HUDI-2947) HoodieDeltaStreamer/DeltaSync can improperly pick up the checkpoint config from CLI in continuous mode

2021-12-28 Thread sivabalan narayanan (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-2947?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sivabalan narayanan updated HUDI-2947:
--
Priority: Critical  (was: Blocker)

> HoodieDeltaStreamer/DeltaSync can improperly pick up the checkpoint config 
> from CLI in continuous mode
> --
>
> Key: HUDI-2947
> URL: https://issues.apache.org/jira/browse/HUDI-2947
> Project: Apache Hudi
>  Issue Type: Bug
>Reporter: Ethan Guo
>Assignee: sivabalan narayanan
>Priority: Critical
> Fix For: 0.11.0
>
>

[jira] [Updated] (HUDI-2947) HoodieDeltaStreamer/DeltaSync can improperly pick up the checkpoint config from CLI in continuous mode

2021-12-06 Thread Ethan Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-2947?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ethan Guo updated HUDI-2947:

Priority: Blocker  (was: Major)

> HoodieDeltaStreamer/DeltaSync can improperly pick up the checkpoint config 
> from CLI in continuous mode
> --
>
> Key: HUDI-2947
> URL: https://issues.apache.org/jira/browse/HUDI-2947
> Project: Apache Hudi
>  Issue Type: Bug
>Reporter: Ethan Guo
>Assignee: sivabalan narayanan
>Priority: Blocker
> Fix For: 0.11.0
>
>

[jira] [Updated] (HUDI-2947) HoodieDeltaStreamer/DeltaSync can improperly pick up the checkpoint config from CLI in continuous mode

2021-12-06 Thread Ethan Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-2947?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ethan Guo updated HUDI-2947:

Description: 
*Problem:*

When deltastreamer is started with a given checkpoint, e.g., `--checkpoint 0`,
in continuous mode, the deltastreamer job may later pick up the wrong
checkpoint. The wrong checkpoint (for commit 20211206203551080) appears after
the replacecommit and clean: the resume checkpoint is reset to "0", so the
commit records "1" instead of the expected "5" that should follow "4" in
20211206202728233.commit. More details below.

The bug is due to the check here: 
[https://github.com/apache/hudi/blob/master/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java#L335]
{code:java}
if (cfg.checkpoint != null
    && (StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY))
        || !cfg.checkpoint.equals(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY)))) {
  resumeCheckpointStr = Option.of(cfg.checkpoint);
} {code}
When resuming after a clustering commit, "cfg.checkpoint != null" and
"StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY))"
are both true, because "--checkpoint 0" is configured and the last commit is a
replacecommit without checkpoint keys. As a result, the resume checkpoint
string is reset to the configured checkpoint and the timeline walk-back logic
below is skipped, which is wrong.
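
The following is a minimal, self-contained Java sketch that models the decision described above, to show why a trailing replacecommit makes `--checkpoint` win again. It is not Hudi code: `Instant`, `resumeBuggy`, `resumeWalkBack` and the other helpers are hypothetical. It also assumes that the string value of `CHECKPOINT_RESET_KEY` is `deltastreamer.checkpoint.reset_key`, that each deltastreamer commit records the configured `--checkpoint` value under that key, and that the clean instant is not part of the commit timeline being inspected. `resumeWalkBack` illustrates one possible direction (evaluating the same condition against the latest commit that actually carries checkpoint metadata), not necessarily the change that was merged.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical, simplified model of the resume-checkpoint decision; not Hudi code.
final class CheckpointResumeSketch {
  // Key name as it appears in the commit files listed in this issue.
  static final String CHECKPOINT_KEY = "deltastreamer.checkpoint.key";
  // ASSUMPTION: string value of CHECKPOINT_RESET_KEY.
  static final String CHECKPOINT_RESET_KEY = "deltastreamer.checkpoint.reset_key";

  // A completed instant on the commit timeline and its extra metadata.
  static final class Instant {
    final String time;
    final String action;
    final Map<String, String> metadata;

    Instant(String time, String action, Map<String, String> metadata) {
      this.time = time;
      this.action = action;
      this.metadata = metadata;
    }
  }

  static boolean isNullOrEmpty(String s) {
    return s == null || s.isEmpty();
  }

  // Walks back from the newest instant to the first one that carries a checkpoint.
  static Optional<Instant> latestWithCheckpoint(List<Instant> timeline) {
    for (int i = timeline.size() - 1; i >= 0; i--) {
      if (!isNullOrEmpty(timeline.get(i).metadata.get(CHECKPOINT_KEY))) {
        return Optional.of(timeline.get(i));
      }
    }
    return Optional.empty();
  }

  // Same condition as in the issue, evaluated against `reference`'s metadata.
  static boolean shouldUseCfgCheckpoint(String cfgCheckpoint, Instant reference) {
    String resetKey = reference.metadata.get(CHECKPOINT_RESET_KEY);
    return cfgCheckpoint != null
        && (isNullOrEmpty(resetKey) || !cfgCheckpoint.equals(resetKey));
  }

  // Buggy variant: checks the LAST instant, which may be a replacecommit
  // without any checkpoint keys, so --checkpoint wins again.
  static Optional<String> resumeBuggy(String cfgCheckpoint, List<Instant> timeline) {
    if (timeline.isEmpty()) {
      return Optional.ofNullable(cfgCheckpoint);
    }
    if (shouldUseCfgCheckpoint(cfgCheckpoint, timeline.get(timeline.size() - 1))) {
      return Optional.of(cfgCheckpoint);
    }
    return latestWithCheckpoint(timeline).map(inst -> inst.metadata.get(CHECKPOINT_KEY));
  }

  // One possible direction: check the latest instant that carries a checkpoint.
  static Optional<String> resumeWalkBack(String cfgCheckpoint, List<Instant> timeline) {
    Optional<Instant> last = latestWithCheckpoint(timeline);
    if (!last.isPresent()) {
      return Optional.ofNullable(cfgCheckpoint);
    }
    if (shouldUseCfgCheckpoint(cfgCheckpoint, last.get())) {
      return Optional.of(cfgCheckpoint);
    }
    return Optional.of(last.get().metadata.get(CHECKPOINT_KEY));
  }

  // ASSUMPTION: every deltastreamer commit stores both keys.
  static Instant commit(String time, String checkpoint, String resetKey) {
    Map<String, String> m = new HashMap<>();
    m.put(CHECKPOINT_KEY, checkpoint);
    m.put(CHECKPOINT_RESET_KEY, resetKey);
    return new Instant(time, "commit", m);
  }

  // Commit timeline from the listing in this issue, up to the replacecommit.
  // The clean instant is omitted (assumed not to be on the commit timeline).
  static List<Instant> sampleTimeline() {
    List<Instant> t = new ArrayList<>();
    t.add(commit("20211206201238649", "2", "0"));
    t.add(commit("20211206201959151", "3", "0"));
    t.add(commit("20211206202728233", "4", "0"));
    t.add(new Instant("20211206203449899", "replacecommit", new HashMap<>()));
    return t;
  }

  public static void main(String[] args) {
    List<Instant> t = sampleTimeline();
    System.out.println("buggy: " + resumeBuggy("0", t).orElse(null));         // prints "buggy: 0"
    System.out.println("walk-back: " + resumeWalkBack("0", t).orElse(null)); // prints "walk-back: 4"
  }
}
```

On the modeled timeline, `resumeBuggy("0", ...)` returns "0" while `resumeWalkBack("0", ...)` returns "4", which lines up with 20211206203551080.commit recording "1" instead of "5". Passing a different `--checkpoint` value (e.g. "10") to `resumeWalkBack` still resets the checkpoint, since the recorded reset key no longer matches.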

 

Timeline:

 
{code:java}
 189069 Dec  6 12:19 20211206201238649.commit
      0 Dec  6 12:12 20211206201238649.commit.requested
      0 Dec  6 12:12 20211206201238649.inflight
 189069 Dec  6 12:27 20211206201959151.commit
      0 Dec  6 12:20 20211206201959151.commit.requested
      0 Dec  6 12:20 20211206201959151.inflight
 189069 Dec  6 12:34 20211206202728233.commit
      0 Dec  6 12:27 20211206202728233.commit.requested
      0 Dec  6 12:27 20211206202728233.inflight
  36662 Dec  6 12:35 20211206203449899.replacecommit
      0 Dec  6 12:35 20211206203449899.replacecommit.inflight
  34656 Dec  6 12:35 20211206203449899.replacecommit.requested
  28013 Dec  6 12:35 20211206203503574.clean
  19024 Dec  6 12:35 20211206203503574.clean.inflight
  19024 Dec  6 12:35 20211206203503574.clean.requested
 189069 Dec  6 12:43 20211206203551080.commit
      0 Dec  6 12:35 20211206203551080.commit.requested
      0 Dec  6 12:35 20211206203551080.inflight
 189069 Dec  6 12:50 20211206204311612.commit
      0 Dec  6 12:43 20211206204311612.commit.requested
      0 Dec  6 12:43 20211206204311612.inflight
      0 Dec  6 12:50 20211206205044595.commit.requested
      0 Dec  6 12:50 20211206205044595.inflight
    128 Dec  6 12:56 archived
    483 Dec  6 11:52 hoodie.properties
 {code}
 

Checkpoints in commits:

 
{code:java}
grep "deltastreamer.checkpoint.key" *
20211206201238649.commit:    "deltastreamer.checkpoint.key" : "2"
20211206201959151.commit:    "deltastreamer.checkpoint.key" : "3"
20211206202728233.commit:    "deltastreamer.checkpoint.key" : "4"
20211206203551080.commit:    "deltastreamer.checkpoint.key" : "1"
20211206204311612.commit:    "deltastreamer.checkpoint.key" : "2" {code}
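
To check other tables for the same symptom, the grep output above can be scanned for a checkpoint that goes backwards. This is a hypothetical diagnostic, not part of Hudi; it assumes numeric checkpoints like the ones the test source writes here, and relies on the shell glob listing the timestamp-named commit files in instant-time order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical diagnostic, not part of Hudi: flags commits whose numeric
// checkpoint is lower than the previous commit's in `grep` output.
final class CheckpointRegressionScan {
  private static final Pattern LINE = Pattern.compile(
      "^(\\d+)\\.commit:\\s*\"deltastreamer\\.checkpoint\\.key\"\\s*:\\s*\"(\\d+)\"$");

  static List<String> findRegressions(List<String> grepLines) {
    List<String> regressions = new ArrayList<>();
    Long previous = null;
    for (String line : grepLines) {
      Matcher m = LINE.matcher(line.trim());
      if (!m.matches()) {
        continue; // skip the grep command itself and non-numeric checkpoints
      }
      long current = Long.parseLong(m.group(2));
      if (previous != null && current < previous) {
        regressions.add(m.group(1) + ": " + previous + " -> " + current);
      }
      previous = current;
    }
    return regressions;
  }

  public static void main(String[] args) {
    List<String> grepOutput = Arrays.asList(
        "grep \"deltastreamer.checkpoint.key\" *",
        "20211206201238649.commit:    \"deltastreamer.checkpoint.key\" : \"2\"",
        "20211206201959151.commit:    \"deltastreamer.checkpoint.key\" : \"3\"",
        "20211206202728233.commit:    \"deltastreamer.checkpoint.key\" : \"4\"",
        "20211206203551080.commit:    \"deltastreamer.checkpoint.key\" : \"1\"",
        "20211206204311612.commit:    \"deltastreamer.checkpoint.key\" : \"2\"");
    System.out.println(findRegressions(grepOutput)); // prints [20211206203551080: 4 -> 1]
  }
}
```

On the output above it should report only `20211206203551080: 4 -> 1`, i.e. the first commit written after the replacecommit and clean.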
 

*Steps to reproduce:*

Run HoodieDeltaStreamer in continuous mode by providing both "--checkpoint 0"
and "--continuous", with inline clustering and sync clean enabled (some configs
are masked).

 
{code:java}
spark-submit \
  --master yarn \
  --driver-memory 8g --executor-memory 8g --num-executors 3 --executor-cores 4 \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --conf spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.DefaultAWSCredentialsProviderChain \
  --conf spark.speculation=true \
  --conf spark.speculation.multiplier=1.0 \
  --conf spark.speculation.quantile=0.5 \
  --packages org.apache.spark:spark-avro_2.12:3.2.0 \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
  file:/home/hadoop/ethan/hudi-utilities-bundle_2.12-0.10.0-rc3.jar \
  --props file:/home/hadoop/ethan/test.properties \
  --source-class ... \
  --source-ordering-field ts \
  --target-base-path s3a://hudi-testing/test_hoodie_table_11/ \
  --target-table test_table \
  --table-type COPY_ON_WRITE \
  --op BULK_INSERT \
  --checkpoint 0 \
  --continuous {code}
test.properties:

{code:java}
hoodie.cleaner.commits.retained=4
hoodie.keep.min.commits=5
hoodie.keep.max.commits=7


hoodie.clean.async=true
hoodie.clustering.inline=true
hoodie.clustering.async.max.commits=3
hoodie.compact.inline.max.delta.commits=3


hoodie.insert.shuffle.parallelism=10
hoodie.upsert.shuffle.parallelism=10
hoodie.bulk_insert.shuffle.parallelism=10
hoodie.delete.shuffle.parallelism=10
hoodie.bulkinsert.shuffle.parallelism=10


hoodie.datasource.write.recordkey.field=key
hoodie.datasource.write.partitionpath.field=partition


# turn off any small file handling, for ease of testing
hoodie.parquet.small.file.limit=1

benchmark.input.source.path=...{code}
 

 

  was:
*Problem:*

When 

[jira] [Updated] (HUDI-2947) HoodieDeltaStreamer/DeltaSync can improperly pick up the checkpoint config from CLI in continuous mode

2021-12-06 Thread Ethan Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-2947?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ethan Guo updated HUDI-2947:

Description: 
*Problem:*

When deltastreamer is started with a given checkpoint, e.g., `--checkpoint 0`,
in continuous mode, the deltastreamer job may later pick up the wrong
checkpoint. The wrong checkpoint (for commit 20211206203551080) appears after
the replacecommit and clean: the resume checkpoint is reset to "0", so the
commit records "1" instead of the expected "5" that should follow "4" in
20211206202728233.commit. More details below.

The bug is due to the check here: 
[https://github.com/apache/hudi/blob/master/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java#L335]
{code:java}
if (cfg.checkpoint != null
    && (StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY))
        || !cfg.checkpoint.equals(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY)))) {
  resumeCheckpointStr = Option.of(cfg.checkpoint);
} {code}
When resuming after a clustering commit, "cfg.checkpoint != null" and
"StringUtils.isNullOrEmpty(commitMetadata.getMetadata(CHECKPOINT_RESET_KEY))"
are both true, because "--checkpoint 0" is configured and the last commit is a
replacecommit without checkpoint keys. As a result, the resume checkpoint
string is reset to the configured checkpoint and the timeline walk-back logic
below is skipped, which is wrong.

 

Timeline:

 
{code:java}
 189069 Dec  6 12:19 20211206201238649.commit
      0 Dec  6 12:12 20211206201238649.commit.requested
      0 Dec  6 12:12 20211206201238649.inflight
 189069 Dec  6 12:27 20211206201959151.commit
      0 Dec  6 12:20 20211206201959151.commit.requested
      0 Dec  6 12:20 20211206201959151.inflight
 189069 Dec  6 12:34 20211206202728233.commit
      0 Dec  6 12:27 20211206202728233.commit.requested
      0 Dec  6 12:27 20211206202728233.inflight
  36662 Dec  6 12:35 20211206203449899.replacecommit
      0 Dec  6 12:35 20211206203449899.replacecommit.inflight
  34656 Dec  6 12:35 20211206203449899.replacecommit.requested
  28013 Dec  6 12:35 20211206203503574.clean
  19024 Dec  6 12:35 20211206203503574.clean.inflight
  19024 Dec  6 12:35 20211206203503574.clean.requested
 189069 Dec  6 12:43 20211206203551080.commit
      0 Dec  6 12:35 20211206203551080.commit.requested
      0 Dec  6 12:35 20211206203551080.inflight
 189069 Dec  6 12:50 20211206204311612.commit
      0 Dec  6 12:43 20211206204311612.commit.requested
      0 Dec  6 12:43 20211206204311612.inflight
      0 Dec  6 12:50 20211206205044595.commit.requested
      0 Dec  6 12:50 20211206205044595.inflight
    128 Dec  6 12:56 archived
    483 Dec  6 11:52 hoodie.properties
 {code}
 

Checkpoints in commits:

 
{code:java}
grep "deltastreamer.checkpoint.key" *
20211206201238649.commit:    "deltastreamer.checkpoint.key" : "2"
20211206201959151.commit:    "deltastreamer.checkpoint.key" : "3"
20211206202728233.commit:    "deltastreamer.checkpoint.key" : "4"
20211206203551080.commit:    "deltastreamer.checkpoint.key" : "1"
20211206204311612.commit:    "deltastreamer.checkpoint.key" : "2" {code}
 

*Steps to reproduce:*

Run HoodieDeltaStreamer in continuous mode by providing both `--checkpoint 0`
and `--continuous`, with inline clustering and sync clean enabled (some configs
are masked).

 
{code:java}
spark-submit \
  --master yarn \
  --driver-memory 8g --executor-memory 8g --num-executors 3 --executor-cores 4 \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --conf spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.DefaultAWSCredentialsProviderChain \
  --conf spark.speculation=true \
  --conf spark.speculation.multiplier=1.0 \
  --conf spark.speculation.quantile=0.5 \
  --packages org.apache.spark:spark-avro_2.12:3.2.0 \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
  file:/home/hadoop/ethan/hudi-utilities-bundle_2.12-0.10.0-rc3.jar \
  --props file:/home/hadoop/ethan/test.properties \
  --source-class ... \
  --source-ordering-field ts \
  --target-base-path s3a://hudi-testing/test_hoodie_table_11/ \
  --target-table test_table \
  --table-type COPY_ON_WRITE \
  --op BULK_INSERT \
  --checkpoint 0 \
  --continuous {code}
test.properties:

{code:java}
hoodie.cleaner.commits.retained=4
hoodie.keep.min.commits=5
hoodie.keep.max.commits=7


hoodie.clean.async=true
hoodie.clustering.inline=true
hoodie.clustering.async.max.commits=3
hoodie.compact.inline.max.delta.commits=3


hoodie.insert.shuffle.parallelism=10
hoodie.upsert.shuffle.parallelism=10
hoodie.bulk_insert.shuffle.parallelism=10
hoodie.delete.shuffle.parallelism=10
hoodie.bulkinsert.shuffle.parallelism=10


hoodie.datasource.write.recordkey.field=key
hoodie.datasource.write.partitionpath.field=partition


# turn off any small file handling, for ease of testing
hoodie.parquet.small.file.limit=1

benchmark.input.source.path=...{code}
 

 

> 

[jira] [Updated] (HUDI-2947) HoodieDeltaStreamer/DeltaSync can improperly pick up the checkpoint config from CLI in continuous mode

2021-12-06 Thread Ethan Guo (Jira)


 [ 
https://issues.apache.org/jira/browse/HUDI-2947?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ethan Guo updated HUDI-2947:

Fix Version/s: 0.11.0

> HoodieDeltaStreamer/DeltaSync can improperly pick up the checkpoint config 
> from CLI in continuous mode
> --
>
> Key: HUDI-2947
> URL: https://issues.apache.org/jira/browse/HUDI-2947
> Project: Apache Hudi
>  Issue Type: Bug
>Reporter: Ethan Guo
>Priority: Major
> Fix For: 0.11.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.1#820001)