[GitHub] flink pull request #4410: [FLINK-7268] [checkpoints] Scope SharedStateRegist...

2017-08-22 Thread tedyu
GitHub user tedyu commented on a diff in the pull request:

https://github.com/apache/flink/pull/4410#discussion_r134647310
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ---
@@ -1044,10 +1049,23 @@ public boolean restoreLatestCheckpointedState(
        throw new IllegalStateException("CheckpointCoordinator is shut down");
    }
 
-   // Recover the checkpoints
-   completedCheckpointStore.recover(sharedStateRegistry);
+   // We create a new shared state registry object, so that all pending async disposal requests from previous
+   // runs will go against the old object (where they can do no harm).
+   // This must happen under the checkpoint lock.
+   sharedStateRegistry.close();
+   sharedStateRegistry = sharedStateRegistryFactory.create(executor);
+
+   // Recover the checkpoints, TODO this could be done only when there is a new leader, not on each recovery
--- End diff --

If we use highAvailabilityServices.getJobManagerLeaderRetriever(), the Job ID is required.
Can the Job ID be obtained from the JobVertexID?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---



2017-08-15 Thread asfgit
GitHub user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4410





2017-07-27 Thread StefanRRichter
GitHub user StefanRRichter opened a pull request:

https://github.com/apache/flink/pull/4410

[FLINK-7268] [checkpoints] Scope SharedStateRegistry objects per (re)start

## What is the purpose of the change

This PR fixes FLINK-7268. The problem was that `ZooKeeperCompletedCheckpointStore` deletes checkpoints asynchronously. When such a deletion ran in parallel with a restart, the async delete could perform shared state de-registration against the registry that the restarted job was already using.

Before this PR, the old `SharedStateRegistry` was kept after a restart and the counts were updated from the completed checkpoint store. In the described race, a checkpoint with a pending delete does not contribute to the new count, but it can still decrement that count once its shared state is unregistered in the async deletion thread. This can accidentally drop counts below 1 and lead to data loss, because shared state that a retained checkpoint still references is then discarded.

The core idea behind the PR is to scope the `SharedStateRegistry` per 
(re-)start, so that old pending deletes cannot influence the current count.
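The race and the fix can be sketched with a toy reference-counting registry. This is a minimal sketch, not Flink code. `CountingRegistry`, `clearCounts()`, and the two scenario methods are hypothetical. The sketch also rests on two assumptions:

- "The counts were updated from the completed checkpoint store" means the counts are rebuilt from the checkpoints still in the store.
- A closed registry simply ignores late unregister calls.

In the scenario, checkpoints C1 and C2 both reference shared state `s1`. C1 is subsumed, and its asynchronous delete is still pending when the job restarts.

```java
import java.util.HashMap;
import java.util.Map;

public class SharedStateRaceSketch {

    /** Hypothetical, simplified stand-in for SharedStateRegistry: a reference count per key. */
    static final class CountingRegistry {
        private final Map<String, Integer> counts = new HashMap<>();
        private boolean open = true;

        synchronized void register(String key) {
            counts.merge(key, 1, Integer::sum);
        }

        /** Decrements the count; returns true if the state would now be physically discarded. */
        synchronized boolean unregister(String key) {
            if (!open) {
                return false; // assumption: a closed registry ignores late disposal requests
            }
            int remaining = counts.getOrDefault(key, 0) - 1;
            if (remaining <= 0) {
                counts.remove(key);
                return true;
            }
            counts.put(key, remaining);
            return false;
        }

        /** Assumption: models "counts are updated from the store" as a rebuild from scratch. */
        synchronized void clearCounts() {
            counts.clear();
        }

        synchronized void close() {
            open = false;
        }

        synchronized int count(String key) {
            return counts.getOrDefault(key, 0);
        }
    }

    /** Old behavior: one registry object survives the restart. Returns the count of "s1" afterwards. */
    static int reuseRegistryAcrossRestart() {
        CountingRegistry registry = new CountingRegistry();
        registry.register("s1"); // referenced by checkpoint C1
        registry.register("s1"); // referenced by checkpoint C2
        CountingRegistry seenByPendingDelete = registry; // C1's async delete is queued but has not run

        // Restart: rebuild counts from the store, which no longer contains C1.
        registry.clearCounts();
        registry.register("s1"); // from C2

        seenByPendingDelete.unregister("s1"); // the stale delete of C1 hits the live registry
        return registry.count("s1");
    }

    /** New behavior: close the old registry and create a fresh one on every (re)start. */
    static int newRegistryPerRestart() {
        CountingRegistry registry = new CountingRegistry();
        registry.register("s1"); // C1
        registry.register("s1"); // C2
        CountingRegistry seenByPendingDelete = registry;

        registry.close();
        registry = new CountingRegistry();
        registry.register("s1"); // from C2

        seenByPendingDelete.unregister("s1"); // goes against the old, closed object
        return registry.count("s1");
    }

    public static void main(String[] args) {
        System.out.println("reused registry, count of s1: " + reuseRegistryAcrossRestart());
        System.out.println("fresh registry, count of s1: " + newRegistryPerRestart());
    }
}
```

With the registry reused, the stale delete drops the count of `s1` to 0 even though C2 still references it. With a fresh registry per (re)start, the stale delete hits the closed old object, and the count in the live registry stays at 1.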

`SharedStateRegistry` is now created via a factory that is passed into the 
`CheckpointCoordinator` to simplify testing.
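The following is a minimal sketch of how an injected factory can simplify testing. `Registry`, `RegistryFactory`, and `Coordinator` are hypothetical, simplified stand-ins, not Flink's classes. Only the `create(executor)` call shape and the close-then-recreate sequence under the checkpoint lock are taken from the diff in this thread. A test passes in a recording factory and can then inspect every registry the coordinator created.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;

public class RegistryFactorySketch {

    /** Hypothetical stand-in for SharedStateRegistry; it only records whether it was closed. */
    static final class Registry implements AutoCloseable {
        final Executor deleteExecutor;
        boolean closed;

        Registry(Executor deleteExecutor) {
            this.deleteExecutor = deleteExecutor;
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    /** Factory shape modeled on sharedStateRegistryFactory.create(executor) from the diff. */
    interface RegistryFactory {
        Registry create(Executor deleteExecutor);
    }

    /** Hypothetical coordinator that swaps in a fresh registry on every restore. */
    static final class Coordinator {
        private final Object checkpointLock = new Object();
        private final RegistryFactory factory;
        private final Executor executor;
        private Registry registry;

        Coordinator(RegistryFactory factory, Executor executor) {
            this.factory = factory;
            this.executor = executor;
            this.registry = factory.create(executor);
        }

        void restoreLatestCheckpointedState() {
            synchronized (checkpointLock) {
                // Pending async deletes keep pointing at the old object; in this sketch
                // close() only sets a flag that marks it as retired.
                registry.close();
                registry = factory.create(executor);
                // Recovering completed checkpoints into the fresh registry would happen here.
            }
        }
    }

    /** Test-style usage: a recording factory captures every registry the coordinator creates. */
    static List<Registry> createdAfterTwoRestores() {
        List<Registry> created = new ArrayList<>();
        RegistryFactory recording = executor -> {
            Registry r = new Registry(executor);
            created.add(r);
            return r;
        };
        Coordinator coordinator = new Coordinator(recording, Runnable::run); // direct executor
        coordinator.restoreLatestCheckpointedState();
        coordinator.restoreLatestCheckpointedState();
        return created;
    }

    public static void main(String[] args) {
        for (Registry r : createdAfterTwoRestores()) {
            System.out.println("registry closed: " + r.closed);
        }
    }
}
```

After two restores, the recording factory has handed out three registries. The first two are closed and only the latest is open, which a test can assert directly without reaching into the coordinator's internals.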

The PR also introduces additional tests and improves the debug/trace 
logging of incremental checkpointing.

## Verifying this change

This change added tests and can be verified as follows:

Run a job with keyed state, using incremental checkpoints and HA mode. Kill TaskManagers to trigger recovery. After a couple of attempts, the problematic condition should be triggered, leading to an infinite recovery loop due to a `FileNotFoundException`.

Additional tests:

  - `HAIncrementalRocksDbBackendEventTimeWindowCheckpointingITCase`
  - `CheckpointCoordinatorTest::testSharedStateRegistrationOnRestore`
  - `IncrementalKeyedStateHandleTest::testSharedStateReRegistration`

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (YES)

## Documentation

  - Does this pull request introduce a new feature? (no)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StefanRRichter/flink ImprovedIncreemntalCPDebug

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4410.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4410


commit 2ed4f6b28c2fda674f1319f2a3678b2a231988ac
Author: Stefan Richter 
Date:   2017-06-26T16:07:59Z

[FLINK-7213] Introduce state management by OperatorID in TaskManager

commit a50eda8602d2034753b42413d23842a888e73611
Author: Stefan Richter 
Date:   2017-07-11T15:10:03Z

[FLINK-7213] Introduce TaskStateSnapshot to unify TaskStateHandles and SubtaskState

commit bce928fcfec73ff7584840ae7eb6b31fb727604f
Author: Stefan Richter 
Date:   2017-07-25T10:14:03Z

review comments zentol

commit 363f0ee18e06affe95c30095ed229ca8dfd47801
Author: Stefan Richter 
Date:   2017-07-26T11:31:30Z

review comments zentol part 2

commit 98e657ea02c17d972391dd2360c287a00f27231e
Author: Stefan Richter 
Date:   2017-07-26T11:31:47Z

review comments zentol part 2

commit 4460d3412f3ffc2e5496b65949751a31dae8a01a
Author: Stefan Richter 
Date:   2017-07-25T10:04:16Z

[FLINK-7268] [checkpoints] Scope SharedStateRegistry objects per (re)start



