[
https://issues.apache.org/jira/browse/KAFKA-5144?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15990619#comment-15990619
]
Matthias J. Sax edited comment on KAFKA-5144 at 5/1/17 7:29 AM:
----------------------------------------------------------------
It is intended behavior. {{MinTimestampTracker}} tracks the minimum timestamp
for non-processed records in the buffer. The undocumented usage pattern is
that {{addElement()}} and {{removeElement()}} must be called in the same
order. Your tests don't follow this pattern. The logic is about tracking the
current partition time as the minimum of whatever is in the buffer (also
considering out-of-order records); cf. KAFKA-3514.
Example: We get a batch of records with ts {{5, 10, 15, 12, 20}} and add them
consecutively to the timestamp tracker. When we process those records one
by one, we get the following steps:
- process {{5}}, queue {{10, 15, 12, 20}}, partition time {{5}}
- process {{10}}, queue {{15, 12, 20}}, partition time {{10}}
- process {{15}}, queue {{12, 20}}, partition time {{12}} (!! current minimum
is {{12}}, not {{15}} !!)
- process {{12}}, queue {{20}}, partition time {{12}}
- process {{20}}, queue empty, partition time {{20}}
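The intended "partition time" in the steps above can be reproduced with a brute-force sketch: for each record, take the minimum over that record and everything still queued behind it. This is only an illustration of the semantics, not how Streams computes it; the class and method names ({{PartitionTimeBruteForce}}, {{partitionTimes}}) are made up for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper for illustration only; not part of Kafka.
final class PartitionTimeBruteForce {

    // Partition time while processing records.get(i): the minimum over that
    // record and every record still buffered behind it.
    static List<Long> partitionTimes(List<Long> records) {
        List<Long> result = new ArrayList<>();
        for (int i = 0; i < records.size(); i++) {
            result.add(Collections.min(records.subList(i, records.size())));
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(partitionTimes(Arrays.asList(5L, 10L, 15L, 12L, 20L)));
        // prints [5, 10, 12, 12, 20]
    }
}
```

This rescans the buffer for every record, which is what the tracker avoids by keeping only an ascending subsequence.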
The tracker will have the following states when adding the records one by one
(this happens before processing begins):
- add {{5}}: {{5}}
- add {{10}}: {{5, 10}}
- add {{15}}: {{5, 10, 15}}
- add {{12}}: {{5, 10, 12}} (!! this is the behavior that is not a bug !!)
- add {{20}}: {{5, 10, 12, 20}}
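The add-side states listed above can be sketched as follows. This is a simplified stand-in working on plain timestamps, not the actual {{MinTimestampTracker}} code; the class name {{AscendingAddSketch}}, the {{state()}} helper, and the choice to also drop equal timestamps ({{>=}}) are assumptions of the sketch.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Simplified sketch of the add path only; not the Kafka class itself.
final class AscendingAddSketch {
    private final Deque<Long> ascending = new ArrayDeque<>();

    // Drop every tracked timestamp >= ts from the tail, then append ts.
    // What remains is an ascending sequence whose head is the current minimum.
    void addElement(long ts) {
        while (!ascending.isEmpty() && ascending.peekLast() >= ts) {
            ascending.removeLast();
        }
        ascending.addLast(ts);
    }

    // Snapshot of the tracked timestamps, head first.
    List<Long> state() {
        return new ArrayList<>(ascending);
    }

    public static void main(String[] args) {
        AscendingAddSketch tracker = new AscendingAddSketch();
        for (long ts : new long[] {5, 10, 15, 12, 20}) {
            tracker.addElement(ts);
            System.out.println("add " + ts + ": " + tracker.state());
        }
        // last line printed: add 20: [5, 10, 12, 20]
    }
}
```

Dropping {{15}} is safe because {{12}} arrives later and is smaller, so {{15}} can never again be the minimum of the buffer.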
During processing, we {{poll}} the head record from the queue and call
{{removeElement}} on the tracker afterwards. Thus, we get (note that we start
with tracker state {{5, 10, 12, 20}}):
- poll from queue {{5}}, tracker after remove {{10, 12, 20}}
- poll from queue {{10}}, tracker after remove {{12, 20}}
- poll from queue {{15}}, tracker after remove {{12, 20}} (!! {{15}} is no
longer tracked, so {{removeElement(15)}} removes nothing and we keep {{12}};
this allows us to use {{12}} for two records !!)
- poll from queue {{12}}, tracker after remove {{20}}
- poll from queue {{20}}, tracker after remove empty
Note that the call to {{timeTracker.get()}} ({{RecordQueue}} L124) happens
after {{timeTracker.removeElement(elem)}} ({{RecordQueue}} L120). This is also
correct: we advance "partition time" in this step, but only use it _after_
the current record has been fully processed:
- process {{5}}, queue {{10, 15, 12, 20}}, tracker after remove {{10, 12,
20}}, next partition time {{10}}
- process {{10}}, queue {{15, 12, 20}}, tracker after remove {{12, 20}}, next
partition time {{12}}
- process {{15}}, queue {{12, 20}}, tracker after remove {{12, 20}}, next
partition time {{12}}
- process {{12}}, queue {{20}}, tracker after remove {{20}}, next partition
time {{20}}
- process {{20}}, queue empty, tracker after remove empty, next partition time
{{20}} (from {{lastKnownTime}})
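The whole walkthrough (add everything, then poll, {{removeElement}}, {{get}}) can be replayed with a minimal sketch. It is written from the description in this comment and is not the Kafka source: {{Rec}}, {{MinTracker}}, {{nextPartitionTimes}}, and the initial {{lastKnownTime}} of {{-1}} are placeholders chosen for the example. Removal compares by object identity so that two records with the same timestamp are not confused.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class TrackerWalkthrough {
    // Hypothetical stand-in for a buffered record; only the timestamp matters.
    static final class Rec {
        final long ts;
        Rec(long ts) { this.ts = ts; }
    }

    // Simplified tracker sketch, not the Kafka class itself.
    static final class MinTracker {
        private final Deque<Rec> ascending = new ArrayDeque<>();
        private long lastKnownTime = -1L; // placeholder for "not known yet"

        void addElement(Rec r) {
            while (!ascending.isEmpty() && ascending.peekLast().ts >= r.ts) {
                ascending.removeLast();
            }
            ascending.addLast(r);
        }

        // Must be called in the same order as addElement.
        void removeElement(Rec r) {
            if (ascending.peekFirst() == r) { // identity check: is r still tracked?
                ascending.removeFirst();
            }
            if (ascending.isEmpty()) {
                lastKnownTime = r.ts;
            }
        }

        long get() {
            Rec head = ascending.peekFirst();
            return head == null ? lastKnownTime : head.ts;
        }
    }

    // Adds all records first, then polls them one by one and records the
    // "next partition time" observed after each removeElement call.
    static List<Long> nextPartitionTimes(long... timestamps) {
        Deque<Rec> queue = new ArrayDeque<>();
        MinTracker tracker = new MinTracker();
        for (long ts : timestamps) {
            Rec r = new Rec(ts);
            queue.addLast(r);
            tracker.addElement(r);
        }
        List<Long> result = new ArrayList<>();
        while (!queue.isEmpty()) {
            Rec r = queue.pollFirst();
            tracker.removeElement(r);
            result.add(tracker.get());
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(nextPartitionTimes(5, 10, 15, 12, 20));
        // prints [10, 12, 12, 20, 20]
    }
}
```

Calling {{removeElement}} in a different order than {{addElement}} breaks the head check, which is why tests that do not follow the pattern see surprising results.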
was (Author: mjsax):
It is intended behavior. {{MinTimestampTracker}} tracks the minimum timestamp
for non-processed records in the buffer. The undocumented usage pattern is
that {{addElement()}} and {{removeElement()}} must be called in the same
order. Your tests don't follow this pattern. The logic is about tracking the
current partition time as the minimum of whatever is in the buffer (also
considering out-of-order records); cf. KAFKA-3514.
Example: We get a batch of records with ts {{5, 10, 15, 12, 20}} and add them
consecutively to the timestamp tracker. When we process those records one
by one, we get the following steps (note, I am not sure if "partition time" in
this example is correct; cf. below):
- process {{5}}, queue {{10, 15, 12, 20}}, partition time {{5}}
- process {{10}}, queue {{15, 12, 20}}, partition time {{10}}
- process {{15}}, queue {{12, 20}}, partition time {{12}} (!! current minimum
is {{12}}, not {{15}} !!)
- process {{12}}, queue {{20}}, partition time {{12}}
- process {{20}}, queue empty, partition time {{20}}
The tracker will have the following states when adding the records one by one
(this happens before processing begins):
- add {{5}}: {{5}}
- add {{10}}: {{5, 10}}
- add {{15}}: {{5, 10, 15}}
- add {{12}}: {{5, 10, 12}} (!! this is the behavior that is not a bug !!)
- add {{20}}: {{5, 10, 12, 20}}
During processing, we {{poll}} the head record from the queue and call
{{removeElement}} on the tracker afterwards. Thus, we get (note that we start
with tracker state {{5, 10, 12, 20}}):
- poll from queue {{5}}, tracker after remove {{10, 12, 20}}
- poll from queue {{10}}, tracker after remove {{12, 20}}
- poll from queue {{15}}, tracker after remove {{12, 20}} (!! as we call
{{removeElement(15)}} we keep {{12}}, this allows us to use {{12}} for two
records)
- poll from queue {{12}}, tracker after remove {{20}}
- poll from queue {{20}}, tracker after remove empty
However, I am not sure if we actually advance "partition time" as intended --
we might want to call {{timeTracker.get()}} ({{RecordQueue}} L124) before
{{timeTracker.removeElement(elem)}} ({{RecordQueue}} L120) -- it seems the
"partition time" in my example above is not what Streams computes atm (even if
the example seems to follow the intended "partition time"): if
I am not wrong, Streams would give:
- process {{5}}, queue {{10, 15, 12, 20}}, tracker after remove {{10, 12,
20}}, partition time {{10}}
- process {{10}}, queue {{15, 12, 20}}, tracker after remove {{12, 20}},
partition time {{12}}
- process {{15}}, queue {{12, 20}}, tracker after remove {{12, 20}}, partition
time {{12}}
- process {{12}}, queue {{20}}, tracker after remove {{20}}, partition time
{{20}}
- process {{20}}, queue empty, tracker after remove empty, partition time
{{20}} (from {{lastKnownTime}})
> MinTimestampTracker does not correctly add timestamps lower than the current
> max
> --------------------------------------------------------------------------------
>
> Key: KAFKA-5144
> URL: https://issues.apache.org/jira/browse/KAFKA-5144
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 0.10.2.1
> Reporter: Michal Borowiecki
> Assignee: Michal Borowiecki
>
> When adding elements MinTimestampTracker removes all existing elements
> greater than the added element.
> Perhaps I've missed something and this is intended behaviour but I can't find
> any evidence for that in comments or tests.