[jira] [Comment Edited] (FLINK-16770) Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end test fails with no such file

2020-04-25 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17092098#comment-17092098
 ] 

Biao Liu edited comment on FLINK-16770 at 4/25/20, 7:12 AM:


Technically speaking, the scenario we discussed here should not happen with the reverted commits. The finalization of the checkpoint is reverted to being executed synchronously and wrapped in the coordinator-wide lock, so there shouldn't be any race condition at all. On the other hand, the earlier commits of the refactoring were merged over 3 months ago. So to answer the question of [~pnowojski], I think we have reverted enough commits.

I have noticed some logs like:
{quote}kill: usage: kill [-s sigspec | -n signum | -sigspec] pid | jobspec ... or kill -l [sigspec]
 Killed TM @
{quote}
It seems that there was no TM process at some point. I guess that's not a normal scenario. The {{ha_tm_watchdog}} in common_ha.sh should start a new TM before killing an old one in this case. What if there is no TM process at all? Did it exit or get killed unexpectedly? I'm not sure. I think there would not be enough TMs to finish the test case, because the {{ha_tm_watchdog}} only starts a new TM if the expected number of TMs is already running:
{quote}local MISSING_TMS=$((EXPECTED_TMS-RUNNING_TMS))
 if [ ${MISSING_TMS} -eq 0 ]; then
     # start a new TM only if we have exactly the expected number
     "$FLINK_DIR"/bin/taskmanager.sh start > /dev/null
 fi
{quote}
I guess the failure has a different cause; maybe it's related to the missing TM process. But I can't tell what really happened in this case without any other logs. Is there any way we could find the JM logs, [~rmetzger]?


was (Author: sleepy):
Technically speaking, the scenario we discussed here should not happen with the reverted code. The finalization of the checkpoint is reverted to being executed synchronously and wrapped in the coordinator-wide lock, so there shouldn't be any race condition at all. On the other hand, the earlier commits of the refactoring were merged over 3 months ago. So to answer the question of [~pnowojski], I think we have reverted enough commits.

I have noticed some logs like:
{quote}kill: usage: kill [-s sigspec | -n signum | -sigspec] pid | jobspec ... or kill -l [sigspec]
 Killed TM @
{quote}
It seems that there was no TM process at some point. I guess that's not a normal scenario. The {{ha_tm_watchdog}} in common_ha.sh should start a new TM before killing an old one in this case. What if there is no TM process at all? Did it exit or get killed unexpectedly? I'm not sure. I think there would not be enough TMs to finish the test case, because the {{ha_tm_watchdog}} only starts a new TM if the expected number of TMs is already running:
{quote}local MISSING_TMS=$((EXPECTED_TMS-RUNNING_TMS))
 if [ ${MISSING_TMS} -eq 0 ]; then
 # start a new TM only if we have exactly the expected number
 "$FLINK_DIR"/bin/taskmanager.sh start > /dev/null
 fi
{quote}
I guess the failure has a different cause; maybe it's related to the missing TM process. But I can't tell what really happened in this case without any other logs. Is there any way we could find the JM logs, [~rmetzger]?

> Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end 
> test fails with no such file
> ---
>
> Key: FLINK-16770
> URL: https://issues.apache.org/jira/browse/FLINK-16770
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Tests
>Affects Versions: 1.11.0
>Reporter: Zhijiang
>Assignee: Yun Tang
>Priority: Blocker
>  Labels: pull-request-available, test-stability
> Fix For: 1.11.0
>
> Attachments: e2e-output.log, 
> flink-vsts-standalonesession-0-fv-az53.log, image-2020-04-16-11-24-54-549.png
>
>  Time Spent: 50m
>  Remaining Estimate: 0h
>
> The log : 
> [https://dev.azure.com/rmetzger/Flink/_build/results?buildId=6603&view=logs&j=c88eea3b-64a0-564d-0031-9fdcd7b8abee&t=1e2bbe5b-4657-50be-1f07-d84bfce5b1f5]
>  
> There was also the similar problem in 
> https://issues.apache.org/jira/browse/FLINK-16561, but for the case of no 
> parallelism change. And this case is for scaling up. Not quite sure whether 
> the root cause is the same one.
> {code:java}
> 2020-03-25T06:50:31.3894841Z Running 'Resuming Externalized Checkpoint 
> (rocks, incremental, scale up) end-to-end test'
> 2020-03-25T06:50:31.3895308Z 
> ==
> 2020-03-25T06:50:31.3907274Z TEST_DATA_DIR: 
> /home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-31390197304
> 2020-03-25T06:50:31.5500274Z Flink dist directory: 
> /home/vsts/work/1/s/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT
> 2020-03-25T06:50:

[jira] [Comment Edited] (FLINK-16770) Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end test fails with no such file

2020-04-24 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17091604#comment-17091604
 ] 

Piotr Nowojski edited comment on FLINK-16770 at 4/24/20, 2:05 PM:
--

Your PR definitely includes the reverted commits, so there is something to investigate here. Either there is another error with similar symptoms, or we haven't reverted enough commits. [~SleePy], [~yunta], what do you think happened?

For one thing, I forgot to mention in this ticket that I've reverted the commits. We also need to clean up the tickets for this issue. I wanted to close this bug, but we were discussing solutions here, and I guess that was a mistake: after reverting the commits and re-opening the original issue, we should have migrated the discussion there. So let's keep this ticket open for the investigation of your most recent report, [~rmetzger].


was (Author: pnowojski):
Definitely your PR includes the reverted commits, so there is something to 
investigate here. Either there is another error with similar symptoms, or we 
haven't reverted enough commits. [~SleePy] [~yunta] what do you think has 
happened?

For one thing, I forgot to mention in this ticket that I've reverted the commits. We also need to clean up the tickets for this issue. I wanted to close this bug, but we were discussing solutions here, and I guess that was a mistake: after reverting the commits and re-opening the original issue, we should have migrated the discussion there. So let's keep this ticket open for the investigation of your most recent report, [~rmetzger].

> Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end 
> test fails with no such file
> ---
>
> Key: FLINK-16770
> URL: https://issues.apache.org/jira/browse/FLINK-16770
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Tests
>Affects Versions: 1.11.0
>Reporter: Zhijiang
>Assignee: Yun Tang
>Priority: Blocker
>  Labels: pull-request-available, test-stability
> Fix For: 1.11.0
>
> Attachments: e2e-output.log, 
> flink-vsts-standalonesession-0-fv-az53.log, image-2020-04-16-11-24-54-549.png
>
>  Time Spent: 50m
>  Remaining Estimate: 0h
>
> The log : 
> [https://dev.azure.com/rmetzger/Flink/_build/results?buildId=6603&view=logs&j=c88eea3b-64a0-564d-0031-9fdcd7b8abee&t=1e2bbe5b-4657-50be-1f07-d84bfce5b1f5]
>  
> There was also the similar problem in 
> https://issues.apache.org/jira/browse/FLINK-16561, but for the case of no 
> parallelism change. And this case is for scaling up. Not quite sure whether 
> the root cause is the same one.
> {code:java}
> 2020-03-25T06:50:31.3894841Z Running 'Resuming Externalized Checkpoint 
> (rocks, incremental, scale up) end-to-end test'
> 2020-03-25T06:50:31.3895308Z 
> ==
> 2020-03-25T06:50:31.3907274Z TEST_DATA_DIR: 
> /home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-31390197304
> 2020-03-25T06:50:31.5500274Z Flink dist directory: 
> /home/vsts/work/1/s/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT
> 2020-03-25T06:50:31.6354639Z Starting cluster.
> 2020-03-25T06:50:31.8871932Z Starting standalonesession daemon on host 
> fv-az655.
> 2020-03-25T06:50:33.5021784Z Starting taskexecutor daemon on host fv-az655.
> 2020-03-25T06:50:33.5152274Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:34.5498116Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:35.6031346Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:36.9848425Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:38.0283377Z Dispatcher REST endpoint is up.
> 2020-03-25T06:50:38.0285490Z Running externalized checkpoints test, with 
> ORIGINAL_DOP=2 NEW_DOP=4 and STATE_BACKEND_TYPE=rocks 
> STATE_BACKEND_FILE_ASYNC=true STATE_BACKEND_ROCKSDB_INCREMENTAL=true 
> SIMULATE_FAILURE=false ...
> 2020-03-25T06:50:46.1754645Z Job (b8cb04e4b1e730585bc616aa352866d0) is 
> running.
> 2020-03-25T06:50:46.1758132Z Waiting for job 
> (b8cb04e4b1e730585bc616aa352866d0) to have at least 1 completed checkpoints 
> ...
> 2020-03-25T06:50:46.3478276Z Waiting for job to process up to 200 records, 
> current progress: 173 records ...
> 2020-03-25T06:50:49.6332988Z Cancelling job b8cb04e4b1e730585bc616aa352866d0.
> 2020-03-25T06:50:50.4875673Z Cancelled job b8cb04e4b1e730585bc616aa352866d0.
> 2020-03-25T06:50:50.5468230Z ls: cannot access 
> '/home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-31390197304/externalized-chckpt-e2e-backend-dir/b8cb04e4b1e730585bc616aa352866d0/ch

[jira] [Comment Edited] (FLINK-16770) Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end test fails with no such file

2020-04-24 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17091543#comment-17091543
 ] 

Piotr Nowojski edited comment on FLINK-16770 at 4/24/20, 1:00 PM:
--

[~rmetzger] how could this happen? I reverted the cause of this bug over a week ago:
https://issues.apache.org/jira/browse/FLINK-14971
Or do we have to revert even more code, [~SleePy]? [~rmetzger], what was the commit that failed there? Did it include the revert commits:
{code}
ffa4475685 [8 days ago] (apache/master) Revert "[FLINK-14971][checkpointing] 
Handle ack/declined message of checkpoint" [ifndef-SleePy]
2813be7a3e [8 days ago] Revert "[FLINK-14971][checkpointing] Introduce main 
thread executor in CheckpointCoordinator to execute all non-IO operations 
instead of the timer thread" [ifndef-SleePy]
e217a8c666 [8 days ago] Revert "[hotfix] Harden ResumeCheckpointManuallyITCase" 
[ifndef-SleePy]
6adbe94b0e [8 days ago] Revert "[FLINK-14971][checkpointing] Make 
CompletedCheckpointStore thread-safe to avoid synchronization outside" 
[ifndef-SleePy]
214896aeee [8 days ago] Revert "[FLINK-14971][checkpointing] Remove 
coordinator-wide lock of CheckpointCoordinator" [ifndef-SleePy]
5d5a29bbe1 [8 days ago] Revert "[FLINK-14971][checkpointing] Remove lock of 
PendingCheckpoint and introduce IO lock for PendingCheckpoint" [ifndef-SleePy]
cfe5a27dd4 [8 days ago] Revert "[hotfix] Make the checkpoint resuming e2e case 
pass by increasing the retained checkpoints number" [ifndef-SleePy]
d3fa2e5224 [8 days ago] Revert "[FLINK-16945][checkpointing] Execute 
CheckpointFailureManager.FailJobCallback directly in main thread executor" 
[ifndef-SleePy]
{code}
?


was (Author: pnowojski):
[~rmetzger] how could this happen? I reverted the cause of this bug over a week ago:
https://issues.apache.org/jira/browse/FLINK-14971
Or do we have to revert even more code, [~SleePy]?

> Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end 
> test fails with no such file
> ---
>
> Key: FLINK-16770
> URL: https://issues.apache.org/jira/browse/FLINK-16770
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Tests
>Affects Versions: 1.11.0
>Reporter: Zhijiang
>Assignee: Yun Tang
>Priority: Blocker
>  Labels: pull-request-available, test-stability
> Fix For: 1.11.0
>
> Attachments: e2e-output.log, 
> flink-vsts-standalonesession-0-fv-az53.log, image-2020-04-16-11-24-54-549.png
>
>  Time Spent: 50m
>  Remaining Estimate: 0h
>
> The log : 
> [https://dev.azure.com/rmetzger/Flink/_build/results?buildId=6603&view=logs&j=c88eea3b-64a0-564d-0031-9fdcd7b8abee&t=1e2bbe5b-4657-50be-1f07-d84bfce5b1f5]
>  
> There was also the similar problem in 
> https://issues.apache.org/jira/browse/FLINK-16561, but for the case of no 
> parallelism change. And this case is for scaling up. Not quite sure whether 
> the root cause is the same one.
> {code:java}
> 2020-03-25T06:50:31.3894841Z Running 'Resuming Externalized Checkpoint 
> (rocks, incremental, scale up) end-to-end test'
> 2020-03-25T06:50:31.3895308Z 
> ==
> 2020-03-25T06:50:31.3907274Z TEST_DATA_DIR: 
> /home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-31390197304
> 2020-03-25T06:50:31.5500274Z Flink dist directory: 
> /home/vsts/work/1/s/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT
> 2020-03-25T06:50:31.6354639Z Starting cluster.
> 2020-03-25T06:50:31.8871932Z Starting standalonesession daemon on host 
> fv-az655.
> 2020-03-25T06:50:33.5021784Z Starting taskexecutor daemon on host fv-az655.
> 2020-03-25T06:50:33.5152274Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:34.5498116Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:35.6031346Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:36.9848425Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:38.0283377Z Dispatcher REST endpoint is up.
> 2020-03-25T06:50:38.0285490Z Running externalized checkpoints test, with 
> ORIGINAL_DOP=2 NEW_DOP=4 and STATE_BACKEND_TYPE=rocks 
> STATE_BACKEND_FILE_ASYNC=true STATE_BACKEND_ROCKSDB_INCREMENTAL=true 
> SIMULATE_FAILURE=false ...
> 2020-03-25T06:50:46.1754645Z Job (b8cb04e4b1e730585bc616aa352866d0) is 
> running.
> 2020-03-25T06:50:46.1758132Z Waiting for job 
> (b8cb04e4b1e730585bc616aa352866d0) to have at least 1 completed checkpoints 
> ...
> 2020-03-25T06:50:46.3478276Z Waiting for job to process up to 200 records, 
> current progress: 173 records ...
> 2020-03-25T06:50:49.6332988Z Cancelling job b8cb

[jira] [Comment Edited] (FLINK-16770) Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end test fails with no such file

2020-04-16 Thread Stephan Ewen (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17084901#comment-17084901
 ] 

Stephan Ewen edited comment on FLINK-16770 at 4/16/20, 4:21 PM:


Thank you all for the great discussion and analysis. I would like to add a few points and suggestions, based on the way I understand the problem:
h2. There are two main issues:

*(1) Missing ownership in the multi-threaded system. Meaning: Who owns the 
"Pending Checkpoint during Finalization"?*
 - It is owned by the CheckpointCoordinator (which aborts it when shutting down).
 - It is also owned by the I/O thread or the CompletedCheckpointStore, which writes it to ZooKeeper (or a similar system).

*(2) No Shared Ground Truth between the Checkpoint Coordinator and the 
JobMaster*
 - When a checkpoint is finalized, that decision is not consistently visible to 
the JM.
 - The JM only sees the result once it is in ZK, which is an asynchronous operation.
 - That causes the final issue described here: the possibility that the JM starts from an earlier checkpoint, if a restart happens while the asynchronous write to ZK is still in progress.
 - NOTE: It is fine to ignore a checkpoint that was completed, if we did not 
send "notification complete" and we are sure it will always be ignored. That 
would be as if the checkpoint never completed.
 - NOTE: It is not fine to ignore it and start from an earlier checkpoint if it 
will get committed later. That is the bug to prevent.

h2. Two steps to a cleaner solution

*(1) When the checkpoint is ready (all tasks acked, metadata written out), 
Checkpoint Coordinator transfers ownership to the CompletedCheckpointStore.*
 - That means the Checkpoint is removed from the "Pending Checkpoints" map and 
added to the CompletedCheckpointStore in one call in the main thread. If this 
is in one call, it is atomic against other modifications (cancellation, 
disposing checkpoints). Because the checkpoint is removed from the "Pending 
Checkpoints" map (not owned by the coordinator any more) it will not get 
cancelled during shutdown of the coordinator.

    ==> This is a very simple change
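
As a rough, self-contained illustration of this hand-off (all class and method names below are assumptions made up for the sketch, not Flink's actual API), the essential point is that removing the checkpoint from the pending map and handing it to the store happen inside a single task on the coordinator's single-threaded main executor:
{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executor;

// Toy model only: one main-thread task both removes the checkpoint from the
// pending map and hands it to the completed-checkpoint store, so shutdown
// (also a main-thread task) can never observe a half-transferred checkpoint.
final class CoordinatorHandOffSketch {

    static final class PendingCheckpoint {
        final long id;
        PendingCheckpoint(long id) { this.id = id; }
        CompletedCheckpoint toCompleted() { return new CompletedCheckpoint(id); }
    }

    static final class CompletedCheckpoint {
        final long id;
        CompletedCheckpoint(long id) { this.id = id; }
    }

    interface CompletedCheckpointStore {
        void addCheckpoint(CompletedCheckpoint checkpoint);
    }

    private final Map<Long, PendingCheckpoint> pendingCheckpoints = new HashMap<>();
    private final CompletedCheckpointStore store;
    private final Executor mainThreadExecutor;

    CoordinatorHandOffSketch(CompletedCheckpointStore store, Executor mainThreadExecutor) {
        this.store = store;
        this.mainThreadExecutor = mainThreadExecutor;
    }

    void registerPending(PendingCheckpoint pending) {
        mainThreadExecutor.execute(() -> pendingCheckpoints.put(pending.id, pending));
    }

    void completePendingCheckpoint(long checkpointId) {
        mainThreadExecutor.execute(() -> {
            PendingCheckpoint pending = pendingCheckpoints.remove(checkpointId);
            if (pending == null) {
                return; // already aborted, e.g. by coordinator shutdown
            }
            // From here on the store owns the checkpoint; the shutdown path below
            // no longer sees it in pendingCheckpoints and therefore cannot abort it.
            store.addCheckpoint(pending.toCompleted());
        });
    }

    void shutdown() {
        // Aborts only checkpoints that are still pending at this point.
        mainThreadExecutor.execute(pendingCheckpoints::clear);
    }
}
{code}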

 

*(2) The addition to the CompletedCheckpointStore must be constant time and 
executed in the main thread*
 - That means that the CompletedCheckpointStore would put the Completed 
Checkpoint into a local list and then kick off the asynchronous request to add 
it to ZK.
 - If the JM looks up the latest checkpoint, it refers to that local list. That 
way all local components refer to the same status and do not exchange status 
asynchronously via an external system (ZK).

==> The change is that the CompletedCheckpointStore would not always repopulate 
itself from ZK upon "restore checkpoint", but keep the local state and only 
repopulate itself when the master gains leader status (and clears itself when 
leader status is lost).

==> This is a slightly more complex change, but not too big; a rough sketch follows below.
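
A minimal sketch of such a store, under the assumption that the local list is what the JM consults while it holds leadership (again, all names below are illustrative, not Flink's actual interfaces):
{code:java}
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.Executor;

// Toy model only: the local deque is the ground truth the JM reads from,
// while the ZooKeeper write is merely kicked off asynchronously for durability.
final class LocalFirstCompletedCheckpointStoreSketch {

    interface ZooKeeperWriter {
        void write(long checkpointId) throws Exception;
    }

    private final Deque<Long> completedCheckpointIds = new ArrayDeque<>();
    private final ZooKeeperWriter zkWriter;
    private final Executor ioExecutor;

    LocalFirstCompletedCheckpointStoreSketch(ZooKeeperWriter zkWriter, Executor ioExecutor) {
        this.zkWriter = zkWriter;
        this.ioExecutor = ioExecutor;
    }

    // Called on the main thread: constant-time local update, then async ZK write.
    void addCheckpoint(long checkpointId) {
        completedCheckpointIds.addLast(checkpointId); // immediately visible to the JM
        ioExecutor.execute(() -> {
            try {
                zkWriter.write(checkpointId); // durability is handled off the main thread
            } catch (Exception e) {
                // error handling (retry, fail the job, ...) is omitted in this sketch
            }
        });
    }

    // Restore path while leader: never re-read ZK, always consult the local list.
    Long getLatestCheckpointId() {
        return completedCheckpointIds.peekLast();
    }

    // Repopulating from ZK would only happen when gaining leadership, and the
    // local state would be cleared when leadership is lost (both omitted here).
}
{code}
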
h2. Distributed Races and Corner Cases

I think this is a pre-existing corner case issue, but it has become more likely due to this bug.

It exists because JM failover can happen concurrently with ZK updates.
 * Once the call to add the checkpoint to ZK is sent off, the checkpoint might 
or might not get added to ZK (which is the distributed ground truth).
 * During that time, we cannot restore at all.
 ** If the JM has already restored from the checkpoint, it sends "restore state" to the tasks, which is equivalent to "notify checkpoint complete" and materializes external side effects. If the addition to ZK then fails and the JM fails and another JM becomes leader, it will restore from an earlier checkpoint.
 ** If the JM restores from an earlier checkpoint during that time, and then 
the ZK call completes, we have duplicate side effects.
 * In both cases we get fractured consistency or duplicate side effects.

 

I see three possible solutions, none of which is easy or great:

*(a) We cannot restore during the period where the checkpoint is in "uncertain 
if committed" state*
 * The CompletedCheckpointStore would need to keep the checkpoint in an "uncertain" list initially, until the I/O executor call returns from adding the checkpoint to ZK.
 * When asking the CompletedCheckpointStore for the latest checkpoint, it returns a CompletableFuture (see the sketch after this list).
 * While the latest checkpoint is in the "uncertain" state, the future cannot be completed. It completes after the ZK command completes (usually a few hundred milliseconds). Restore operations would need to wait during that time.
 * There is a separate issue, FLINK-16931, where "loading metadata" for the latest completed checkpoint can take long (seconds), because it is an I/O operation. This sounds like a similar issue, but I fear that the solution is more complex than anticipated in that issue.
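
To make option (a) concrete, here is a minimal sketch (with made-up names, not Flink's actual API): the latest checkpoint is exposed as a future that is only completed by the asynchronous ZK write, so a restore arriving during the uncertain window simply waits on it.
{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

// Toy model only: addCheckpoint publishes the checkpoint as "uncertain" and the
// future returned by getLatestCheckpointId() completes only once the ZK write
// has succeeded, so a restore cannot observe an uncommitted checkpoint.
final class UncertainLatestCheckpointSketch {

    interface ZooKeeperWriter {
        void write(long checkpointId) throws Exception;
    }

    private final ZooKeeperWriter zkWriter;
    private final Executor ioExecutor;
    private volatile CompletableFuture<Long> latest = CompletableFuture.completedFuture(null);

    UncertainLatestCheckpointSketch(ZooKeeperWriter zkWriter, Executor ioExecutor) {
        this.zkWriter = zkWriter;
        this.ioExecutor = ioExecutor;
    }

    // Main thread: publish the checkpoint as uncertain, then confirm it asynchronously.
    void addCheckpoint(long checkpointId) {
        CompletableFuture<Long> confirmation = new CompletableFuture<>();
        latest = confirmation;
        ioExecutor.execute(() -> {
            try {
                zkWriter.write(checkpointId);
                confirmation.complete(checkpointId);   // now certain; restores may proceed
            } catch (Exception e) {
                confirmation.completeExceptionally(e); // the restore path must handle this
            }
        });
    }

    // Restore path: completes immediately if the latest checkpoint is already
    // confirmed, otherwise waits for the pending ZK write (typically ~100 ms).
    CompletableFuture<Long> getLatestCheckpointId() {
        return latest;
    }
}
{code}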

*(b) Operators never commit side effects during restore.*
 * That would be a change of the curren

[jira] [Comment Edited] (FLINK-16770) Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end test fails with no such file

2020-04-14 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17083018#comment-17083018
 ] 

Biao Liu edited comment on FLINK-16770 at 4/14/20, 9:13 AM:


[~aljoscha], the upload to transfer.sh failed, so I can't confirm the root cause. It might be the same reason. [~yunta], do you need some help?


was (Author: sleepy):
[~aljoscha], the upload to transfer.sh failed, so I can't confirm the root cause. It might be the same reason.

> Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end 
> test fails with no such file
> ---
>
> Key: FLINK-16770
> URL: https://issues.apache.org/jira/browse/FLINK-16770
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Tests
>Affects Versions: 1.11.0
>Reporter: Zhijiang
>Assignee: Yun Tang
>Priority: Blocker
>  Labels: pull-request-available, test-stability
> Fix For: 1.11.0
>
> Attachments: e2e-output.log, 
> flink-vsts-standalonesession-0-fv-az53.log
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> The log : 
> [https://dev.azure.com/rmetzger/Flink/_build/results?buildId=6603&view=logs&j=c88eea3b-64a0-564d-0031-9fdcd7b8abee&t=1e2bbe5b-4657-50be-1f07-d84bfce5b1f5]
>  
> There was also the similar problem in 
> https://issues.apache.org/jira/browse/FLINK-16561, but for the case of no 
> parallelism change. And this case is for scaling up. Not quite sure whether 
> the root cause is the same one.
> {code:java}
> 2020-03-25T06:50:31.3894841Z Running 'Resuming Externalized Checkpoint 
> (rocks, incremental, scale up) end-to-end test'
> 2020-03-25T06:50:31.3895308Z 
> ==
> 2020-03-25T06:50:31.3907274Z TEST_DATA_DIR: 
> /home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-31390197304
> 2020-03-25T06:50:31.5500274Z Flink dist directory: 
> /home/vsts/work/1/s/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT
> 2020-03-25T06:50:31.6354639Z Starting cluster.
> 2020-03-25T06:50:31.8871932Z Starting standalonesession daemon on host 
> fv-az655.
> 2020-03-25T06:50:33.5021784Z Starting taskexecutor daemon on host fv-az655.
> 2020-03-25T06:50:33.5152274Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:34.5498116Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:35.6031346Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:36.9848425Z Waiting for Dispatcher REST endpoint to come 
> up...
> 2020-03-25T06:50:38.0283377Z Dispatcher REST endpoint is up.
> 2020-03-25T06:50:38.0285490Z Running externalized checkpoints test, with 
> ORIGINAL_DOP=2 NEW_DOP=4 and STATE_BACKEND_TYPE=rocks 
> STATE_BACKEND_FILE_ASYNC=true STATE_BACKEND_ROCKSDB_INCREMENTAL=true 
> SIMULATE_FAILURE=false ...
> 2020-03-25T06:50:46.1754645Z Job (b8cb04e4b1e730585bc616aa352866d0) is 
> running.
> 2020-03-25T06:50:46.1758132Z Waiting for job 
> (b8cb04e4b1e730585bc616aa352866d0) to have at least 1 completed checkpoints 
> ...
> 2020-03-25T06:50:46.3478276Z Waiting for job to process up to 200 records, 
> current progress: 173 records ...
> 2020-03-25T06:50:49.6332988Z Cancelling job b8cb04e4b1e730585bc616aa352866d0.
> 2020-03-25T06:50:50.4875673Z Cancelled job b8cb04e4b1e730585bc616aa352866d0.
> 2020-03-25T06:50:50.5468230Z ls: cannot access 
> '/home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-31390197304/externalized-chckpt-e2e-backend-dir/b8cb04e4b1e730585bc616aa352866d0/chk-[1-9]*/_metadata':
>  No such file or directory
> 2020-03-25T06:50:50.5606260Z Restoring job with externalized checkpoint at . 
> ...
> 2020-03-25T06:50:58.4728245Z 
> 2020-03-25T06:50:58.4732663Z 
> 
> 2020-03-25T06:50:58.4735785Z  The program finished with the following 
> exception:
> 2020-03-25T06:50:58.4737759Z 
> 2020-03-25T06:50:58.4742666Z 
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error: java.util.concurrent.ExecutionException: 
> org.apache.flink.runtime.client.JobSubmissionException: Failed to submit 
> JobGraph.
> 2020-03-25T06:50:58.4746274Z  at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
> 2020-03-25T06:50:58.4749954Z  at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
> 2020-03-25T06:50:58.4752753Z  at 
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:142)
> 2020-03-25T06:50:58.4755400Z  at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:659

[jira] [Comment Edited] (FLINK-16770) Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end test fails with no such file

2020-04-01 Thread Biao Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17073388#comment-17073388
 ] 

Biao Liu edited comment on FLINK-16770 at 4/2/20, 5:52 AM:
---

Hi [~yunta], thanks for the response. If I understand correctly, there is an 
inconsistent state of {{CompletedCheckpointStore}} while stopping a checkpoint 
which is doing asynchronous finalization.

There are two strategies here:
 1. The checkpoint that is doing finalization could be aborted when {{CheckpointCoordinator}} is being shut down or the periodic scheduler is being stopped. This is the choice of the current implementation. However, we didn't handle the {{CompletedCheckpointStore}} well. For example, it might be better to revert the state of the {{CompletedCheckpointStore}} when the {{PendingCheckpoint}} detects the discarding after asynchronous finalization, but I think that's not easy to do, because there might be a subsuming operation during {{CompletedCheckpointStore#addCheckpoint}}.
 2. The checkpoint that is doing finalization could NOT be aborted when {{CheckpointCoordinator}} is being shut down or the periodic scheduler is being stopped. I personally prefer this solution, because it would simplify the concurrent-conflict scenario and it's much easier to implement (see the sketch below). I think introducing an atomic boolean might not be enough; it's better to rethink the relationship between {{PendingCheckpoint#abort}} and {{PendingCheckpoint#finalizeCheckpoint}}, and we also need to rewrite part of the error handling of the finalization.
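
As a rough illustration of strategy 2 (just a sketch with assumed names, not the real {{PendingCheckpoint}} API): once finalization has started, abort no longer wins, so shutting down the coordinator cannot race with an in-flight finalization.
{code:java}
import java.util.concurrent.atomic.AtomicReference;

// Toy model only: a tiny state machine in which abort() succeeds solely while
// the checkpoint is still PENDING; a checkpoint that has started finalizing is
// left alone and completes normally.
final class PendingCheckpointStateSketch {

    private enum State { PENDING, FINALIZING, COMPLETED, ABORTED }

    private final AtomicReference<State> state = new AtomicReference<>(State.PENDING);

    // Called when the asynchronous finalization starts.
    boolean startFinalization() {
        return state.compareAndSet(State.PENDING, State.FINALIZING);
    }

    // Called when the finalization has finished successfully.
    void finalizationFinished() {
        state.compareAndSet(State.FINALIZING, State.COMPLETED);
    }

    // Called during coordinator shutdown or when the scheduler is stopped:
    // it only aborts a checkpoint that has not started finalizing yet.
    boolean abort() {
        return state.compareAndSet(State.PENDING, State.ABORTED);
    }
}
{code}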

BTW, [~yunta], could you share the unit test case that can reproduce the scenario locally? I want to verify my assumption and solution. The original e2e test case is not stable.


was (Author: sleepy):
Hi [~yunta], thanks for the response. If I understand correctly, there is an 
inconsistent state of {{CompletedCheckpointStore}} while stopping a checkpoint 
which is doing asynchronous finalization.

There are two strategies here:
1. The checkpoint that is doing finalization could be aborted when {{CheckpointCoordinator}} is being shut down or the periodic scheduler is being stopped. This is the choice of the current implementation. However, we didn't handle the {{CompletedCheckpointStore}} well. For example, it might be better to revert the state of the {{CompletedCheckpointStore}} when the {{PendingCheckpoint}} detects the discarding after asynchronous finalization, but I think that's not easy to do, because there might be a subsuming operation during {{CompletedCheckpointStore#addCheckpoint}}.
2. The checkpoint that is doing finalization could NOT be aborted when {{CheckpointCoordinator}} is being shut down or the periodic scheduler is being stopped. I personally prefer this solution, because it would simplify the concurrent-conflict scenario and it's much easier to implement. I think introducing an atomic boolean might not be enough; it's better to rethink the relationship between {{PendingCheckpoint#abort}} and {{PendingCheckpoint#finalizeCheckpoint}}, and we also need to rewrite part of the error handling of the finalization.

BTW, [~yunta] could you share the unit test case which could reproduce the 
scenario locally? I want to verify my suggestion and solution. The original e2e 
test case is not stable.

> Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end 
> test fails with no such file
> ---
>
> Key: FLINK-16770
> URL: https://issues.apache.org/jira/browse/FLINK-16770
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Tests
>Affects Versions: 1.11.0
>Reporter: Zhijiang
>Assignee: Yun Tang
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.11.0
>
> Attachments: e2e-output.log, 
> flink-vsts-standalonesession-0-fv-az53.log
>
>
> The log : 
> [https://dev.azure.com/rmetzger/Flink/_build/results?buildId=6603&view=logs&j=c88eea3b-64a0-564d-0031-9fdcd7b8abee&t=1e2bbe5b-4657-50be-1f07-d84bfce5b1f5]
>  
> There was also the similar problem in 
> https://issues.apache.org/jira/browse/FLINK-16561, but for the case of no 
> parallelism change. And this case is for scaling up. Not quite sure whether 
> the root cause is the same one.
> {code:java}
> 2020-03-25T06:50:31.3894841Z Running 'Resuming Externalized Checkpoint 
> (rocks, incremental, scale up) end-to-end test'
> 2020-03-25T06:50:31.3895308Z 
> ==
> 2020-03-25T06:50:31.3907274Z TEST_DATA_DIR: 
> /home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-31390197304
> 2020-03-25T06:50:31.5500274Z Flink dist directory: 
> /home/vsts/work/1/s/flink-dist/target/fl

[jira] [Comment Edited] (FLINK-16770) Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end test fails with no such file

2020-04-01 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17073041#comment-17073041
 ] 

Yun Tang edited comment on FLINK-16770 at 4/1/20, 5:55 PM:
---

[~SleePy] Yes, this bug was introduced by FLINK-14971 and I could reproduce it locally with a unit test. There is a race between {{PendingCheckpoint#dispose}} and {{PendingCheckpoint#finalizeCheckpoint}}: the current {{operationLock}} can only ensure that the async phase of deleting this pending checkpoint and the addition of the completed checkpoint do not happen at the same time. However, it cannot ensure that the pending checkpoint is not first added to the checkpoint store and then dropped.

One quick fix would be to add an atomic boolean shared between these two async operations: once the pending checkpoint has been added to the checkpoint store, it would not be dropped asynchronously anymore. However, this could be misleading: the pending checkpoint could be added to the checkpoint store successfully in the async phase but still be tagged as disposed in the main thread. Although we could avoid dropping it in the async phase of {{PendingCheckpoint#dispose}}, the checkpoint coordinator would not treat this pending checkpoint as successful and it would not be displayed in the checkpoint web UI. But luckily, we could at least ensure that no data is deleted by mistake, and the job could still fail over by recovering from the latest completed checkpoint.

Another solution is to compare-and-set an atomic variable in the main thread during {{PendingCheckpoint#dispose}} and share it with the path that adds the checkpoint to the store. If adding the checkpoint to the store happens first, the main thread would not tag that pending checkpoint as discarded. On the other hand, if tagging the pending checkpoint as discarded happens first, we would not try to add it to the checkpoint store (see the sketch below). I think this could be really lightweight and non-blocking, but it would introduce some extra CAS work in the main thread. What do you think, [~pnowojski]?
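
A rough sketch of that compare-and-swap idea (names and structure are assumptions for illustration, not the actual Flink code): both the dispose path and the add-to-store path race on one flag, and whichever CAS wins decides the checkpoint's fate, so it can never be both stored and discarded.
{code:java}
import java.util.concurrent.atomic.AtomicBoolean;

// Toy model only: a single flag settles the race between disposing the pending
// checkpoint (main thread) and adding it to the checkpoint store (async phase).
final class DisposeVersusStoreRaceSketch {

    private final AtomicBoolean settled = new AtomicBoolean(false);

    // Async finalization path: adds to the store only if dispose has not won yet.
    boolean tryAddToStore(Runnable addToCheckpointStore) {
        if (settled.compareAndSet(false, true)) {
            addToCheckpointStore.run();
            return true;
        }
        return false; // dispose got there first; do not add the checkpoint
    }

    // Main-thread dispose path: discards the checkpoint only if it has not
    // already been added to the checkpoint store.
    boolean tryDispose(Runnable discardCheckpoint) {
        if (settled.compareAndSet(false, true)) {
            discardCheckpoint.run();
            return true;
        }
        return false; // already added to the store; must not discard it
    }
}
{code}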


was (Author: yunta):
[~SleePy] Yes, this bug was introduced by FLINK-14971, and I could reproduce it 
locally with a unit test. There is a race condition between 
{{PendingCheckpoint#dispose}} and {{PendingCheckpoint#finalizeCheckpoint}}: the 
current {{operationLock}} only ensures that the async phase of deleting this 
pending checkpoint and the addition of the completed checkpoint do not happen 
at the same time. However, it cannot ensure that the pending checkpoint is not 
first added to the checkpoint store and then dropped.
One quick fix would be to add an atomic boolean shared between these two async 
operations: once the pending checkpoint has been added to the checkpoint store, 
it would no longer be dropped asynchronously. However, this could be misleading 
if the pending checkpoint is added to the checkpoint store successfully in the 
async phase but tagged as disposed in the main thread: although we could avoid 
dropping it in the async phase of {{PendingCheckpoint#dispose}}, the checkpoint 
coordinator would not treat this pending checkpoint as successful and would not 
display it in the checkpoint web UI. Luckily, we could at least ensure that no 
data is deleted by mistake, and the job could still fail over by recovering 
from the latest completed checkpoint.
Another solution is to compare-and-set an atomic variable in the main thread 
when {{PendingCheckpoint#dispose}} runs and share it with the code path that 
adds the checkpoint to the checkpoint store. If we arrive first to add the 
checkpoint to the store, the main thread would not tag the pending checkpoint 
as discarded. On the other hand, if we arrive first to tag the pending 
checkpoint as discarded, we would not try to add it to the checkpoint store. I 
think this could be really lightweight and non-blocking, but it would introduce 
some extra CAS work in the main thread. What do you think of this? [~pnowojski]


[jira] [Comment Edited] (FLINK-16770) Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end test fails with no such file

2020-04-01 Thread Yun Tang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-16770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17072458#comment-17072458
 ] 

Yun Tang edited comment on FLINK-16770 at 4/1/20, 9:09 AM:
---

[~rmetzger] I have reproduced this with additional logs in my [private 
branch|https://github.com/Myasuka/flink/tree/travis-fix-bug] and personal Azure 
pipeline 
https://myasuka.visualstudio.com/flink/_build/results?buildId=10&view=logs&j=1f3ed471-1849-5d3c-a34c-19792af4ad16&t=2f5b54d0-1d28-5b01-d344-aa50ffe0cdf8
 .
From the additional logs, I have figured out why this could happen:
{{CheckpointCoordinator}} drops the pending checkpoint-8 when cancelling the 
job in {{CheckpointCoordinator#stopCheckpointScheduler}}; however, chk-8 has 
just been asynchronously added to the checkpoint store successfully during 
{{PendingCheckpoint#finalizeCheckpoint}}. On the other hand, once chk-8 is 
added to the checkpoint store, chk-7 is removed from the checkpoint store. 
That's why we could see logs:

{code:java}
Checkpoint with ID 8 at 
'file:/home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-07881636808/externalized-chckpt-e2e-backend-dir/0329a0facde65e8a8432124ce5db8e3c/chk-8'
 not discarded.
{code}

In the end, chk-8 is deleted when we stop the scheduler, and chk-7 is deleted 
when chk-8 is successfully added to the checkpoint store.

I am not sure whether you have already done some work to figure out the root 
cause; please assign this ticket to me if you don't mind and haven't invested 
much work in it yet.
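
To make the failure mode concrete, here is a small self-contained sketch; the 
{{Store}} class below is hypothetical and simplified, not Flink's actual 
{{CompletedCheckpointStore}}:

{code:java}
// Hypothetical illustration of the race described above; simplified, not
// Flink's real API.
import java.util.ArrayDeque;
import java.util.Deque;

public class SubsumeRaceSketch {

    /** Keeps only the newest completed checkpoint and deletes the subsumed one. */
    static final class Store {
        private final Deque<String> completed = new ArrayDeque<>();

        void add(String checkpoint) {
            completed.addLast(checkpoint);
            // Retain only the latest checkpoint; older ones are subsumed and deleted.
            while (completed.size() > 1) {
                System.out.println("Deleted subsumed " + completed.removeFirst());
            }
        }
    }

    public static void main(String[] args) {
        Store store = new Store();
        store.add("chk-7");

        // The async finalization of chk-8 wins the race and adds it to the
        // store, which deletes chk-7 ...
        store.add("chk-8");

        // ... while stopping the checkpoint scheduler during job cancellation
        // still disposes the "pending" chk-8 directory on disk, so neither
        // chk-7 nor chk-8 is left to resume from.
        System.out.println("Coordinator disposes pending chk-8");
    }
}
{code}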



was (Author: yunta):
[~rmetzger] I have reproduced this with additional logs in my [private 
branch|https://github.com/Myasuka/flink/tree/travis-fix-bug] and personal Azure 
pipeline 
https://myasuka.visualstudio.com/flink/_build/results?buildId=10&view=logs&j=1f3ed471-1849-5d3c-a34c-19792af4ad16&t=2f5b54d0-1d28-5b01-d344-aa50ffe0cdf8
 .
From the additional logs, I have figured out why this could happen:
{{CheckpointCoordinator}} drops the pending checkpoint-8 when cancelling the 
job in {{CheckpointCoordinator#stopCheckpointScheduler}}; however, chk-8 has 
just been asynchronously added to the checkpoint store successfully during 
{{PendingCheckpoint#finalizeCheckpoint}}. On the other hand, since the job is 
not shut down, the main thread executor will then subsume chk-7 based on the 
successful result returned by {{PendingCheckpoint#finalizeCheckpoint}}. That's 
why we could see logs:

{code:java}
Checkpoint with ID 8 at 
'file:/home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-07881636808/externalized-chckpt-e2e-backend-dir/0329a0facde65e8a8432124ce5db8e3c/chk-8'
 not discarded.
{code}

In the end, chk-8 is deleted when we stop the scheduler, and chk-7 is deleted 
when we learn that chk-8 was successfully added to the checkpoint store.

I am not sure whether you have already done some work to figure out the root 
cause; please assign this ticket to me if you don't mind and haven't invested 
much work in it yet.


> Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end 
> test fails with no such file
> ---
>
> Key: FLINK-16770
> URL: https://issues.apache.org/jira/browse/FLINK-16770
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing, Tests
>Affects Versions: 1.11.0
>Reporter: Zhijiang
>Assignee: Yun Tang
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.11.0
>
> Attachments: e2e-output.log, 
> flink-vsts-standalonesession-0-fv-az53.log
>
>
> The log : 
> [https://dev.azure.com/rmetzger/Flink/_build/results?buildId=6603&view=logs&j=c88eea3b-64a0-564d-0031-9fdcd7b8abee&t=1e2bbe5b-4657-50be-1f07-d84bfce5b1f5]
>  
> There was also the similar problem in 
> https://issues.apache.org/jira/browse/FLINK-16561, but for the case of no 
> parallelism change. And this case is for scaling up. Not quite sure whether 
> the root cause is the same one.
> {code:java}
> 2020-03-25T06:50:31.3894841Z Running 'Resuming Externalized Checkpoint 
> (rocks, incremental, scale up) end-to-end test'
> 2020-03-25T06:50:31.3895308Z 
> ==
> 2020-03-25T06:50:31.3907274Z TEST_DATA_DIR: 
> /home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-31390197304
> 2020-03-25T06:50:31.5500274Z Flink dist directory: 
> /home/vsts/work/1/s/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT
> 2020-03-25T06:50:31.6354639Z Starting cluster.
> 2020-03-25T06:50:31.8871932Z Starting standalonesession daemon on host 
> fv-az655.
> 2020-03-25T06:50:33.5021784Z Starting taskexecutor daemon on host fv-az655.
> 2020-03-25T06:50:33.5152274Z Waiting for Dispatcher REST endpoint to come 
> up