KAFKA-957; MirrorMaker needs to preserve ordering for keyed messages from source cluster; patched by Guozhang Wang, reviewed by Joel Koshy
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5cf6a546 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5cf6a546 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5cf6a546 Branch: refs/heads/trunk Commit: 5cf6a546649c213c33ef41d5d47e6af959c0c587 Parents: 401d591 Author: Joel Koshy <[email protected]> Authored: Wed Jul 24 10:28:26 2013 -0700 Committer: Joel Koshy <[email protected]> Committed: Wed Jul 24 11:41:22 2013 -0700 ---------------------------------------------------------------------- .../kafka/producer/ByteArrayPartitioner.scala | 27 ++++++++++++++++++ .../main/scala/kafka/tools/MirrorMaker.scala | 30 ++++++++++++++++---- 2 files changed, 52 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/5cf6a546/core/src/main/scala/kafka/producer/ByteArrayPartitioner.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/producer/ByteArrayPartitioner.scala b/core/src/main/scala/kafka/producer/ByteArrayPartitioner.scala new file mode 100644 index 0000000..752a4fc --- /dev/null +++ b/core/src/main/scala/kafka/producer/ByteArrayPartitioner.scala @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.producer + + +import kafka.utils._ + +private class ByteArrayPartitioner(props: VerifiableProperties = null) extends Partitioner[Array[Byte]] { + def partition(key: Array[Byte], numPartitions: Int): Int = { + Utils.abs(java.util.Arrays.hashCode(key)) % numPartitions + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/5cf6a546/core/src/main/scala/kafka/tools/MirrorMaker.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index 2d93947..a85bfa2 100644 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -22,6 +22,7 @@ import kafka.utils.{Utils, CommandLineUtils, Logging} import kafka.producer.{KeyedMessage, ProducerConfig, Producer} import scala.collection.JavaConversions._ import java.util.concurrent.CountDownLatch +import java.nio.ByteBuffer import kafka.consumer._ import kafka.serializer._ import collection.mutable.ListBuffer @@ -99,8 +100,15 @@ object MirrorMaker extends Logging { val bufferSize = options.valueOf(bufferSizeOpt).intValue() val producers = (1 to options.valueOf(numProducersOpt).intValue()).map(_ => { - val config = new ProducerConfig( - Utils.loadProps(options.valueOf(producerConfigOpt))) + val props = Utils.loadProps(options.valueOf(producerConfigOpt)) + val config = props.getProperty("partitioner.class") match { + case null => + new ProducerConfig(props) { + override val partitionerClass = "kafka.producer.ByteArrayPartitioner" + } + case pClass : String => + new ProducerConfig(props) + } new Producer[Array[Byte], Array[Byte]](config) }) @@ -125,7 +133,7 @@ object MirrorMaker extends Logging { val producerDataChannel = new ProducerDataChannel[KeyedMessage[Array[Byte], Array[Byte]]](bufferSize); val consumerThreads = - streams.zipWithIndex.map(streamAndIndex => new MirrorMakerThread(streamAndIndex._1, producerDataChannel, streamAndIndex._2)) + streams.zipWithIndex.map(streamAndIndex => new MirrorMakerThread(streamAndIndex._1, producerDataChannel, producers, streamAndIndex._2)) val producerThreads = new ListBuffer[ProducerThread]() @@ -162,6 +170,7 @@ object MirrorMaker extends Logging { class MirrorMakerThread(stream: KafkaStream[Array[Byte], Array[Byte]], producerDataChannel: ProducerDataChannel[KeyedMessage[Array[Byte], Array[Byte]]], + producers: Seq[Producer[Array[Byte], Array[Byte]]], threadId: Int) extends Thread with Logging { @@ -174,8 +183,19 @@ object MirrorMaker extends Logging { info("Starting mirror maker thread " + threadName) try { for (msgAndMetadata <- stream) { - val pd = new KeyedMessage[Array[Byte], Array[Byte]](msgAndMetadata.topic, msgAndMetadata.message) - producerDataChannel.sendRequest(pd) + // If the key of the message is empty, put it into the universal channel + // Otherwise use a pre-assigned producer to send the message + if (msgAndMetadata.key == null) { + trace("Send the non-keyed message the producer channel.") + val pd = new KeyedMessage[Array[Byte], Array[Byte]](msgAndMetadata.topic, msgAndMetadata.message) + producerDataChannel.sendRequest(pd) + } else { + val producerId = Utils.abs(java.util.Arrays.hashCode(msgAndMetadata.key)) % producers.size() + trace("Send message with key %s to producer %d.".format(java.util.Arrays.toString(msgAndMetadata.key), producerId)) + val producer = producers(producerId) + val pd = new KeyedMessage[Array[Byte], Array[Byte]](msgAndMetadata.topic, msgAndMetadata.key, msgAndMetadata.message) + producer.send(pd) + } } } catch { case e =>
