-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/30763/#review73665
-----------------------------------------------------------
Hi Jay, I applied the patch and tried to run it in our test environment. I got
this exception:
java.util.ConcurrentModificationException
    at java.util.HashMap$HashIterator.nextNode(HashMap.java:1429)
    at java.util.HashMap$KeyIterator.next(HashMap.java:1453)
    at java.util.AbstractCollection.addAll(AbstractCollection.java:343)
    at java.util.HashSet.<init>(HashSet.java:119)
    at org.apache.kafka.clients.producer.internals.RecordAccumulator.awaitFlushCompletion(RecordAccumulator.java:348)
    at org.apache.kafka.clients.producer.KafkaProducer.flush(KafkaProducer.java:498)
    at kafka.tools.MirrorMaker$MirrorMakerProducer.flush(MirrorMaker.scala:373)
    at kafka.tools.MirrorMaker$MirrorMakerThread.run(MirrorMaker.scala:314)
It looks like we should synchronize access to incomplete, i.e. while one
thread is making a copy of the incomplete set, other threads should not add
batches to it.
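As a rough sketch of the idea only (the class name IncompleteBatches and its
method names are hypothetical and not taken from the patch), the set could be
wrapped so that adds, removes, and the copy all take the same lock:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper, not part of the patch: guards a HashSet of in-flight
// batches so that copying and mutation never overlap.
final class IncompleteBatches<T> {
    private final Set<T> incomplete = new HashSet<>();

    // Called by the appending thread when a new batch is created.
    public void add(T batch) {
        synchronized (incomplete) {
            incomplete.add(batch);
        }
    }

    // Called once a batch has completed (successfully or not).
    public void remove(T batch) {
        synchronized (incomplete) {
            incomplete.remove(batch);
        }
    }

    // Takes a snapshot under the same lock. The caller (e.g. a flush) can
    // then wait on the copy without holding the lock.
    public List<T> copyAll() {
        synchronized (incomplete) {
            return new ArrayList<>(incomplete);
        }
    }
}
```

With something like this, the flush path would iterate over the list returned
by copyAll() rather than over the live set, so a concurrent add could no longer
invalidate the iterator.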
- Jiangjie Qin
On Feb. 23, 2015, 12:26 a.m., Jay Kreps wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/30763/
> -----------------------------------------------------------
>
> (Updated Feb. 23, 2015, 12:26 a.m.)
>
>
> Review request for kafka.
>
>
> Bugs: KAFKA-1865
> https://issues.apache.org/jira/browse/KAFKA-1865
>
>
> Repository: kafka
>
>
> Description
> -------
>
> KAFKA-1865 Add a flush() method to the producer.
>
>
> Diffs
> -----
>
> clients/src/main/java/org/apache/kafka/clients/Metadata.java
> e8afecda956303a6ee116499fd443a54c018e17d
> clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
> 1fd6917c8a5131254c740abad7f7228a47e3628c
> clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
> 84530f2b948f9abd74203db48707e490dd9c81a5
> clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
> 17fe541588d462c68c33f6209717cc4015e9b62f
> clients/src/main/java/org/apache/kafka/clients/producer/ProducerRecord.java
> 4990692efa6f01c62e1d7b05fbf31bec50e398c9
> clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
> 4a2da41f47994f778109e3c4107ffd90195f0bae
> clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
> ecfe2144d778a5d9b614df5278b9f0a15637f10b
> clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
> dd0af8aee98abed5d4a0dc50989e37888bb353fe
> clients/src/main/java/org/apache/kafka/common/errors/InterruptException.java
> PRE-CREATION
> clients/src/main/java/org/apache/kafka/common/utils/SystemTime.java
> d682bd46ec3826f0a72388cc4ec30e1b1223d0f3
> clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java
> 4ae43ed47e31ad8052b4348a731da11120968508
> clients/src/test/java/org/apache/kafka/clients/producer/MetadataTest.java
> 743aa7e523dd476949f484bfa4c7fb8a3afd7bf8
> clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
> 75513b0bdd439329c5771d87436ef83fda853bfb
> clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java
> 83338633717cfa4ef7cf2a590b5aa6b9c8cb1dd2
> core/src/test/scala/integration/kafka/api/ConsumerTest.scala
> 2802a399bf599e9530f53b7df72f12702a10d3c4
> core/src/test/scala/integration/kafka/api/ProducerSendTest.scala
> b15237b76def3b234924280fa3fdb25dbb0cc0dc
> core/src/test/scala/unit/kafka/utils/TestUtils.scala
> 21d0ed2cb7c9459261d3cdc7c21dece5e2079698
>
> Diff: https://reviews.apache.org/r/30763/diff/
>
>
> Testing
> -------
>
>
> Thanks,
>
> Jay Kreps
>
>