[ https://issues.apache.org/jira/browse/SAMZA-458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14357122#comment-14357122 ]
Chris Riccomini commented on SAMZA-458:
---------------------------------------

bq. Which "sourceBuffers" do you mean here? the

Yea, sorry for the confusion. This issue was opened before [~navina] upgraded Samza to the new producer in SAMZA-227. In the current code, there is no source buffer, but the bug still isn't fixed. I think what we need to do is iterate over latestFuture.keySet, in the stop() method, and foreach source (key) call flush.

bq. how should the flush perform if the send always fails?

If we take the approach that I described above, I think that a failed flush will cause the container to fail (line 143 in KafkaSystemProducer). This is the correct behavior, since going beyond a failed flush would cause the container to checkpoint its offsets, and we could then lose data (drop messages, then checkpoint offsets).

> Close in KafkaSystemProducer should flush all source buffers
> ------------------------------------------------------------
>
>                 Key: SAMZA-458
>                 URL: https://issues.apache.org/jira/browse/SAMZA-458
>             Project: Samza
>          Issue Type: Bug
>          Components: kafka
>            Reporter: Chris Riccomini
>            Assignee: Yan Fang
>              Labels: newbie
>             Fix For: 0.9.0
>
>
> I noticed that calling KafkaSystemProducer.stop does not flush any
> outstanding messages in the sourceBuffers array. Calling close should flush
> all buffers. Without this, shutting down can cause messages to be dropped.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
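
The approach proposed in the comment above (have stop() walk latestFuture.keySet and flush each source, letting a failed flush propagate) can be sketched roughly as follows. This is only an illustration in plain Java, not the actual Samza code: SketchProducer, register(), and the flushed list are hypothetical names invented for the sketch, and a Future<Long> stands in for whatever the real producer tracks per source.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Hypothetical stand-in for KafkaSystemProducer; NOT the real Samza class.
class SketchProducer {
    // Latest outstanding send per source, loosely mirroring the
    // latestFuture map mentioned in the comment (assumed shape).
    private final Map<String, Future<Long>> latestFuture = new ConcurrentHashMap<>();

    // Sources that were flushed successfully; exists only so the sketch is observable.
    final List<String> flushed = new ArrayList<>();

    // Hypothetical helper: record the most recent pending send for a source.
    void register(String source, Future<Long> pendingSend) {
        latestFuture.put(source, pendingSend);
    }

    // Block until the latest send for this source completes.
    // A failed send is rethrown so the caller (the container) fails
    // instead of going on to checkpoint offsets.
    void flush(String source) {
        Future<Long> pending = latestFuture.get(source);
        if (pending == null) {
            return; // nothing outstanding for this source
        }
        try {
            pending.get();
            flushed.add(source);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while flushing " + source, e);
        } catch (ExecutionException e) {
            throw new RuntimeException("Flush failed for " + source, e.getCause());
        }
    }

    // Flush every known source before shutting down.
    void stop() {
        // Copy the key set so the iteration is unaffected by later changes to the map.
        for (String source : new ArrayList<>(latestFuture.keySet())) {
            flush(source);
        }
        latestFuture.clear();
    }
}
```

In this sketch a failed send surfaces as a RuntimeException from stop(), which matches the behavior argued for above: the caller sees the failure before any offsets are checkpointed.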