-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31850/#review79063
-----------------------------------------------------------

Thanks for the patch.

clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/31850/#comment128121>

[NOTE] is not a standard javadoc highlight is it? (I don't know.) If not, can you just use the standard <strong> for emphasis?

clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/31850/#comment128123>

We are doing this because -> "We do this because" or "This is done because"

clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/31850/#comment128125>

previous -> outstanding

clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/31850/#comment128126>

was not able -> is unable
before timeout -> before the specified timeout

clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/31850/#comment128134>

If timeout > 0, this method blocks as it tries to join the sender thread within the specified timeout.

clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/31850/#comment128135>

==

Also, is this completely true? It seems we may join (albeit without trying to send anything further) if called from a non-sender thread.

clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/31850/#comment128138>

We do this because the sender thread would otherwise try to join itself and block forever.

clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/31850/#comment128142>

"When" -> "If an"

That said, this doc is a bit weird - if that is what the user is supposed to do then why can't this method take care of it (i.e., let the interrupted exception go)? It seems the right thing to do would be to just propagate and let the caller decide what to do.

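To make the self-join concern in the KafkaProducer.java comments concrete, here is a minimal sketch. It is not the patch's code: `ClosableWorker`, `effectiveTimeoutMs`, and the polling loop are hypothetical stand-ins for the producer and its sender thread, and the sketch does not model force close or pending records. It assumes that a zero timeout simply skips the join (note that `Thread.join(0)` would wait forever), and it lets `InterruptedException` propagate to the caller.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for a producer that owns a background sender thread. */
final class ClosableWorker {
    private final Thread senderThread;
    private volatile boolean running = true;

    ClosableWorker() {
        // The "sender" here only polls a flag; a real sender would drain batches.
        senderThread = new Thread(() -> {
            while (running) {
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    return;
                }
            }
        }, "sketch-sender");
        senderThread.setDaemon(true);
        senderThread.start();
    }

    /** A thread cannot usefully join itself, so a call from the sender gets timeout 0. */
    static long effectiveTimeoutMs(boolean calledFromSender, long requestedMs) {
        if (requestedMs < 0) {
            throw new IllegalArgumentException("timeout must be non-negative");
        }
        return calledFromSender ? 0L : requestedMs;
    }

    /** Returns true if the sender thread has stopped by the time this method returns. */
    boolean close(long timeout, TimeUnit unit) throws InterruptedException {
        boolean calledFromSender = Thread.currentThread() == senderThread;
        long timeoutMs = effectiveTimeoutMs(calledFromSender, unit.toMillis(timeout));
        running = false;
        if (timeoutMs > 0) {
            // Blocks for at most timeoutMs; an interrupt propagates to the caller.
            senderThread.join(timeoutMs);
        }
        return !senderThread.isAlive();
    }
}
```

A caller on any other thread would use `worker.close(1, TimeUnit.SECONDS)` and decide for itself how to handle an `InterruptedException`.
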
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/31850/#comment128144>

"with timeout = {} ms"

clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/31850/#comment128141>

elegantly -> gracefully

clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/31850/#comment128147>

```
log.warn("Overriding close timeout {} ms to 0 ms in order to prevent deadlock due to self-join. This means you have incorrectly invoked close with a non-zero timeout from the producer call-back.", timeout)
```

clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
<https://reviews.apache.org/r/31850/#comment128154>

May want to log that we are proceeding to a force close

clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
<https://reviews.apache.org/r/31850/#comment128155>

until timeout is expired -> within the specified timeout. If the close does not complete within the timeout, discard any pending messages and force close the producer.

clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
<https://reviews.apache.org/r/31850/#comment128166>

As you explained offline, the sender does not have access to record batches while requests are in flight, but it would be super if we can figure out a way to avoid leaking details of batch completion (which is currently exclusively in sender) into the RecordAccumulator.

clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
<https://reviews.apache.org/r/31850/#comment128200>

I don't think we should overload InterruptException for this. InterruptException is a wrapper around InterruptedException. i.e., after an InterruptException the thread should in fact have been interrupted - i.e., the interrupt status of the thread should be true (which is not the case here).

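The contract described in the last comment can be sketched as follows. This is an illustration under stated assumptions, not Kafka's `InterruptException` source: `SketchInterruptException` and `InterruptSketch.sleepUnchecked` are hypothetical names. The point is that an unchecked wrapper around `InterruptedException` is only thrown when the thread really was interrupted, and the interrupt status is true again once the wrapper exists.

```java
/** Hypothetical unchecked wrapper around InterruptedException (not Kafka's class). */
final class SketchInterruptException extends RuntimeException {
    SketchInterruptException(InterruptedException cause) {
        super(cause);
        // A thrown InterruptedException clears the interrupt status, so restore it
        // here to keep "wrapper thrown" equivalent to "thread is interrupted".
        Thread.currentThread().interrupt();
    }
}

final class InterruptSketch {
    /** Sleeps, translating a checked interrupt into the unchecked wrapper. */
    static void sleepUnchecked(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            throw new SketchInterruptException(e);
        }
    }
}
```

Under this contract, signalling some other condition (for example, batches aborted by a forced close) with the same exception type would mislead callers that check the interrupt status; a separate exception type would avoid that.
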
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
<https://reviews.apache.org/r/31850/#comment128167>

See comment above.

clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
<https://reviews.apache.org/r/31850/#comment128169>

Can you revert this? i.e., I think the previous version with locally declared accums is cleaner.

core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
<https://reviews.apache.org/r/31850/#comment128171>

Can you also add a test for calling close with a non-zero timeout in the callback?

- Joel Koshy

On March 27, 2015, 11:35 p.m., Jiangjie Qin wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31850/
> -----------------------------------------------------------
>
> (Updated March 27, 2015, 11:35 p.m.)
>
> Review request for kafka.
>
> Bugs: KAFKA-1660
>     https://issues.apache.org/jira/browse/KAFKA-1660
>
> Repository: kafka
>
> Description
> -------
>
> A minor fix.
>
> Incorporated Guozhang's comments.
>
> Modify according to the latest conclusion.
>
> Patch for the finally passed KIP-15git status
>
> Diffs
> -----
>
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java ab263423ff1d33170effb71acdef3fc501fa072a
>   clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 6913090af03a455452b0b5c3df78f266126b3854
>   clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 5b3e75ed83a940bc922f9eca10d4008d67aa37c9
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java 88b4e4fbf3bf6fb6d2f90551a792b95d4cd51c40
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 70954cadb5c7e9a4c326afcf9d9a07db230e7db2
>   clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java fee322fa0dd9704374db4a6964246a7d2918d3e4
>   clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java c2fdc23239bd2196cd912c3d121b591f21393eab
>   clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java e379ac89c9a2fbfe750d6b0dec693b7eabb76204
>   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala 3df450784592b894008e7507b2737f9bb07f7bd2
>
> Diff: https://reviews.apache.org/r/31850/diff/
>
> Testing
> -------
>
> Unit tests passed.
>
> Thanks,
>
> Jiangjie Qin
>
