[ 
https://issues.apache.org/jira/browse/SAMZA-913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15212351#comment-15212351
 ] 

Yi Pan (Data Infrastructure) commented on SAMZA-913:
----------------------------------------------------

Merged and submitted. Thanks!

> CoordinatorStreamSystemConsumer drops messages when they are considered 
> equivalent
> ----------------------------------------------------------------------------------
>
>                 Key: SAMZA-913
>                 URL: https://issues.apache.org/jira/browse/SAMZA-913
>             Project: Samza
>          Issue Type: Bug
>            Reporter: Jake Maes
>            Assignee: Jake Maes
>             Fix For: 0.10.1
>
>         Attachments: SAMZA-913.patch
>
>
> When CoordinatorStreamSystemConsumer bootstraps, it adds the messages to a 
> LinkedHashSet ("bootstrappedStreamSet"). The intent seems to be:
> 1. Messages will be processed in the order they were consumed.
> 2. Only the latest copy of a message will be stored. 
> That second assumption turns out to be false with the current implementation. 
> In Java, Set.add() only adds an element if it doesn't already exist in the 
> Set. Further, CoordinatorStreamMessage.equals() relies on the key set and 
> values, but not the message offset or timestamp, so the following set of 
> messages could occur:
> key1 -> value1  // added to bootstrappedStreamSet
> key1 -> value2  // added to bootstrappedStreamSet
> key1 -> value1  // duplicate of the first message, not added
> Thus the final state will be (incorrectly):
> key1 -> value2
> even though the most recently consumed message was key1 -> value1.
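
The behavior described above can be reproduced outside Samza with a small sketch. Everything in it is an assumption made for illustration: Msg is a hypothetical stand-in for CoordinatorStreamMessage whose equals() compares only key and value, and bootstrapKeepingLatest() shows one possible remedy (remove, then re-add), which is not necessarily what SAMZA-913.patch does.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

public class BootstrapSetDemo {
    // Hypothetical stand-in for CoordinatorStreamMessage: equality ignores the offset.
    static final class Msg {
        final String key;
        final String value;
        final long offset;

        Msg(String key, String value, long offset) {
            this.key = key;
            this.value = value;
            this.offset = offset;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Msg)) {
                return false;
            }
            Msg other = (Msg) o;
            return key.equals(other.key) && value.equals(other.value);
        }

        @Override
        public int hashCode() {
            return Objects.hash(key, value);
        }

        @Override
        public String toString() {
            return key + " -> " + value + " @" + offset;
        }
    }

    // Mirrors the reported behavior: Set.add() ignores an element that is already present,
    // so a re-sent message keeps its original (older) position.
    static List<Msg> bootstrap(List<Msg> consumed) {
        Set<Msg> set = new LinkedHashSet<>();
        for (Msg m : consumed) {
            set.add(m);
        }
        return new ArrayList<>(set);
    }

    // Assumed remedy: remove any equal element first so the new copy moves to the end.
    static List<Msg> bootstrapKeepingLatest(List<Msg> consumed) {
        Set<Msg> set = new LinkedHashSet<>();
        for (Msg m : consumed) {
            set.remove(m);
            set.add(m);
        }
        return new ArrayList<>(set);
    }

    public static void main(String[] args) {
        List<Msg> consumed = Arrays.asList(
            new Msg("key1", "value1", 0L),
            new Msg("key1", "value2", 1L),
            new Msg("key1", "value1", 2L));
        System.out.println("add only:        " + bootstrap(consumed));
        System.out.println("remove then add: " + bootstrapKeepingLatest(consumed));
    }
}
```

With add() alone, the last element in iteration order is key1 -> value2, so replaying the set leaves the stale value in place. With remove-then-add, the last element is key1 -> value1 at offset 2, matching the order in which the messages were consumed.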



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
