[ 
https://issues.apache.org/jira/browse/SAMZA-141?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13896938#comment-13896938
 ] 

Chris Riccomini commented on SAMZA-141:
---------------------------------------

This is a pretty major issue. It breaks checkpointing: the 
KafkaCheckpointManager grabs the first offset in the map for each 
SystemStream, and about half the time that entry is not the 
SystemStreamPartition (SSP) entry that was actually updated.

> TaskInstance.offsets fails to update
> ------------------------------------
>
>                 Key: SAMZA-141
>                 URL: https://issues.apache.org/jira/browse/SAMZA-141
>             Project: Samza
>          Issue Type: Bug
>          Components: container
>    Affects Versions: 0.6.0
>            Reporter: Chris Riccomini
>            Assignee: Chris Riccomini
>
> TaskInstance has a variable called offsets, which manages offsets for each 
> input stream. The variable is typed as SystemStream -> String. We use a 
> SystemStream key when interacting with it everywhere EXCEPT in the 
> TaskInstance.process method, where we use a SystemStreamPartition as the key. 
> This doesn't work: a SystemStreamPartition and a SystemStream with the 
> same system and stream name are NOT equal, because the equality checks 
> include a class check, and the classes don't match.
> As a result, you end up with something like this:
> {code}
>     var offsets = Map[SystemStream, String]()
>     offsets += new SystemStream("foo", "bar") -> "123"
>     offsets += new SystemStreamPartition("foo", "bar", new Partition(0)) -> "321"
>     System.err.println(offsets)
> {code}
> Which prints:
> {noformat}
> Map(SystemStream [system=foo, stream=bar] -> 123, SystemStreamPartition [partition=Partition [partition=0], system=foo, stream=bar] -> 321)
> {noformat}
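
For anyone who wants to reproduce the behavior without a Samza build, here is a rough, self-contained sketch. The two classes are simplified stand-ins written for this example, not the real Samza classes, and toSystemStream is a hypothetical helper. It only assumes what the description states: equality includes a class check. The second method shows one possible workaround (normalizing the key to a plain SystemStream before the update), which is not necessarily the fix that will be committed.

```scala
// Simplified stand-ins for illustration only; NOT the real Samza classes.
class SystemStream(val system: String, val stream: String) {
  // Equality includes a class check, as described in the issue.
  override def equals(other: Any): Boolean = other match {
    case that: SystemStream =>
      getClass == that.getClass && system == that.system && stream == that.stream
    case _ => false
  }
  override def hashCode: Int = (system, stream).hashCode
}

class SystemStreamPartition(sys: String, strm: String, val partition: Int)
    extends SystemStream(sys, strm) {
  override def equals(other: Any): Boolean = other match {
    case that: SystemStreamPartition =>
      super.equals(that) && partition == that.partition
    case _ => false
  }
  override def hashCode: Int = (system, stream, partition).hashCode

  // Hypothetical helper: drop the partition to get the plain stream key.
  def toSystemStream: SystemStream = new SystemStream(system, stream)
}

object OffsetKeyDemo {
  // Mirrors the snippet in the description: the map ends up with two entries.
  def buggy(): Map[SystemStream, String] = {
    var offsets = Map[SystemStream, String]()
    offsets += (new SystemStream("foo", "bar") -> "123")
    offsets += (new SystemStreamPartition("foo", "bar", 0) -> "321")
    offsets
  }

  // Normalizing the key first overwrites the existing entry instead.
  def normalized(): Map[SystemStream, String] = {
    var offsets = Map[SystemStream, String]()
    offsets += (new SystemStream("foo", "bar") -> "123")
    offsets += (new SystemStreamPartition("foo", "bar", 0).toSystemStream -> "321")
    offsets
  }
}
```

With these stand-ins, OffsetKeyDemo.buggy() holds two entries for the same stream, while OffsetKeyDemo.normalized() holds a single entry whose value is the updated offset.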



--
This message was sent by Atlassian JIRA
(v6.1.5#6160)
