> On Feb. 17, 2015, 9:28 p.m., Jiangjie Qin wrote:
> > clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java, line 80
> > <https://reviews.apache.org/r/30763/diff/1/?file=853738#file853738line80>
> >
> >     I think we have to execute the callback before we wake up the caller
> >     thread. Otherwise, if something went wrong in this batch, the caller
> >     thread might not be aware of that before it is woken up, and it could
> >     put a bunch of other stuff into the producer or commit offsets.
> >     For example, in mirror maker:
> >     ...
> >     for (rec <- recs)
> >         producer.send(rec);
> >     producer.flush();
> >     consumer.commitOffsets();
> >     ...
> >     The caller thread could have already committed offsets even if
> >     something went wrong in the callback.

This is actually a good point. However, we also want to have the same semantics as calling .get() on all the metadata futures. Currently we first unblock the future and then execute the callback; I am going to reverse that ordering. With that change, the future isn't unblocked until all callbacks have been executed, and this also gives us the property you described for flush (since flush is equivalent to calling get()).


- Jay


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30763/#review72794
-----------------------------------------------------------


On Feb. 26, 2015, 6:37 p.m., Jay Kreps wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/30763/
> -----------------------------------------------------------
>
> (Updated Feb. 26, 2015, 6:37 p.m.)
>
>
> Review request for kafka.
>
>
> Bugs: KAFKA-1865
>     https://issues.apache.org/jira/browse/KAFKA-1865
>
>
> Repository: kafka
>
>
> Description
> -------
>
> KAFKA-1865 Add a flush() method to the producer.
>
>
> Diffs
> -----
>
>   clients/src/main/java/org/apache/kafka/clients/Metadata.java e8afecda956303a6ee116499fd443a54c018e17d
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 1fd6917c8a5131254c740abad7f7228a47e3628c
>   clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 84530f2b948f9abd74203db48707e490dd9c81a5
>   clients/src/main/java/org/apache/kafka/clients/producer/Producer.java 17fe541588d462c68c33f6209717cc4015e9b62f
>   clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java 4990692efa6f01c62e1d7b05fbf31bec50e398c9
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java 4a2da41f47994f778109e3c4107ffd90195f0bae
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java ecfe2144d778a5d9b614df5278b9f0a15637f10b
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java dd0af8aee98abed5d4a0dc50989e37888bb353fe
>   clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java d682bd46ec3826f0a72388cc4ec30e1b1223d0f3
>   clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java 4ae43ed47e31ad8052b4348a731da11120968508
>   clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java 743aa7e523dd476949f484bfa4c7fb8a3afd7bf8
>   clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java 75513b0bdd439329c5771d87436ef83fda853bfb
>   clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java 404bedb3dc4e44cc79251d71e1e3f8efdab60efa
>   clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java 83338633717cfa4ef7cf2a590b5aa6b9c8cb1dd2
>   clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java 558942aaecd1b9f7098435d39aa4b362cd16ff0a
>   core/src/test/scala/integration/kafka/api/ConsumerTest.scala 2802a399bf599e9530f53b7df72f12702a10d3c4
>   core/src/test/scala/integration/kafka/api/ProducerSendTest.scala b15237b76def3b234924280fa3fdb25dbb0cc0dc
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 21d0ed2cb7c9459261d3cdc7c21dece5e2079698
>
> Diff: https://reviews.apache.org/r/30763/diff/
>
>
> Testing
> -------
>
> The latest patch uses Jiangjie's suggestion to remove the synchronization on
> flush.
>
>
> Thanks,
>
> Jay Kreps
>
>
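
For readers following the thread, here is a rough sketch of the ordering discussed in the reply: run every callback first, and only then release whoever is blocked on the result. This is not the code from the patch. The class BatchCompletionSketch and its methods addCallback, done, isDone and error are hypothetical names invented for illustration, and a plain CountDownLatch stands in for whatever the producer actually uses to unblock the future.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;

/** Hypothetical stand-in for a producer batch; not the real RecordBatch. */
final class BatchCompletionSketch {
    // Waiters (a future's get(), or flush()) would block on this latch.
    private final CountDownLatch latch = new CountDownLatch(1);
    private final List<Consumer<Exception>> callbacks = new ArrayList<>();
    private volatile Exception error;

    /** Registers a callback; assumed to happen before done() is called. */
    void addCallback(Consumer<Exception> callback) {
        callbacks.add(callback);
    }

    /** Invoked once by the sender thread when the batch succeeds or fails. */
    void done(Exception exception) {
        this.error = exception;
        // Step 1: run all callbacks while waiters are still blocked.
        for (Consumer<Exception> callback : callbacks) {
            try {
                callback.accept(exception);
            } catch (RuntimeException e) {
                // Assumption: a failing callback should not keep waiters blocked forever.
            }
        }
        // Step 2: only now release the waiters.
        latch.countDown();
    }

    /** True once waiters have been released. */
    boolean isDone() {
        return latch.getCount() == 0;
    }

    /** Blocks until done() has finished running the callbacks. */
    void await() throws InterruptedException {
        latch.await();
    }

    /** The failure reported to done(), or null on success. */
    Exception error() {
        return error;
    }
}
```

With this ordering, a caller that returns from await() in the sketch (get() or flush() in the discussion above) can rely on every callback for that batch having already run, so the mirror maker loop would not commit offsets before a callback has had the chance to observe a failure.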