[ 
https://issues.apache.org/jira/browse/SAMZA-458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14357122#comment-14357122
 ] 

Chris Riccomini commented on SAMZA-458:
---------------------------------------

bq. Which "sourceBuffers" do you mean here? the

Yeah, sorry for the confusion. This issue was opened before [~navina] upgraded 
Samza to the new producer in SAMZA-227. In the current code, there is no source 
buffer, but the bug still isn't fixed.

I think what we need to do is iterate over latestFuture.keySet in the stop() 
method and call flush for each source (key).
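
A minimal sketch of that idea, in Java for illustration. This is not the actual KafkaSystemProducer code: the class FlushOnStopSketch, the recordSend helper, and the assumed shape of latestFuture (source name mapped to the future of its most recent send) are all hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Hypothetical stand-in for the producer; not the real KafkaSystemProducer.
class FlushOnStopSketch {
    // Assumed shape: source name -> future of the most recent send for that source.
    private final Map<String, Future<?>> latestFuture = new ConcurrentHashMap<>();

    // Hypothetical helper: remember the latest in-flight send for a source.
    void recordSend(String source, Future<?> future) {
        latestFuture.put(source, future);
    }

    // Block until the latest send for this source completes; rethrow on failure.
    void flush(String source) {
        Future<?> future = latestFuture.get(source);
        if (future == null) {
            return; // nothing was sent for this source
        }
        try {
            future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while flushing " + source, e);
        } catch (ExecutionException e) {
            throw new RuntimeException("Flush failed for " + source, e.getCause());
        }
    }

    // Flush every known source before shutting down.
    void stop() {
        for (String source : latestFuture.keySet()) {
            flush(source);
        }
    }
}
```

With this shape, stop() returns only after every source's latest send has completed, and a failed send surfaces as an exception instead of being dropped silently.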

bq. how should the flush perform if the send always fails?

If we take the approach that I described above, I think that a failed flush 
will cause the container to fail (line 143 in KafkaSystemProducer). This is the 
correct behavior, since continuing past a failed flush would let the container 
checkpoint its offsets, and we could then lose data (messages dropped, then 
offsets checkpointed).
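
To illustrate why failing fast is the safe choice, here is a small Java sketch of the ordering. The names (FailFastShutdownSketch, flushProducer, writeCheckpoint) are hypothetical and do not reflect Samza's actual container shutdown code. It only shows that an exception from the flush prevents the checkpoint from being written.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the shutdown ordering; not Samza's container code.
class FailFastShutdownSketch {
    // Records which steps actually completed, in order.
    final List<String> events = new ArrayList<>();

    void shutdown(Runnable flushProducer, Runnable writeCheckpoint) {
        // If the flush throws, nothing below runs, so no offsets are
        // checkpointed for messages that were never delivered.
        flushProducer.run();
        events.add("flushed");

        writeCheckpoint.run();
        events.add("checkpointed");
    }
}
```

If the flush failure were swallowed instead, the checkpoint step would still run and record offsets for messages that never reached the broker.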

> Close in KafkaSystemProducer should flush all source buffers
> ------------------------------------------------------------
>
>                 Key: SAMZA-458
>                 URL: https://issues.apache.org/jira/browse/SAMZA-458
>             Project: Samza
>          Issue Type: Bug
>          Components: kafka
>            Reporter: Chris Riccomini
>            Assignee: Yan Fang
>              Labels: newbie
>             Fix For: 0.9.0
>
>
> I noticed that calling KafkaSystemProducer.stop does not flush any 
> outstanding messages in the sourceBuffers array. Calling close should flush 
> all buffers. Without this, shutting down can cause messages to be dropped.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
