[jira] [Comment Edited] (FLINK-16770) Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end test fails with no such file
[ https://issues.apache.org/jira/browse/FLINK-16770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17092098#comment-17092098 ] Biao Liu edited comment on FLINK-16770 at 4/25/20, 7:12 AM:

Technically speaking, the scenario we discussed here should not happen with the reverted commits. The finalization of the checkpoint is reverted to being executed synchronously, wrapped in the coordinator-wide lock, so there should not be any race condition at all. On the other hand, the earlier commits of the refactoring were merged over 3 months ago. So to answer the question of [~pnowojski], I think we have reverted enough commits.

I have noticed some logs:
{quote}kill: usage: kill [-s sigspec | -n signum | -sigspec] pid | jobspec ... or kill -l [sigspec] Killed TM @{quote}
It seems there was no TM process at some point, which is not a normal scenario. The {{ha_tm_watchdog}} in {{common_ha.sh}} should start a new TM before killing an old one. But what if there is no TM process at all, because one exited or was killed unexpectedly? I'm not sure. In that case there would not be enough TMs to finish the test case, because {{ha_tm_watchdog}} only starts a new TM when exactly the expected number is already running:
{code:bash}
local MISSING_TMS=$((EXPECTED_TMS-RUNNING_TMS))
if [ ${MISSING_TMS} -eq 0 ]; then
    # start a new TM only if we have exactly the expected number
    "$FLINK_DIR"/bin/taskmanager.sh start > /dev/null
fi
{code}
I guess the failure has a different cause, perhaps related to the missing TM process. But I can't tell what really happened in this case without any other logs. Is there any way we could find the JM logs? [~rmetzger]
[jira] [Comment Edited] (FLINK-16770) Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end test fails with no such file
[ https://issues.apache.org/jira/browse/FLINK-16770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17091604#comment-17091604 ] Piotr Nowojski edited comment on FLINK-16770 at 4/24/20, 2:05 PM:

Definitely your PR includes the reverted commits, so there is something to investigate here. Either there is another error with similar symptoms, or we haven't reverted enough commits. [~SleePy] [~yunta] what do you think has happened?

For one thing, I forgot to mention in this ticket that I've reverted the commits. We also need to clean up the tickets for this issue. I wanted to close this bug, but we were discussing solutions here. I guess that was a mistake - after reverting the commits and re-opening the original issue, we should have migrated the discussion there. So let's keep this ticket open for the investigation of your most recent report [~rmetzger].
> Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end > test fails with no such file > --- > > Key: FLINK-16770 > URL: https://issues.apache.org/jira/browse/FLINK-16770 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing, Tests >Affects Versions: 1.11.0 >Reporter: Zhijiang >Assignee: Yun Tang >Priority: Blocker > Labels: pull-request-available, test-stability > Fix For: 1.11.0 > > Attachments: e2e-output.log, > flink-vsts-standalonesession-0-fv-az53.log, image-2020-04-16-11-24-54-549.png > > Time Spent: 50m > Remaining Estimate: 0h > > The log : > [https://dev.azure.com/rmetzger/Flink/_build/results?buildId=6603&view=logs&j=c88eea3b-64a0-564d-0031-9fdcd7b8abee&t=1e2bbe5b-4657-50be-1f07-d84bfce5b1f5] > > There was also the similar problem in > https://issues.apache.org/jira/browse/FLINK-16561, but for the case of no > parallelism change. And this case is for scaling up. Not quite sure whether > the root cause is the same one. > {code:java} > 2020-03-25T06:50:31.3894841Z Running 'Resuming Externalized Checkpoint > (rocks, incremental, scale up) end-to-end test' > 2020-03-25T06:50:31.3895308Z > == > 2020-03-25T06:50:31.3907274Z TEST_DATA_DIR: > /home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-31390197304 > 2020-03-25T06:50:31.5500274Z Flink dist directory: > /home/vsts/work/1/s/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT > 2020-03-25T06:50:31.6354639Z Starting cluster. > 2020-03-25T06:50:31.8871932Z Starting standalonesession daemon on host > fv-az655. > 2020-03-25T06:50:33.5021784Z Starting taskexecutor daemon on host fv-az655. > 2020-03-25T06:50:33.5152274Z Waiting for Dispatcher REST endpoint to come > up... > 2020-03-25T06:50:34.5498116Z Waiting for Dispatcher REST endpoint to come > up... > 2020-03-25T06:50:35.6031346Z Waiting for Dispatcher REST endpoint to come > up... > 2020-03-25T06:50:36.9848425Z Waiting for Dispatcher REST endpoint to come > up... 
> 2020-03-25T06:50:38.0283377Z Dispatcher REST endpoint is up. > 2020-03-25T06:50:38.0285490Z Running externalized checkpoints test, with > ORIGINAL_DOP=2 NEW_DOP=4 and STATE_BACKEND_TYPE=rocks > STATE_BACKEND_FILE_ASYNC=true STATE_BACKEND_ROCKSDB_INCREMENTAL=true > SIMULATE_FAILURE=false ... > 2020-03-25T06:50:46.1754645Z Job (b8cb04e4b1e730585bc616aa352866d0) is > running. > 2020-03-25T06:50:46.1758132Z Waiting for job > (b8cb04e4b1e730585bc616aa352866d0) to have at least 1 completed checkpoints > ... > 2020-03-25T06:50:46.3478276Z Waiting for job to process up to 200 records, > current progress: 173 records ... > 2020-03-25T06:50:49.6332988Z Cancelling job b8cb04e4b1e730585bc616aa352866d0. > 2020-03-25T06:50:50.4875673Z Cancelled job b8cb04e4b1e730585bc616aa352866d0. > 2020-03-25T06:50:50.5468230Z ls: cannot access > '/home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-31390197304/externalized-chckpt-e2e-backend-dir/b8cb04e4b1e730585bc616aa352866d0/ch
[jira] [Comment Edited] (FLINK-16770) Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end test fails with no such file
[ https://issues.apache.org/jira/browse/FLINK-16770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17091543#comment-17091543 ] Piotr Nowojski edited comment on FLINK-16770 at 4/24/20, 1:00 PM:

[~rmetzger] how could this happen? I reverted the cause of this bug over a week ago: https://issues.apache.org/jira/browse/FLINK-14971 Or do we have to revert even more code, [~SleePy]?

[~rmetzger] what was the commit that failed there? Did it include the reverting commits:
{code}
ffa4475685 [8 days ago] (apache/master) Revert "[FLINK-14971][checkpointing] Handle ack/declined message of checkpoint" [ifndef-SleePy]
2813be7a3e [8 days ago] Revert "[FLINK-14971][checkpointing] Introduce main thread executor in CheckpointCoordinator to execute all non-IO operations instead of the timer thread" [ifndef-SleePy]
e217a8c666 [8 days ago] Revert "[hotfix] Harden ResumeCheckpointManuallyITCase" [ifndef-SleePy]
6adbe94b0e [8 days ago] Revert "[FLINK-14971][checkpointing] Make CompletedCheckpointStore thread-safe to avoid synchronization outside" [ifndef-SleePy]
214896aeee [8 days ago] Revert "[FLINK-14971][checkpointing] Remove coordinator-wide lock of CheckpointCoordinator" [ifndef-SleePy]
5d5a29bbe1 [8 days ago] Revert "[FLINK-14971][checkpointing] Remove lock of PendingCheckpoint and introduce IO lock for PendingCheckpoint" [ifndef-SleePy]
cfe5a27dd4 [8 days ago] Revert "[hotfix] Make the checkpoint resuming e2e case pass by increasing the retained checkpoints number" [ifndef-SleePy]
d3fa2e5224 [8 days ago] Revert "[FLINK-16945][checkpointing] Execute CheckpointFailureManager.FailJobCallback directly in main thread executor" [ifndef-SleePy]
{code}
?
[jira] [Comment Edited] (FLINK-16770) Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end test fails with no such file
[ https://issues.apache.org/jira/browse/FLINK-16770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17084901#comment-17084901 ] Stephan Ewen edited comment on FLINK-16770 at 4/16/20, 4:21 PM:

Thank you all for the great discussion and analysis. I would like to add a few points and suggestions, from the way I understand the problem:

h2. There are two main issues

*(1) Missing ownership in the multi-threaded system. Meaning: who owns the "Pending Checkpoint during Finalization"?*
- It is owned by the CheckpointCoordinator (which aborts it when shutting down).
- It is also owned by the I/O thread, or the CompletedCheckpointStore, which writes it to ZooKeeper (or a similar system).

*(2) No shared ground truth between the Checkpoint Coordinator and the JobMaster*
- When a checkpoint is finalized, that decision is not consistently visible to the JM.
- The JM only sees the result once it is in ZK, which is an asynchronous operation.
- That causes the final issue described here: the possibility that the JM starts from an earlier checkpoint, if a restart happens while the async write to ZK is still in progress.
- NOTE: It is fine to ignore a checkpoint that was completed, if we did not send "notification complete" and we are sure it will always be ignored. That would be as if the checkpoint never completed.
- NOTE: It is not fine to ignore it and start from an earlier checkpoint if it will get committed later. That is the bug to prevent.

h2. Two steps to a cleaner solution

*(1) When the checkpoint is ready (all tasks acked, metadata written out), the Checkpoint Coordinator transfers ownership to the CompletedCheckpointStore.*
- That means the checkpoint is removed from the "Pending Checkpoints" map and added to the CompletedCheckpointStore in one call in the main thread. If this is one call, it is atomic against other modifications (cancellation, disposing checkpoints).
Because the checkpoint is removed from the "Pending Checkpoints" map (it is not owned by the coordinator any more), it will not get cancelled during shutdown of the coordinator.
==> This is a very simple change.

*(2) The addition to the CompletedCheckpointStore must be constant time and executed in the main thread.*
- That means the CompletedCheckpointStore would put the completed checkpoint into a local list and then kick off the asynchronous request to add it to ZK.
- If the JM looks up the latest checkpoint, it refers to that local list. That way all local components refer to the same status and do not exchange status asynchronously via an external system (ZK).
==> The change is that the CompletedCheckpointStore would not always repopulate itself from ZK upon "restore checkpoint", but keep the local state and only repopulate itself when the master gains leader status (and clear itself when leader status is lost).
==> This is a slightly more complex change, but not too big.

h2. Distributed races and corner cases

I think this is an existing corner case issue that existed before, but is now more likely due to this bug. It exists because JM failover can happen concurrently with ZK updates.
* Once the call to add the checkpoint to ZK is sent off, the checkpoint might or might not get added to ZK (which is the distributed ground truth).
* During that time, we cannot safely restore at all.
** If the JM already restored from the checkpoint, it sends "restore state" to the tasks, which is equivalent to "notify checkpoint complete" and materializes external side effects. If the addition to ZK then fails, and the JM fails and another JM becomes leader, it will restore from an earlier checkpoint.
** If the JM restores from an earlier checkpoint during that time, and then the ZK call completes, we have duplicate side effects.
* In both cases we get fractured consistency or duplicate side effects.

I see three possible solutions, none of which are easy or great:

*(a) We cannot restore during the period where the checkpoint is in the "uncertain if committed" state.*
* The CompletedCheckpointStore would need to keep the checkpoint in an "uncertain" list initially, until the I/O executor call returns from adding the checkpoint to ZK.
* When asking the CompletedCheckpointStore for the latest checkpoint, it returns a CompletableFuture.
* While the latest checkpoint is in the "uncertain" state, the future cannot be completed. It completes after the ZK command completes (usually a few 100 ms). Restore operations would need to wait during that time.
* There is a separate issue, FLINK-16931, where "loading metadata" for the latest completed checkpoint can take long (seconds), because it is an I/O operation. This sounds like a similar issue, but I fear that the solution is more complex than anticipated in that issue.

*(b) Operators never commit side effects during restore.*
* That would be a change of the current contract
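The proposal above can be condensed into one sketch. This is hypothetical Java, not Flink's actual API (class and method names are illustrative): completing a checkpoint removes it from the pending map and registers it locally in a single main-thread call (the atomic ownership transfer of step (1)); the JM reads the latest checkpoint from the local list rather than from ZK (step (2)); and the entry stays "uncertain" (an incomplete future) until the asynchronous ZK write confirms, so restores wait instead of observing an uncommitted checkpoint (option (a)):
{code:java}
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

// Hypothetical sketch only; not Flink's CompletedCheckpointStore.
public class CheckpointStoreSketch {
    private final Map<Long, String> pendingCheckpoints = new HashMap<>();
    private final Deque<CompletableFuture<String>> completedLocally = new ArrayDeque<>();
    private final Executor ioExecutor;

    public CheckpointStoreSketch(Executor ioExecutor) {
        this.ioExecutor = ioExecutor;
    }

    public void addPending(long checkpointId, String metadata) {
        pendingCheckpoints.put(checkpointId, metadata);
    }

    // Main thread: one call moves the checkpoint from "pending" into the
    // store, so a concurrent coordinator shutdown can no longer abort it.
    public CompletableFuture<String> completeCheckpoint(long checkpointId) {
        String metadata = pendingCheckpoints.remove(checkpointId);
        if (metadata == null) {
            return CompletableFuture.completedFuture(null); // already aborted
        }
        CompletableFuture<String> uncertain = new CompletableFuture<>();
        completedLocally.addLast(uncertain); // constant-time local registration
        ioExecutor.execute(() -> {
            // The asynchronous ZK write would happen here; on success the
            // checkpoint leaves the "uncertain" state.
            uncertain.complete(metadata);
        });
        return uncertain;
    }

    // Restore path: a future that completes only once the commit is confirmed.
    public CompletableFuture<String> latestCheckpoint() {
        return completedLocally.isEmpty()
                ? CompletableFuture.completedFuture(null)
                : completedLocally.peekLast();
    }
}
{code}
With a direct executor ({{Runnable::run}}) the future completes immediately; with a real I/O pool, a restore chained on {{latestCheckpoint()}} would wait the few hundred milliseconds the ZK write typically takes, which is the waiting cost option (a) accepts.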
[jira] [Comment Edited] (FLINK-16770) Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end test fails with no such file
[ https://issues.apache.org/jira/browse/FLINK-16770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17084901#comment-17084901 ] Stephan Ewen edited comment on FLINK-16770 at 4/16/20, 2:50 PM: Thank you all for the great discussion and analysis. I would like to add a few points and suggestions, from the way I understand the problem: h2. There are are two main issues: *(1) Missing ownership in the multi-threaded system. Meaning: Who owns the "Pending Checkpoint during Finalization"?* - It is owned by the CheckpointCoordinator (who aborts it when shutting down) - It is also owned by the I/O Thread or the Completed Checkpoint Store who writes it to ZooKeeper (or similar system). *(2) No Shared Ground Truth between the Checkpoint Coordinator and the JobMaster* - When a checkpoint is finalized, that decision is not consistently visible to the JM. - The JM only sees the result once it is in ZK, which is an asynchronous operation - That causes the final issue described here: possibility that the JM starts from an earlier checkpoint, if a restart happens while the async writing to ZK still happens. - NOTE: It is fine to ignore a checkpoint that was completed, if we did not send "notification complete" and we are sure it will always be ignored. That would be as if the checkpoint never completed. - NOTE: It is not fine to ignore it and start from an earlier checkpoint if it will get committed later. That is the bug to prevent. h2. Two steps to a cleaner solution *(1) When the checkpoint is ready (all tasks acked, metadata written out), Checkpoint Coordinator transfers ownership to the CompletedCheckpointStore.* - That means the Checkpoint is removed from the "Pending Checkpoints" map and added to the CompletedCheckpointStore in one call in the main thread. If this is in one call, it is atomic against other modifications (cancellation, disposing checkpoints). 
Because the checkpoint is removed from the "Pending Checkpoints" map (not owned by the coordinator any more) it will not get cancelled during shutdown of the coordinator. ==> This is a very simple change *(2) The addition to the CompletedCheckpointStore must be constant time and executed in the main thread* - That means that the CompletedCheckpointStore would put the Completed Checkpoint into a local list and then kick off the asynchronous request to add it to ZK. - If the JM looks up the latest checkpoint, it refers to that local list. That way all local components refer to the same status and do not exchange status asynchronously via an external system (ZK). ==> The change is that the CompletedCheckpointStore would not always repopulate itself from ZK upon "restore checkpoint", but keep the local state and only repopulate itself when the master gains leader status (and clears itself when leader status is lost). ==> This is a slightly more complex change, but not too big. h2. Distributed Races and Corner Cases I think this is an existing corner case issue, that existed before, but is now more likely due to this bug. It exists, because JM failover can happen concurrently with ZK updates. * Once the call to add the checkpoint to ZK is sent off, the checkpoint might or might not get added to ZK (which is the distributed ground truth). * During that time, we cannot restore at all. ** If the JM already restored form the checkpoint, it sends "restore state" to the tasks, which is equivalent to "notify checkpoint complete" and materializes external side effects. If the addition to ZK then fails and the JM fails and another JM becomes leader, it will restore from an earlier checkpoint ** If the JM restores from an earlier checkpoint during that time, and then the ZK call completes, we have duplicate side effects. 
* In both cases we get fractured consistency or duplicate side effects I see three possible solutions, which are not easy or not great *(a) We cannot restore during the period where the checkpoint is in "uncertain if committed" state* * The CompletedCheckpointStore would need to keep the Checkpoint in a "uncertain" list initially, until the I/O executor call returns from adding the Checkpoint to ZK. * When asking the CompletedCheckpointStore for the latest checkpoint, it returns a CompletableFuture. * While the latest checkpoint is in that list, the future cannot be completed. It completes when the ZK command completes (usually few 100ms). Restore operations would need to wait during that time. * There is a separate issue FLINK-16931 where "loading metadata" for the latest completed checkpoint can take long (seconds), because it is an I/O operations. This sounds like a similar issue, but I fear that the solution is more complex that anticipated in that issue. *(b) Operators never commit side-effects are during restore.* * That would be a change of the current contrac
[jira] [Comment Edited] (FLINK-16770) Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end test fails with no such file
[ https://issues.apache.org/jira/browse/FLINK-16770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17084901#comment-17084901 ] Stephan Ewen edited comment on FLINK-16770 at 4/16/20, 2:37 PM: Thank you all for the great discussion and analysis. I would like to add a few points and suggestions, from the way I understand the problem: h2. There are are two main issues: *(1) Missing ownership in the multi-threaded system. Meaning: Who owns the "Pending Checkpoint during Finalization"?* - It is owned by the CheckpointCoordinator (who aborts it when shutting down) - It is also owned by the I/O Thread or the Completed Checkpoint Store who writes it to ZooKeeper (or similar system). *(2) No Shared Ground Truth between the Checkpoint Coordinator and the JobMaster* - When a checkpoint is finalized, that decision is not consistently visible to the JM. - The JM only sees the result once it is in ZK, which is an asynchronous operation - That causes the final issue described here: possibility that the JM starts from an earlier checkpoint, if a restart happens while the async writing to ZK still happens. - NOTE: It is fine to ignore a checkpoint that was completed, if we did not send "notification complete" and we are sure it will always be ignored. That would be as if the checkpoint never completed. - NOTE: It is not fine to ignore it and start from an earlier checkpoint if it will get committed later. That is the bug to prevent. h2. Two steps to a cleaner solution *(1) When the checkpoint is ready (all tasks acked, metadata written out), Checkpoint Coordinator transfers ownership to the CompletedCheckpointStore.* - That means the Checkpoint is removed from the "Pending Checkpoints" map and added to the CompletedCheckpointStore in one call in the main thread. If this is in one call, it is atomic against other modifications (cancellation, disposing checkpoints). 
Because the checkpoint is removed from the "Pending Checkpoints" map (not owned by the coordinator any more) it will not get cancelled during shutdown of the coordinator. ==> This is a very simple change *(2) The addition to the CompletedCheckpointStore must be constant time and executed in the main thread* - That means that the CompletedCheckpointStore would put the Completed Checkpoint into a local list and then kick off the asynchronous request to add it to ZK. - If the JM looks up the latest checkpoint, it refers to that local list. That way all local components refer to the same status and do not exchage status asynchronously via an external system (ZK). ==> The change is that the CompletedCheckpointStore would not always repopulate itself from ZK upon "restore checkpoint", but keep the local state and only repopulate itself when the master gains leader status (and clears itself when leader status is lost). ==> This is a slightly more complex change, but not too big. h2. Distributed Races and Corner Cases I think this is an existing corner case issue, which is not caused by this bug, but related to this discussion. I list it here for completeness. It exists, because JM failover can happen concurrently with ZK updates. * Once the call to add the checkpoint to ZK is sent off, the checkpoint might or might not get added to ZK (which is the distributed ground truth). * During that time, we cannot restore at all. ** If the JM already restored form the checkpoint, it sends "restore state" to the tasks, which is equivalent to "notify checkpoint complete" and materializes external side effects. If the addition to ZK then fails and the JM fails and another JM becomes leader, it will restore from an earlier checkpoint ** If the JM restores from an earlier checkpoint during that time, and then the ZK call completes, we have duplicate side effects. 
* In both cases we get fractured consistency or duplicate side effects I see three possible solutions, which are not easy or not great *(a) We cannot restore during the period where the checkpoint is in "uncertain if committed" state* * The CompletedCheckpointStore would need to keep the Checkpoint in a "uncertain" list initially, until the I/O executor call returns from adding the Checkpoint to ZK. * When asking the CompletedCheckpointStore for the latest checkpoint, it returns a CompletableFuture. * While the latest checkpoint is in that list, the future cannot be completed. It completes when the ZK command completes (usually few 100ms). Restore operations would need to wait during that time. * There is a separate issue FLINK-16931 where "loading metadata" for the latest completed checkpoint can take long (seconds), because it is an I/O operations. This sounds like a similar issue, but I fear that the solution is more complex that anticipated in that issue. *(b) Operators never commit side-effects are during restore.* * That woul
[jira] [Comment Edited] (FLINK-16770) Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end test fails with no such file
[ https://issues.apache.org/jira/browse/FLINK-16770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17084901#comment-17084901 ] Stephan Ewen edited comment on FLINK-16770 at 4/16/20, 2:35 PM: Thank you all for the great discussion and analysis. I would like to add a few points and suggestions, from the way I understand the problem: h2. There are are two main issues: *(1) Missing ownership in the multi-threaded system. Meaning: Who owns the "Pending Checkpoint during Finalization"?* - It is owned by the CheckpointCoordinator (who aborts it when shutting down) - It is also owned by the I/O Thread or the Completed Checkpoint Store who writes it to ZooKeeper (or similar system). *(2) No Shared Ground Truth between the Checkpoint Coordinator and the JobMaster* - When a checkpoint is finalized, that decision is not consistently visible to the JM. - The JM only sees the result once it is in ZK, which is an asynchronous operation - That causes the final issue described here: possibility that the JM starts from an earlier checkpoint, if a restart happens while the async writing to ZK still happens. - NOTE: It is fine to ignore a checkpoint that was completed, if we did not send "notification complete" and we are sure it will always be ignored. That would be as if the checkpoint never completed. - NOTE: It is not fine to ignore it and start from an earlier checkpoint if it will get committed later. That is the bug to prevent. h2. Two steps to a cleaner solution *(1) When the checkpoint is ready (all tasks acked, metadata written out), Checkpoint Coordinator transfers ownership to the CompletedCheckpointStore.* - That means the Checkpoint is removed from the "Pending Checkpoints" map and added to the CompletedCheckpointStore in one call in the main thread. If this is in one call, it is atomic against other modifications (cancellation, disposing checkpoints). 
Because the checkpoint is removed from the "Pending Checkpoints" map (not owned by the coordinator any more) it will not get cancelled during shutdown of the coordinator. ==> This is a very simple change *(2) The addition to the CompletedCheckpointStore must be constant time and executed in the main thread* - That means that the CompletedCheckpointStore would put the Completed Checkpoint into a local list and then kick off the asynchronous request to add it to ZK. - If the JM looks up the latest checkpoint, it refers to that local list. That way all local components refer to the same status and do not exchage status asynchronously via an external system (ZK). ==> The change is that the CompletedCheckpointStore would not always repopulate itself from ZK upon "restore checkpoint", but keep the local state and only repopulate itself when the master gains leader status (and clears itself when leader status is lost). ==> This is a slightly more complex change, but not too big. h2. Distributed Races and Corner Cases I think this is an existing corner case issue, which is not caused by this bug, but related to this discussion. I list it here for completeness. It exists, because JM failover can happen concurrently with ZK updates. * Once the call to add the checkpoint to ZK is sent off, the checkpoint might or might not get added to ZK (which is the distributed ground truth). * During that time, we cannot restore at all. ** If the JM already restored form the checkpoint, it sends "restore state" to the tasks, which is equivalent to "notify checkpoint complete" and materializes external side effects. If the addition to ZK then fails and the JM fails and another JM becomes leader, it will restore from an earlier checkpoint ** If the JM restores from an earlier checkpoint during that time, and then the ZK call completes, we have duplicate side effects. 
* In both cases we get fractured consistency or duplicate side effects.

I see three possible solutions, none of which is easy or great:

*(a) We cannot restore during the period where the checkpoint is in the "uncertain if committed" state*
* The CompletedCheckpointStore would need to keep the checkpoint in an "uncertain" list initially, until the I/O executor call returns from adding the checkpoint to ZK.
* When asked for the latest checkpoint, the CompletedCheckpointStore returns a CompletableFuture.
* While the latest checkpoint is in that list, the future cannot be completed. It completes when the ZK command completes (usually a few 100 ms). Restore operations would need to wait during that time.
* There is a separate issue, FLINK-16931, where "loading metadata" for the latest completed checkpoint can take long (seconds), because it is an I/O operation. This sounds like a similar issue, but I fear that the solution is more complex than anticipated in that issue.

*(b) Operators never commit side effects during restore.*
* That would be a change of the current contract
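The ownership-transfer step (1) and the local-list idea (2) above can be sketched in a toy model. All class and method names below ({{SimpleCoordinator}}, {{SimpleCheckpointStore}}, {{transferOwnership}}) are hypothetical illustrations, not Flink's actual API:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model: the coordinator removes the pending
// checkpoint from its map and hands it to the completed-checkpoint store
// in one main-thread call, so a concurrent shutdown can no longer abort
// a checkpoint that is already complete.
class SimpleCheckpointStore {
    final Deque<Long> completed = new ArrayDeque<>(); // local ground truth

    void add(long checkpointId) {
        completed.addLast(checkpointId);
        // the asynchronous write to ZooKeeper would be kicked off here
    }

    Long latest() {
        return completed.peekLast();
    }
}

class SimpleCoordinator {
    final Map<Long, String> pendingCheckpoints = new HashMap<>();
    final SimpleCheckpointStore store = new SimpleCheckpointStore();

    // Executed in the main thread: removal plus addition is one atomic
    // step with respect to other main-thread actions such as shutdown.
    void transferOwnership(long checkpointId) {
        if (pendingCheckpoints.remove(checkpointId) != null) {
            store.add(checkpointId);
        }
    }

    // Shutdown only aborts checkpoints the coordinator still owns.
    void shutdown() {
        pendingCheckpoints.clear();
    }
}
```

Because removal from the pending map and addition to the store happen in one main-thread call, a shutdown running in the same thread only ever sees the checkpoint as either pending (and aborts it) or completed (and leaves it alone), never half-finalized.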
[jira] [Comment Edited] (FLINK-16770) Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end test fails with no such file
[ https://issues.apache.org/jira/browse/FLINK-16770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17083018#comment-17083018 ] Biao Liu edited comment on FLINK-16770 at 4/14/20, 9:13 AM:

[~aljoscha], the upload to transfer.sh failed, so I can't confirm the root cause. It might be the same reason. [~yunta], do you need some help?

> Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end test fails with no such file
> ---
>
> Key: FLINK-16770
> URL: https://issues.apache.org/jira/browse/FLINK-16770
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Checkpointing, Tests
> Affects Versions: 1.11.0
> Reporter: Zhijiang
> Assignee: Yun Tang
> Priority: Blocker
> Labels: pull-request-available, test-stability
> Fix For: 1.11.0
>
> Attachments: e2e-output.log, flink-vsts-standalonesession-0-fv-az53.log
>
> Time Spent: 20m
> Remaining Estimate: 0h
>
> The log :
> [https://dev.azure.com/rmetzger/Flink/_build/results?buildId=6603&view=logs&j=c88eea3b-64a0-564d-0031-9fdcd7b8abee&t=1e2bbe5b-4657-50be-1f07-d84bfce5b1f5]
>
> There was also a similar problem in https://issues.apache.org/jira/browse/FLINK-16561, but for the case of no parallelism change. And this case is for scaling up. Not quite sure whether the root cause is the same one.
> {code:java}
> 2020-03-25T06:50:31.3894841Z Running 'Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end test'
> 2020-03-25T06:50:31.3895308Z ==
> 2020-03-25T06:50:31.3907274Z TEST_DATA_DIR: /home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-31390197304
> 2020-03-25T06:50:31.5500274Z Flink dist directory: /home/vsts/work/1/s/flink-dist/target/flink-1.11-SNAPSHOT-bin/flink-1.11-SNAPSHOT
> 2020-03-25T06:50:31.6354639Z Starting cluster.
> 2020-03-25T06:50:31.8871932Z Starting standalonesession daemon on host fv-az655.
> 2020-03-25T06:50:33.5021784Z Starting taskexecutor daemon on host fv-az655.
> 2020-03-25T06:50:33.5152274Z Waiting for Dispatcher REST endpoint to come up...
> 2020-03-25T06:50:34.5498116Z Waiting for Dispatcher REST endpoint to come up...
> 2020-03-25T06:50:35.6031346Z Waiting for Dispatcher REST endpoint to come up...
> 2020-03-25T06:50:36.9848425Z Waiting for Dispatcher REST endpoint to come up...
> 2020-03-25T06:50:38.0283377Z Dispatcher REST endpoint is up.
> 2020-03-25T06:50:38.0285490Z Running externalized checkpoints test, with ORIGINAL_DOP=2 NEW_DOP=4 and STATE_BACKEND_TYPE=rocks STATE_BACKEND_FILE_ASYNC=true STATE_BACKEND_ROCKSDB_INCREMENTAL=true SIMULATE_FAILURE=false ...
> 2020-03-25T06:50:46.1754645Z Job (b8cb04e4b1e730585bc616aa352866d0) is running.
> 2020-03-25T06:50:46.1758132Z Waiting for job (b8cb04e4b1e730585bc616aa352866d0) to have at least 1 completed checkpoints ...
> 2020-03-25T06:50:46.3478276Z Waiting for job to process up to 200 records, current progress: 173 records ...
> 2020-03-25T06:50:49.6332988Z Cancelling job b8cb04e4b1e730585bc616aa352866d0.
> 2020-03-25T06:50:50.4875673Z Cancelled job b8cb04e4b1e730585bc616aa352866d0.
> 2020-03-25T06:50:50.5468230Z ls: cannot access '/home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-31390197304/externalized-chckpt-e2e-backend-dir/b8cb04e4b1e730585bc616aa352866d0/chk-[1-9]*/_metadata': No such file or directory
> 2020-03-25T06:50:50.5606260Z Restoring job with externalized checkpoint at . ...
> 2020-03-25T06:50:58.4728245Z
> 2020-03-25T06:50:58.4732663Z
> 2020-03-25T06:50:58.4735785Z The program finished with the following exception:
> 2020-03-25T06:50:58.4737759Z
> 2020-03-25T06:50:58.4742666Z org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
> 2020-03-25T06:50:58.4746274Z at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
> 2020-03-25T06:50:58.4749954Z at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
> 2020-03-25T06:50:58.4752753Z at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:142)
> 2020-03-25T06:50:58.4755400Z at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:659
[jira] [Comment Edited] (FLINK-16770) Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end test fails with no such file
[ https://issues.apache.org/jira/browse/FLINK-16770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17073388#comment-17073388 ] Biao Liu edited comment on FLINK-16770 at 4/2/20, 5:52 AM:

Hi [~yunta], thanks for the response. If I understand correctly, the {{CompletedCheckpointStore}} can be left in an inconsistent state while stopping a checkpoint that is doing asynchronous finalization. There are two strategies here:

1. The checkpoint which is doing finalization could be aborted when the {{CheckpointCoordinator}} is being shut down or the periodic scheduler is being stopped. This is the choice of the current implementation. However, we didn't handle the {{CompletedCheckpointStore}} well. For example, it might be better to revert the state of the {{CompletedCheckpointStore}} when the {{PendingCheckpoint}} detects the discarding after asynchronous finalization. But I think it's not easy to do so, because there might be a subsuming operation during {{CompletedCheckpointStore#addCheckpoint}}.

2. The checkpoint which is doing finalization could NOT be aborted when the {{CheckpointCoordinator}} is being shut down or the periodic scheduler is being stopped. I personally prefer this solution, because it would simplify the concurrent-conflict scenario and it's much easier to implement.

I think introducing an atomic boolean might not be enough. It's better to rethink the relationship between {{PendingCheckpoint#abort}} and {{PendingCheckpoint#finalizeCheckpoint}}. And we also need to rewrite a part of the error handling of the finalization.

BTW, [~yunta], could you share the unit test case which could reproduce the scenario locally? I want to verify my assumption and solution. The original e2e test case is not stable.
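A minimal sketch of strategy 2 (finalization cannot be aborted once it has started), using a small state machine on the pending checkpoint. The enum, class, and method names below are assumptions for illustration, not the actual {{PendingCheckpoint}} API:

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of strategy 2: once finalization has started, a
// shutdown-triggered abort is refused instead of racing with it.
class FinalizablePendingCheckpoint {
    enum State { PENDING, FINALIZING, COMPLETED, DISCARDED }

    final AtomicReference<State> state = new AtomicReference<>(State.PENDING);

    /** Called from the finalization path; wins against any later abort. */
    boolean startFinalization() {
        return state.compareAndSet(State.PENDING, State.FINALIZING);
    }

    /** Completes a finalization that was successfully started. */
    void completeFinalization() {
        state.compareAndSet(State.FINALIZING, State.COMPLETED);
    }

    /** Called when the coordinator shuts down; only succeeds if the
     *  checkpoint has not entered finalization yet. */
    boolean abort() {
        return state.compareAndSet(State.PENDING, State.DISCARDED);
    }
}
```

With this ordering, a shutdown would simply get {{false}} back from {{abort()}} for a checkpoint that is already finalizing, instead of racing with the asynchronous finalization.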
[jira] [Comment Edited] (FLINK-16770) Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end test fails with no such file
[ https://issues.apache.org/jira/browse/FLINK-16770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17073041#comment-17073041 ] Yun Tang edited comment on FLINK-16770 at 4/1/20, 5:55 PM:

[~SleePy] Yes, this bug was introduced by FLINK-14971, and I could reproduce it locally with a unit test. There is a race between {{PendingCheckpoint#dispose}} and {{PendingCheckpoint#finalizeCheckpoint}}: the current {{operationLock}} only ensures that the async phase deleting this pending checkpoint and the one adding the completed checkpoint do not run at the same time. However, it cannot ensure that the pending checkpoint is not first added to the checkpoint store and then dropped.

One quick fix would be to add an atomic boolean shared between these two async operations: once the pending checkpoint has been added to the checkpoint store, it would not be dropped asynchronously anymore. However, this could be misleading: the pending checkpoint may be added to the checkpoint store successfully (asynchronously) but tagged as disposed in the main thread. Although we could avoid dropping it in the async phase of {{PendingCheckpoint#dispose}}, the checkpoint coordinator would not treat this pending checkpoint as successful, and it would not be displayed in the checkpoint web UI. But luckily, we could at least ensure that no data is deleted by mistake, and the job could still fail over by recovering from the latest completed checkpoint.

Another solution would compare-and-set an atomic variable in the main thread in {{PendingCheckpoint#dispose}} and share it with the path that adds the checkpoint to the store. If the add-to-store operation arrives first, we would not let the main thread tag that pending checkpoint as discarded. On the other hand, if tagging the pending checkpoint as discarded arrives first, we would not try to add it to the checkpoint store.
I think this could be really lightweight and non-blocking, but it would introduce some extra CAS work in the main thread. What do you think? [~pnowojski]
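The second solution described above (first arrival wins) can be sketched with a single compare-and-set on an outcome variable: whichever path publishes first claims the checkpoint, and the other backs off. The names below are illustrative assumptions, not Flink's actual code:

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of the compare-and-set idea: the async add-to-store
// path and the main-thread discard path race to publish an outcome; only
// the first compareAndSet on the initially-null reference succeeds.
class CheckpointOutcome {
    enum Result { ADDED_TO_STORE, DISCARDED }

    private final AtomicReference<Result> outcome = new AtomicReference<>();

    /** Async finalization path: try to claim the checkpoint for the store. */
    boolean tryAddToStore() {
        return outcome.compareAndSet(null, Result.ADDED_TO_STORE);
    }

    /** Main-thread dispose path: try to claim the checkpoint as discarded. */
    boolean tryDiscard() {
        return outcome.compareAndSet(null, Result.DISCARDED);
    }

    Result result() {
        return outcome.get();
    }
}
```

The loser of the race sees {{false}} and takes no action, so the checkpoint can never be both added to the store and discarded, at the cost of one CAS per checkpoint in the main thread.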
[jira] [Comment Edited] (FLINK-16770) Resuming Externalized Checkpoint (rocks, incremental, scale up) end-to-end test fails with no such file
[ https://issues.apache.org/jira/browse/FLINK-16770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17072458#comment-17072458 ] Yun Tang edited comment on FLINK-16770 at 4/1/20, 9:09 AM:

[~rmetzger] I have reproduced this with additional logs in my [private branch|https://github.com/Myasuka/flink/tree/travis-fix-bug] and a personal Azure pipeline: https://myasuka.visualstudio.com/flink/_build/results?buildId=10&view=logs&j=1f3ed471-1849-5d3c-a34c-19792af4ad16&t=2f5b54d0-1d28-5b01-d344-aa50ffe0cdf8 .

From the additional logs, I have figured out why this could happen: {{CheckpointCoordinator}} drops the pending checkpoint-8 when cancelling the job in {{CheckpointCoordinator#stopCheckpointScheduler}}; however, chk-8 has just been asynchronously added to the checkpoint store successfully during {{PendingCheckpoint#finalizeCheckpoint}}. On the other hand, once chk-8 is added to the checkpoint store, chk-7 is removed from the checkpoint store. That's why we could see logs:

{code:java}
Checkpoint with ID 8 at 'file:/home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/temp-test-directory-07881636808/externalized-chckpt-e2e-backend-dir/0329a0facde65e8a8432124ce5db8e3c/chk-8' not discarded.
{code}

In the end, chk-8 is deleted when we stop the scheduler, and chk-7 is deleted when chk-8 is successfully added to the checkpoint store. I am not sure whether you have already done some work to figure out the root cause; please assign this ticket to me if you don't mind and haven't already done so.
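The interleaving above can be replayed deterministically in a toy model. The class name, the retained-checkpoints limit of 1, and the file-deletion bookkeeping below are assumptions for illustration only, not Flink's actual implementation:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy replay of the race: the async finalization adds chk-8 to the store,
// subsuming chk-7 and deleting its files; the concurrent shutdown then
// deletes chk-8's files too, leaving no checkpoint that is both in the
// store and still present on disk.
class SubsumptionRaceReplay {
    static List<String> usableCheckpoints() {
        Set<String> filesOnDisk = new HashSet<>(Arrays.asList("chk-7", "chk-8"));
        List<String> store = new ArrayList<>(Arrays.asList("chk-7"));

        // async PendingCheckpoint#finalizeCheckpoint: chk-8 enters the store;
        // with a retention limit of 1, chk-7 is subsumed and its files deleted
        store.add("chk-8");
        store.remove("chk-7");
        filesOnDisk.remove("chk-7");

        // meanwhile the shutdown path aborts the still-"pending" chk-8 and
        // deletes its files, although it is already in the store
        filesOnDisk.remove("chk-8");

        // a restore can only use checkpoints that are in the store
        // and whose files still exist
        store.retainAll(filesOnDisk);
        return store;
    }
}
```

The empty result corresponds to the e2e failure: no chk-* directory with a _metadata file is left for the restore step to find.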