-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/29467/#review74843
-----------------------------------------------------------


Actually, I spoke too soon... Now that flush() has been checked in, we need to
take care of caller threads that are in the middle of a flush when close() is
invoked. This is a little bit tricky. If we forcibly close the producer while a
caller thread is doing a flush, we have to notify that thread that the flush
failed. The simplest way might be to let flush() return a boolean value. So we
would do the following:
1. In RecordAccumulator, add a new forceClose(). It sets a forceClosed flag
first, then clears the incomplete batch set and wakes up all the caller threads.
2. RecordAccumulator.awaitFlushCompletion() checks the forceClosed flag
to determine whether the flush succeeded, and returns the result to
KafkaProducer.flush().
3. KafkaProducer.flush() returns this result to the caller thread.
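
To make the three steps concrete, here is a rough sketch of what I have in
mind. It is not the actual Kafka code: SketchAccumulator and SketchProducer
are hypothetical stand-ins for RecordAccumulator and KafkaProducer, batches are
reduced to integer ids, and a plain monitor is assumed for the wait/notify.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical stand-in for RecordAccumulator (not the real class). */
class SketchAccumulator {
    private final Object lock = new Object();
    // Ids of batches that have been appended but not yet completed.
    private final Set<Integer> incomplete = new HashSet<>();
    private boolean forceClosed = false;

    void add(int batchId) {
        synchronized (lock) {
            incomplete.add(batchId);
        }
    }

    /** Called when a batch has been sent; wakes threads waiting on a flush. */
    void complete(int batchId) {
        synchronized (lock) {
            incomplete.remove(batchId);
            lock.notifyAll();
        }
    }

    /** Step 1: set the flag first, then drop incomplete batches and wake waiters. */
    void forceClose() {
        synchronized (lock) {
            forceClosed = true;
            incomplete.clear();
            lock.notifyAll();
        }
    }

    /** Step 2: returns true if the flush completed, false if force-closed. */
    boolean awaitFlushCompletion() throws InterruptedException {
        synchronized (lock) {
            while (!incomplete.isEmpty() && !forceClosed) {
                lock.wait();
            }
            return !forceClosed;
        }
    }
}

/** Hypothetical stand-in for KafkaProducer (not the real class). */
class SketchProducer {
    private final SketchAccumulator accumulator;

    SketchProducer(SketchAccumulator accumulator) {
        this.accumulator = accumulator;
    }

    /** Step 3: hand the accumulator's result back to the caller thread. */
    boolean flush() throws InterruptedException {
        return accumulator.awaitFlushCompletion();
    }
}
```

Because the flag is checked under the same lock that forceClose() holds, a
caller that starts waiting after the forced close still sees forceClosed and
gets false back instead of blocking.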


clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
<https://reviews.apache.org/r/29467/#comment121626>

    We probably need to release the caller threads that are waiting on
    flush() at this point.


- Jiangjie Qin


On March 2, 2015, 6:41 p.m., Parth Brahmbhatt wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/29467/
> -----------------------------------------------------------
> 
> (Updated March 2, 2015, 6:41 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1660
>     https://issues.apache.org/jira/browse/KAFKA-1660
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Merge remote-tracking branch 'origin/trunk' into KAFKA-1660
> 
> Conflicts:
>       
> clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
> 
> Merge remote-tracking branch 'origin/trunk' into KAFKA-1660
> 
> 
> Changing log levels as suggested.
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
> 7397e565fd865214529ffccadd4222d835ac8110 
>   clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 
> 6913090af03a455452b0b5c3df78f266126b3854 
>   clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 
> 5b3e75ed83a940bc922f9eca10d4008d67aa37c9 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
> ed9c63a6679e3aaf83d19fde19268553a4c107c2 
> 
> Diff: https://reviews.apache.org/r/29467/diff/
> 
> 
> Testing
> -------
> 
> existing unit tests passed.
> 
> 
> Thanks,
> 
> Parth Brahmbhatt
> 
>
