Repository: kafka Updated Branches: refs/heads/trunk a74688de4 -> 5b42b538e
KAFKA-2047; Move the stream creation into concurrent mirror maker threads; reviewed by Guozhang Wang Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5b42b538 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5b42b538 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5b42b538 Branch: refs/heads/trunk Commit: 5b42b538eb46203f7fd308cb3d3f27dde98840b8 Parents: a74688d Author: Jiangjie Qin <[email protected]> Authored: Wed Mar 25 14:01:19 2015 -0700 Committer: Guozhang Wang <[email protected]> Committed: Wed Mar 25 14:01:19 2015 -0700 ---------------------------------------------------------------------- .../main/scala/kafka/tools/MirrorMaker.scala | 49 ++++++++------------ 1 file changed, 19 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/5b42b538/core/src/main/scala/kafka/tools/MirrorMaker.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index 4f3c4c8..ec07743 100644 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -22,18 +22,18 @@ import java.util.concurrent.CountDownLatch import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger} import java.util.{Collections, Properties} -import scala.collection.JavaConversions._ - import com.yammer.metrics.core.Gauge import joptsimple.OptionParser -import kafka.consumer.{Blacklist, ConsumerConfig, ConsumerThreadId, ConsumerTimeoutException, KafkaStream, Whitelist, ZookeeperConsumerConnector} +import kafka.consumer.{KafkaStream, Blacklist, ConsumerConfig, ConsumerThreadId, ConsumerTimeoutException, TopicFilter, Whitelist, ZookeeperConsumerConnector} import kafka.javaapi.consumer.ConsumerRebalanceListener import kafka.message.MessageAndMetadata import kafka.metrics.KafkaMetricsGroup import kafka.serializer.DefaultDecoder import kafka.utils.{CommandLineUtils, Logging, Utils} import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback -import org.apache.kafka.clients.producer.{ProducerConfig, KafkaProducer, ProducerRecord, RecordMetadata} +import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata} + +import scala.collection.JavaConversions._ /** * The mirror maker has the following architecture: @@ -226,26 +226,9 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { else new Blacklist(options.valueOf(blacklistOpt)) - // create a (connector->stream) sequence - val connectorStream = (0 until numStreams) map { - i => { - var stream: Seq[KafkaStream[Array[Byte], Array[Byte]]] = null - try { - // Creating just on stream per each connector instance - stream = connectors(i).createMessageStreamsByFilter(filterSpec, 1, new DefaultDecoder(), new DefaultDecoder()) - require(stream.size == 1) - } catch { - case t: Throwable => - fatal("Unable to create stream - shutting down mirror maker.", t) - connectors(i).shutdown() - } - connectors(i) -> stream(0) - } - } - // Create mirror maker threads mirrorMakerThreads = (0 until numStreams) map ( i => - new MirrorMakerThread(connectorStream(i)._1, connectorStream(i)._2, i) + new MirrorMakerThread(connectors(i), filterSpec, i) ) // Create and initialize message handler @@ -295,13 +278,14 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { } private def maybeSetDefaultProperty(properties: Properties, propertyName: String, defaultValue: String) { - properties.setProperty(propertyName, Option(properties.getProperty(propertyName)).getOrElse(defaultValue)) + val propertyValue = properties.getProperty(propertyName) + properties.setProperty(propertyName, Option(propertyValue).getOrElse(defaultValue)) if (properties.getProperty(propertyName) != defaultValue) - info("Property %s is overridden to %s - data loss or message reordering is possible.") + info("Property %s is overridden to %s - data loss or message reordering is possible.".format(propertyName, propertyValue)) } class MirrorMakerThread(connector: ZookeeperConsumerConnector, - stream: KafkaStream[Array[Byte], Array[Byte]], + filterSpec: TopicFilter, val threadId: Int) extends Thread with Logging with KafkaMetricsGroup { private val threadName = "mirrormaker-thread-" + threadId private val shutdownLatch: CountDownLatch = new CountDownLatch(1) @@ -313,8 +297,13 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { override def run() { info("Starting mirror maker thread " + threadName) - val iter = stream.iterator() try { + // Creating one stream per each connector instance + val streams = connector.createMessageStreamsByFilter(filterSpec, 1, new DefaultDecoder(), new DefaultDecoder()) + require(streams.size == 1) + val stream = streams(0) + val iter = stream.iterator() + // TODO: Need to be changed after KAFKA-1660 is available. while (!exitingOnSendFailure && !shuttingDown) { try { @@ -333,10 +322,10 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { } } catch { case t: Throwable => - fatal("Producer thread failure due to ", t) + fatal("Mirror maker thread failure due to ", t) } finally { shutdownLatch.countDown() - info("Producer thread stopped") + info("Mirror maker thread stopped") // if it exits accidentally, stop the entire mirror maker if (!isShuttingdown.get()) { fatal("Mirror maker thread exited abnormally, stopping the whole mirror maker.") @@ -360,7 +349,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { } catch { case ie: InterruptedException => - warn("Interrupt during shutdown of ProducerThread") + warn("Interrupt during shutdown of the mirror maker thread") } } @@ -370,7 +359,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { info("Mirror maker thread shutdown complete") } catch { case ie: InterruptedException => - warn("Shutdown of the producer thread interrupted") + warn("Shutdown of the mirror maker thread interrupted") } } }
