GitHub user ilooner commented on a diff in the pull request:
https://github.com/apache/incubator-apex-core/pull/168#discussion_r45010069
--- Diff: engine/src/main/java/com/datatorrent/stram/engine/Node.java ---
@@ -600,6 +605,19 @@ public void activate()
CHECKPOINT_WINDOW_COUNT = 1;
}
+ int dagChkptWndwCnt = context.getValue(Context.DAGContext.CHECKPOINT_WINDOW_COUNT);
+ if (PROCESSING_MODE != ProcessingMode.EXACTLY_ONCE) {
+ int chkOffset = dagChkptWndwCnt % CHECKPOINT_WINDOW_COUNT;
+ if (chkOffset != 0) {
+ EFFECTIVE_CHECKPOINT_WINDOW_COUNT = dagChkptWndwCnt + CHECKPOINT_WINDOW_COUNT - chkOffset;
+ } else {
+ EFFECTIVE_CHECKPOINT_WINDOW_COUNT = dagChkptWndwCnt;
+ }
+ } else {
+ EFFECTIVE_CHECKPOINT_WINDOW_COUNT = 1;
+ }
+ context.setWindowsFromCheckpoint(EFFECTIVE_CHECKPOINT_WINDOW_COUNT);
+
--- End diff --
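To make the rule under discussion concrete, here is the diff's computation isolated as a pure function. This is a sketch, not the engine code: the class and method names are hypothetical, and the `ProcessingMode` check is reduced to a boolean. Outside exactly-once mode it rounds the DAG checkpoint window count up to the next multiple of the operator's count, producing one fixed value.

```java
/**
 * Hypothetical standalone version of the rule in the diff (not Apex engine code):
 * outside EXACTLY_ONCE, the DAG checkpoint window count is rounded up to the next
 * multiple of the operator's CHECKPOINT_WINDOW_COUNT; EXACTLY_ONCE always uses 1.
 */
class EffectiveCheckpointCount {

    static int effectiveCount(int dagCount, int operatorCount, boolean exactlyOnce) {
        if (exactlyOnce) {
            return 1;
        }
        int offset = dagCount % operatorCount;
        // Round up to the next multiple of operatorCount (unchanged if already a multiple).
        return offset == 0 ? dagCount : dagCount + operatorCount - offset;
    }

    public static void main(String[] args) {
        System.out.println(effectiveCount(5, 3, false)); // prints 6
        System.out.println(effectiveCount(6, 3, false)); // prints 6
        System.out.println(effectiveCount(5, 3, true));  // prints 1
    }
}
```

For a DAG count of 5 and an operator count of 3, this yields a single constant, 6.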
I think this logic is incorrect. It treats EFFECTIVE_CHECKPOINT_WINDOW_COUNT as a constant, but the number of windows between two operator checkpoints is not necessarily constant.
Example:
The DAG CHECKPOINT_WINDOW_COUNT is 5.
The operator CHECKPOINT_WINDOW_COUNT is 3.
1 - No checkpoint
2 - No checkpoint
3 - Operator checkpoint window end is reached, but no checkpoint is performed because the checkpoint tuple has not been received yet
4 - No checkpoint
5 - Checkpoint tuple received
6** - End of the operator checkpoint window is reached, so a checkpoint is performed
7 - No checkpoint
8 - No checkpoint
9 - Operator checkpoint window end is reached, but no checkpoint is performed because the checkpoint tuple has not been received yet
10 - Checkpoint tuple received
11 - No checkpoint
12** - End of the operator checkpoint window is reached, so a checkpoint is performed
13 - No checkpoint
14 - No checkpoint
15 - Operator checkpoint window end is reached, but no checkpoint is performed because the end window is received first and the checkpoint tuple only after it
16 - No checkpoint
17 - No checkpoint
18** - End of the operator checkpoint window is reached, so a checkpoint is performed
19 - No checkpoint
20 - Checkpoint tuple received
21** - End of the operator checkpoint window is reached, so a checkpoint is performed
So the effective checkpoint window counts in this example are 6, 6, 6, and 3 (checkpoints at windows 6, 12, 18, and 21), not a constant 6.
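The walkthrough above can be reproduced with a small simulation. This is a simplified model under stated assumptions, not Apex engine code: a checkpoint tuple arrives in every window that is a multiple of the DAG count; the operator can only checkpoint at the end of a window that is a multiple of its own count; and, as in window 15, the end window is processed before the checkpoint tuple of the same window. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Simplified model of the window-by-window example (an assumption, not engine code).
 */
class CheckpointGapSimulation {

    /** Returns the window ids at which the operator actually checkpoints. */
    static List<Integer> checkpointWindows(int dagCount, int operatorCount, int totalWindows) {
        List<Integer> checkpoints = new ArrayList<>();
        boolean checkpointPending = false;
        for (int w = 1; w <= totalWindows; w++) {
            // End window is handled first: checkpoint only if a checkpoint tuple is
            // pending and this window closes an operator checkpoint window.
            if (checkpointPending && w % operatorCount == 0) {
                checkpoints.add(w);
                checkpointPending = false;
            }
            // The checkpoint tuple of this window is seen after end window, so it can
            // only be honored at a later operator boundary (the window 15 case).
            if (w % dagCount == 0) {
                checkpointPending = true;
            }
        }
        return checkpoints;
    }

    /** Number of windows between consecutive checkpoints, starting from window 0. */
    static List<Integer> gaps(List<Integer> checkpoints) {
        List<Integer> result = new ArrayList<>();
        int previous = 0;
        for (int c : checkpoints) {
            result.add(c - previous);
            previous = c;
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> cps = checkpointWindows(5, 3, 21);
        System.out.println("checkpoints at " + cps); // prints "checkpoints at [6, 12, 18, 21]"
        System.out.println("gaps " + gaps(cps));     // prints "gaps [6, 6, 6, 3]"
    }
}
```

With a DAG count of 5 and an operator count of 3, the model checkpoints at windows 6, 12, 18, and 21, so the gaps vary instead of staying at the single value the diff computes.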