-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31850/#review79063
-----------------------------------------------------------


Thanks for the patch.


clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/31850/#comment128121>

    [NOTE] is not a standard Javadoc highlight, is it? (I don't know.) If not, 
can you just use the standard <strong> tag for emphasis?



clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/31850/#comment128123>

    We are doing this because -> "We do this because" or "This is done because"



clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/31850/#comment128125>

    previous -> outstanding



clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/31850/#comment128126>

    was not able -> is unable
    before timeout -> before the specified timeout



clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/31850/#comment128134>

    If timeout > 0, this method blocks as it tries to join the sender thread 
within the specified timeout.



clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/31850/#comment128135>

    ==
    
    Also, is this completely true? It seems we may join (albeit without trying 
to send anything further) if called from a non-sender thread.



clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/31850/#comment128138>

    We do this because the sender thread would otherwise try to join itself and 
block forever.
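
For illustration, a minimal sketch of the guard this comment describes. `SelfJoinSketch` and its methods are hypothetical names, not the actual KafkaProducer code. It assumes close() is handed the I/O thread and a timeout in milliseconds. Note that `Thread.join(0)` waits indefinitely, so the sketch skips the join entirely rather than calling it with 0.

```java
// Hypothetical helper, not part of Kafka: shows why close() must not join
// the thread it is running on.
class SelfJoinSketch {
    /** Returns the timeout that close() may safely use when joining ioThread. */
    static long safeJoinTimeoutMs(Thread ioThread, long requestedMs) {
        // A thread that joins itself waits for its own termination, which
        // cannot happen while it is blocked inside join().
        if (Thread.currentThread() == ioThread && requestedMs > 0) {
            return 0L;
        }
        return requestedMs;
    }

    /** Joins ioThread only when the caller is a different thread and the timeout is positive. */
    static void close(Thread ioThread, long requestedMs) throws InterruptedException {
        long timeoutMs = safeJoinTimeoutMs(ioThread, requestedMs);
        if (timeoutMs > 0) {
            ioThread.join(timeoutMs);
        }
        // With timeoutMs == 0 the join is skipped (join(0) would wait forever)
        // and the caller would proceed to a force close.
    }

    public static void main(String[] args) throws InterruptedException {
        Thread worker = new Thread(() -> { });
        worker.start();
        close(worker, 1000L);                  // different thread: joins normally
        close(Thread.currentThread(), 1000L);  // same thread: returns without joining
        System.out.println("close returned in both cases");
    }
}
```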



clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/31850/#comment128142>

    "When" -> "If an"
    
    That said, this doc is a bit weird - if that is what the user is supposed 
to do then why can't this method take care of it (i.e., let the interrupted 
exception go)? It seems the right thing to do would be to just propagate and 
let the caller decide what to do.



clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/31850/#comment128144>

    "with timeout = {} ms"



clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/31850/#comment128141>

    elegantly -> gracefully



clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/31850/#comment128147>

    ```log.warn("Overriding close timeout {} ms to 0 ms in order to prevent 
deadlock due to self-join. This means you have incorrectly invoked close with a 
non-zero timeout from the producer call-back.", timeout)```



clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/31850/#comment128154>

    May want to log that we are proceeding to a force close



clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
<https://reviews.apache.org/r/31850/#comment128155>

    until timeout is expired -> within the specified timeout. If the close does 
not complete within the timeout, discard any pending messages and force close 
the producer.



clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
<https://reviews.apache.org/r/31850/#comment128166>

    As you explained offline, the sender does not have access to record batches 
while requests are in flight, but it would be great if we could figure out a way 
to avoid leaking details of batch completion (which currently live exclusively 
in the sender) into the RecordAccumulator.



clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
<https://reviews.apache.org/r/31850/#comment128200>

    I don't think we should overload InterruptException for this. 
InterruptException is a wrapper around InterruptedException, so after an 
InterruptException the thread should in fact have been interrupted, i.e., the 
interrupt status of the thread should be true (which is not the case here).
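
To make the expectation concrete, here is a small sketch of the contract I have in mind. `WrappedInterrupt` is a hypothetical stand-in for an unchecked wrapper, not the actual Kafka class, and `awaitOrThrow` is an illustrative name. The point is only that whoever catches the wrapper can still observe the interrupt on the thread.

```java
// Hypothetical stand-in classes, not Kafka code.
class InterruptStatusSketch {
    /** Unchecked wrapper around InterruptedException (illustrative only). */
    static class WrappedInterrupt extends RuntimeException {
        WrappedInterrupt(InterruptedException cause) {
            super(cause);
        }
    }

    /** Blocks briefly; on interruption, restores the flag before rethrowing unchecked. */
    static void awaitOrThrow(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            // Thread.sleep clears the interrupt status when it throws,
            // so set it again before surfacing the wrapper.
            Thread.currentThread().interrupt();
            throw new WrappedInterrupt(e);
        }
    }

    public static void main(String[] args) {
        Thread.currentThread().interrupt();
        try {
            awaitOrThrow(10L);
        } catch (WrappedInterrupt e) {
            System.out.println("interrupt status after wrapper: "
                    + Thread.currentThread().isInterrupted());
        }
    }
}
```

Throwing such a wrapper for a condition where no interrupt occurred would break this expectation, which is why a separate exception type seems preferable here.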



clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
<https://reviews.apache.org/r/31850/#comment128167>

    See comment above.



clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
<https://reviews.apache.org/r/31850/#comment128169>

    Can you revert this? I think the previous version with locally declared 
accums is cleaner.



core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
<https://reviews.apache.org/r/31850/#comment128171>

    Can you also add a test for calling close with a non-zero timeout in the 
callback?
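
A rough, self-contained sketch of the shape such a test could take. It uses a hypothetical `MiniProducer` with a single I/O thread rather than the real producer or the existing Scala test harness, so every name here is an assumption. The check is simply that close with a non-zero timeout, invoked from the callback, returns instead of hanging.

```java
// Hypothetical simulation, not Kafka code: the callback runs on the I/O
// thread and calls close() with a non-zero timeout.
class CloseInCallbackSketch {
    /** Miniature producer with one I/O thread that runs a single callback. */
    static class MiniProducer {
        private final Thread ioThread;
        volatile boolean skippedSelfJoin = false;

        MiniProducer(Runnable callback) {
            this.ioThread = new Thread(callback, "mini-io-thread");
        }

        void start() {
            ioThread.start();
        }

        void close(long timeoutMs) throws InterruptedException {
            if (Thread.currentThread() == ioThread) {
                // Called from the callback: joining here would wait on ourselves.
                skippedSelfJoin = true;
                return;
            }
            if (timeoutMs > 0) {
                ioThread.join(timeoutMs);
            }
        }

        boolean ioThreadFinishedWithin(long timeoutMs) throws InterruptedException {
            ioThread.join(timeoutMs);
            return !ioThread.isAlive();
        }
    }

    /** Returns true if close(non-zero timeout) invoked from the callback does not hang. */
    static boolean closeFromCallbackTerminates() {
        MiniProducer[] holder = new MiniProducer[1];
        holder[0] = new MiniProducer(() -> {
            try {
                holder[0].close(60_000L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        holder[0].start();
        try {
            return holder[0].ioThreadFinishedWithin(5_000L) && holder[0].skippedSelfJoin;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("terminated: " + closeFromCallbackTerminates());
    }
}
```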


- Joel Koshy


On March 27, 2015, 11:35 p.m., Jiangjie Qin wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31850/
> -----------------------------------------------------------
> 
> (Updated March 27, 2015, 11:35 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1660
>     https://issues.apache.org/jira/browse/KAFKA-1660
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> A minor fix.
> 
> 
> Incorporated Guozhang's comments.
> 
> 
> Modify according to the latest conclusion.
> 
> 
> Patch for the finally passed KIP-15
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java ab263423ff1d33170effb71acdef3fc501fa072a 
>   clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 6913090af03a455452b0b5c3df78f266126b3854 
>   clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 5b3e75ed83a940bc922f9eca10d4008d67aa37c9 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java 88b4e4fbf3bf6fb6d2f90551a792b95d4cd51c40 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 70954cadb5c7e9a4c326afcf9d9a07db230e7db2 
>   clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java fee322fa0dd9704374db4a6964246a7d2918d3e4 
>   clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java c2fdc23239bd2196cd912c3d121b591f21393eab 
>   clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java e379ac89c9a2fbfe750d6b0dec693b7eabb76204 
>   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 3df450784592b894008e7507b2737f9bb07f7bd2 
> 
> Diff: https://reviews.apache.org/r/31850/diff/
> 
> 
> Testing
> -------
> 
> Unit tests passed.
> 
> 
> Thanks,
> 
> Jiangjie Qin
> 
>
