[ https://issues.apache.org/jira/browse/SAMZA-913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15212351#comment-15212351 ]
Yi Pan (Data Infrastructure) commented on SAMZA-913:
----------------------------------------------------

Merged and submitted. Thanks!

> CoordinatorStreamSystemConsumer drops messages when they are considered equivalent
> ----------------------------------------------------------------------------------
>
>                 Key: SAMZA-913
>                 URL: https://issues.apache.org/jira/browse/SAMZA-913
>             Project: Samza
>          Issue Type: Bug
>            Reporter: Jake Maes
>            Assignee: Jake Maes
>             Fix For: 0.10.1
>
>         Attachments: SAMZA-913.patch
>
>
> When CoordinatorStreamSystemConsumer bootstraps, it adds the messages to a
> LinkedHashSet ("bootstrappedStreamSet"). The intent seems to be:
> 1. Messages will be processed in the order they were consumed.
> 2. Only the latest copy of a message will be stored.
> That second assumption turns out to be false with the current implementation.
> In Java, Set.add() only adds an element if it doesn't already exist in the
> Set. Further, CoordinatorStreamMessage.equals() relies on the key set and
> values, but not the message offset or timestamp, so the following set of
> messages could occur:
> key1 -> value1 // added to bootstrappedStreamSet
> key1 -> value2 // added to bootstrappedStreamSet
> key1 -> value1 // duplicate to first message, not added
> Thus the final state will be (incorrectly):
> key1 -> value2

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
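
For anyone reading this thread later, the behavior described in the issue can be reproduced with a plain LinkedHashSet, without any Samza code. The sketch below is only an illustration: `Msg` is a hypothetical stand-in for CoordinatorStreamMessage (equality on key and value, offset ignored), and the remove-then-add variant is one possible way to keep the latest copy, not necessarily what SAMZA-913.patch does.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

public class BootstrapDedupSketch {

    // Hypothetical stand-in for CoordinatorStreamMessage: equals() and
    // hashCode() use key and value only and ignore the offset.
    static final class Msg {
        final String key;
        final String value;
        final long offset;

        Msg(String key, String value, long offset) {
            this.key = key;
            this.value = value;
            this.offset = offset;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof Msg)) return false;
            Msg other = (Msg) o;
            return key.equals(other.key) && value.equals(other.value);
        }

        @Override
        public int hashCode() {
            return Objects.hash(key, value);
        }
    }

    // The three messages from the issue description, in consumption order.
    static List<Msg> sample() {
        List<Msg> consumed = new ArrayList<>();
        consumed.add(new Msg("key1", "value1", 0L));
        consumed.add(new Msg("key1", "value2", 1L));
        consumed.add(new Msg("key1", "value1", 2L));
        return consumed;
    }

    // Applies messages in iteration order; the last write per key wins.
    static Map<String, String> replay(Iterable<Msg> messages) {
        Map<String, String> state = new LinkedHashMap<>();
        for (Msg m : messages) {
            state.put(m.key, m.value);
        }
        return state;
    }

    // Behavior described in the issue: add() ignores the third message
    // because an equal element is already in the set.
    static Map<String, String> viaPlainAdd(List<Msg> consumed) {
        Set<Msg> bootstrapped = new LinkedHashSet<>();
        for (Msg m : consumed) {
            bootstrapped.add(m);
        }
        return replay(bootstrapped);
    }

    // Assumed workaround: remove any equal element first, so the newest
    // copy is re-inserted at the end of the insertion order.
    static Map<String, String> viaRemoveThenAdd(List<Msg> consumed) {
        Set<Msg> bootstrapped = new LinkedHashSet<>();
        for (Msg m : consumed) {
            bootstrapped.remove(m);
            bootstrapped.add(m);
        }
        return replay(bootstrapped);
    }

    public static void main(String[] args) {
        List<Msg> consumed = sample();
        System.out.println("plain add:       " + viaPlainAdd(consumed));
        System.out.println("remove then add: " + viaRemoveThenAdd(consumed));
    }
}
```

Running `main` prints `{key1=value2}` for the plain-add case, which matches the incorrect final state in the report, and `{key1=value1}` for the remove-then-add case, which is what replaying the original stream in order would give.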