Updated Branches: refs/heads/0.8 dcbf0bf0b -> 93921a3a5
KAFKA-1071; The random partition selected in DefaultEventHandler is not random across producer instances; reviewed by Neha Narkhede and Jun Rao Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/93921a3a Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/93921a3a Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/93921a3a Branch: refs/heads/0.8 Commit: 93921a3a5720a1ffd9e272d59d8a7627da28e89e Parents: dcbf0bf Author: Guozhang Wang <[email protected]> Authored: Thu Oct 3 17:46:53 2013 -0700 Committer: Jun Rao <[email protected]> Committed: Thu Oct 3 17:46:53 2013 -0700 ---------------------------------------------------------------------- .../main/scala/kafka/producer/async/DefaultEventHandler.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/93921a3a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala index c151032..eba375b 100644 --- a/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala +++ b/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala @@ -22,6 +22,7 @@ import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet} import kafka.producer._ import kafka.serializer.Encoder import kafka.utils.{Utils, Logging, SystemTime} +import scala.util.Random import scala.collection.{Seq, Map} import scala.collection.mutable.{ArrayBuffer, HashMap, Set} import java.util.concurrent.atomic._ @@ -36,7 +37,6 @@ class DefaultEventHandler[K,V](config: ProducerConfig, extends EventHandler[K,V] with Logging { val isSync = ("sync" == config.producerType) - val partitionCounter = new AtomicInteger(0) val correlationId = new AtomicInteger(0) val brokerPartitionInfo = new BrokerPartitionInfo(config, producerPool, topicPartitionInfos) @@ -217,7 +217,7 @@ class DefaultEventHandler[K,V](config: ProducerConfig, val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined) if (availablePartitions.isEmpty) throw new LeaderNotAvailableException("No leader for any partition in topic " + topic) - val index = Utils.abs(partitionCounter.getAndIncrement()) % availablePartitions.size + val index = Utils.abs(Random.nextInt) % availablePartitions.size val partitionId = availablePartitions(index).partitionId sendPartitionPerTopicCache.put(topic, partitionId) partitionId
