Repository: incubator-samza Updated Branches: refs/heads/master 429c1edb3 -> 87f19fcd0
SAMZA-203; fix changelog restore performance by increasing fetchThreshold in kafka system consumer Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/87f19fcd Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/87f19fcd Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/87f19fcd Branch: refs/heads/master Commit: 87f19fcd0ba38124fd231ac314d056180643931b Parents: 429c1ed Author: Chris Riccomini <[email protected]> Authored: Tue Mar 25 09:19:14 2014 -0700 Committer: Chris Riccomini <[email protected]> Committed: Tue Mar 25 09:19:14 2014 -0700 ---------------------------------------------------------------------- .../org/apache/samza/config/KafkaConfig.scala | 2 +- .../apache/samza/system/kafka/BrokerProxy.scala | 70 +++++++++++--------- .../system/kafka/KafkaSystemConsumer.scala | 43 +++++++++--- .../samza/system/kafka/KafkaSystemFactory.scala | 4 +- .../system/kafka/TestKafkaSystemConsumer.scala | 19 ++++++ 5 files changed, 92 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/87f19fcd/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala index 978620a..4deabd3 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala @@ -40,7 +40,7 @@ object KafkaConfig { * Defines how low a queue can get for a single system/stream/partition * combination before trying to fetch more messages for it. 
*/ - val CONSUMER_FETCH_THRESHOLD = SystemConfig.SYSTEM_PREFIX + ".samza.fetch.threshold" + val CONSUMER_FETCH_THRESHOLD = SystemConfig.SYSTEM_PREFIX + "samza.fetch.threshold" implicit def Config2Kafka(config: Config) = new KafkaConfig(config) } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/87f19fcd/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala index bca2f86..88817ef 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala @@ -38,7 +38,7 @@ import org.apache.samza.util.ExponentialSleepStrategy * Companion object for class JvmMetrics encapsulating various constants */ object BrokerProxy { - val BROKER_PROXY_THREAD_NAME_PREFIX = "BROKER-PROXY" + val BROKER_PROXY_THREAD_NAME_PREFIX = "BROKER-PROXY-" } /** @@ -62,7 +62,7 @@ class BrokerProxy( /** * How long should the fetcher thread sleep before checking if any TopicPartitions has been added to its purview */ - val sleepMSWhileNoTopicPartitions = 1000 + val sleepMSWhileNoTopicPartitions = 100 /** What's the next offset for a particular partition? **/ val nextOffsets:mutable.ConcurrentMap[TopicAndPartition, Long] = new ConcurrentHashMap[TopicAndPartition, Long]() @@ -125,36 +125,42 @@ class BrokerProxy( val thread = new Thread(new Runnable { def run { var reconnect = false - (new ExponentialSleepStrategy).run( - loop => { - if (reconnect) { - metrics.reconnects(host, port).inc - simpleConsumer.close() - simpleConsumer = createSimpleConsumer() - } - - while (!Thread.currentThread.isInterrupted) { - if (nextOffsets.size == 0) { - debug("No TopicPartitions to fetch. 
Sleeping.") - Thread.sleep(sleepMSWhileNoTopicPartitions) - } else { - fetchMessages - - // If we got here, fetchMessages didn't throw an exception, i.e. it was successful. - // In that case, reset the loop delay, so that the next time an error occurs, - // we start with a short retry delay. - loop.reset + + try { + (new ExponentialSleepStrategy).run( + loop => { + if (reconnect) { + metrics.reconnects(host, port).inc + simpleConsumer.close() + simpleConsumer = createSimpleConsumer() + } + + while (!Thread.currentThread.isInterrupted) { + if (nextOffsets.size == 0) { + debug("No TopicPartitions to fetch. Sleeping.") + Thread.sleep(sleepMSWhileNoTopicPartitions) + } else { + fetchMessages + + // If we got here, fetchMessages didn't throw an exception, i.e. it was successful. + // In that case, reset the loop delay, so that the next time an error occurs, + // we start with a short retry delay. + loop.reset + } } - } - }, - - (exception, loop) => { - warn("Restarting consumer due to %s. Turn on debugging to get a full stack trace." format exception) - debug(exception) - reconnect = true - } - ) - if (Thread.currentThread.isInterrupted) info("Shutting down due to interrupt") + }, + + (exception, loop) => { + warn("Restarting consumer due to %s. Turn on debugging to get a full stack trace." 
format exception) + debug(exception) + reconnect = true + }) + } catch { + case e: InterruptedException => info("Got interrupt exception in broker proxy thread.") + case e: ClosedByInterruptException => info("Got closed by interrupt exception in broker proxy thread.") + } + + if (Thread.currentThread.isInterrupted) info("Shutting down due to interrupt.") } }, "BrokerProxy thread pointed at %s:%d for client %s" format (host, port, clientID)) @@ -263,7 +269,7 @@ class BrokerProxy( info("Starting " + toString) thread.setDaemon(true) - thread.setName(SAMZA_THREAD_NAME_PREFIX+BrokerProxy.BROKER_PROXY_THREAD_NAME_PREFIX) + thread.setName(SAMZA_THREAD_NAME_PREFIX + BrokerProxy.BROKER_PROXY_THREAD_NAME_PREFIX + thread.getName) thread.start } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/87f19fcd/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala index 8ad97df..e1ea2ff 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala @@ -34,6 +34,7 @@ import org.apache.samza.system.SystemStreamPartition import org.apache.samza.system.IncomingMessageEnvelope import kafka.consumer.ConsumerConfig import org.apache.samza.util.ExponentialSleepStrategy +import org.apache.samza.SamzaException object KafkaSystemConsumer { def toTopicAndPartition(systemStreamPartition: SystemStreamPartition) = { @@ -54,10 +55,24 @@ private[kafka] class KafkaSystemConsumer( clientId: String = "undefined-client-id-%s" format UUID.randomUUID.toString, timeout: Int = ConsumerConfig.ConsumerTimeoutMs, bufferSize: Int = ConsumerConfig.SocketBufferSize, - fetchSize:Int = 
ConsumerConfig.MaxFetchSize, - consumerMinSize:Int = ConsumerConfig.MinFetchBytes, - consumerMaxWait:Int = ConsumerConfig.MaxFetchWaitMs, - fetchThreshold: Int = 0, + fetchSize: Int = ConsumerConfig.MaxFetchSize, + consumerMinSize: Int = ConsumerConfig.MinFetchBytes, + consumerMaxWait: Int = ConsumerConfig.MaxFetchWaitMs, + + /** + * Defines a low water mark for how many messages we buffer before we start + * executing fetch requests against brokers to get more messages. This value + * is divided equally among all registered SystemStreamPartitions. For + * example, if fetchThreshold is set to 50000, and there are 50 + * SystemStreamPartitions registered, then the per-partition threshold is + * 1000. As soon as a SystemStreamPartition's buffered message count drops + * below 1000, a fetch request will be executed to get more data for it. + * + * Increasing this parameter will decrease the latency between when a queue + * is drained of messages and when new messages are enqueued, but also leads + * to an increase in memory usage since more messages will be held in memory. + */ + fetchThreshold: Int = 50000, offsetGetter: GetOffset = new GetOffset("fail"), deserializer: Decoder[Object] = new DefaultDecoder().asInstanceOf[Decoder[Object]], keyDeserializer: Decoder[Object] = new DefaultDecoder().asInstanceOf[Decoder[Object]], @@ -69,8 +84,15 @@ private[kafka] class KafkaSystemConsumer( type HostPort = (String, Int) val brokerProxies = scala.collection.mutable.Map[HostPort, BrokerProxy]() var nextOffsets = Map[SystemStreamPartition, String]() + var perPartitionFetchThreshold = fetchThreshold def start() { + if (nextOffsets.size <= 0) { + throw new SamzaException("No SystemStreamPartitions registered. 
Must register at least one SystemStreamPartition before starting the consumer.") + } + + perPartitionFetchThreshold = fetchThreshold / nextOffsets.size + val topicPartitionsAndOffsets = nextOffsets.map { case (systemStreamPartition, offset) => val topicAndPartition = KafkaSystemConsumer.toTopicAndPartition(systemStreamPartition) @@ -106,16 +128,16 @@ private[kafka] class KafkaSystemConsumer( // addTopicPartition one at a time, leaving the to-be-done list intact in case of exceptions. // This avoids trying to re-add the same topic partition repeatedly - def refresh(tp:List[TopicAndPartition]) = { + def refresh(tp: List[TopicAndPartition]) = { val head :: rest = tpToRefresh val nextOffset = topicPartitionsAndOffsets.get(head).get // Whatever we do, we can't say Broker, even though we're // manipulating it here. Broker is a private type and Scala doesn't seem // to care about that as long as you don't explicitly declare its type. val brokerOption = partitionMetadata(head.topic) - .partitionsMetadata - .find(_.partitionId == head.partition) - .flatMap(_.leader) + .partitionsMetadata + .find(_.partitionId == head.partition) + .flatMap(_.leader) brokerOption match { case Some(broker) => @@ -138,8 +160,7 @@ private[kafka] class KafkaSystemConsumer( (loop, exception) => { warn("While refreshing brokers for %s: %s. Retrying." 
format (tpToRefresh.head, exception)) debug(exception) - } - ) + }) } val sink = new MessageSink { @@ -148,7 +169,7 @@ private[kafka] class KafkaSystemConsumer( } def needsMoreMessages(tp: TopicAndPartition) = { - getNumMessagesInQueue(toSystemStreamPartition(tp)) <= fetchThreshold + getNumMessagesInQueue(toSystemStreamPartition(tp)) <= perPartitionFetchThreshold } def addMessage(tp: TopicAndPartition, msg: MessageAndOffset, highWatermark: Long) = { http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/87f19fcd/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala index feecc58..d6e3a52 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala @@ -47,9 +47,9 @@ class KafkaSystemFactory extends SystemFactory { val consumerMaxWait = consumerConfig.fetchWaitMaxMs val autoOffsetResetDefault = consumerConfig.autoOffsetReset val autoOffsetResetTopics = config.getAutoOffsetResetTopics(systemName) - val fetchThreshold = config.getConsumerFetchThreshold(systemName).getOrElse("0").toInt + val fetchThreshold = config.getConsumerFetchThreshold(systemName).getOrElse("50000").toInt val offsetGetter = new GetOffset(autoOffsetResetDefault, autoOffsetResetTopics) - + new KafkaSystemConsumer( systemName = systemName, brokerListString = brokerListString, http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/87f19fcd/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala 
b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala new file mode 100644 index 0000000..8bd51a1 --- /dev/null +++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala @@ -0,0 +1,19 @@ +package org.apache.samza.system.kafka + +import org.junit.Test +import org.junit.Assert._ +import org.apache.samza.system.SystemStreamPartition +import org.apache.samza.Partition + +class TestKafkaSystemConsumer { + @Test + def testFetchThresholdShouldDivideEvenlyAmongPartitions { + val consumer = new KafkaSystemConsumer("", "", new KafkaSystemConsumerMetrics, fetchThreshold = 50000) + + for (i <- 0 until 50) { + consumer.register(new SystemStreamPartition("test-system", "test-stream", new Partition(i)), "0") + } + + assertEquals(1000, consumer.perPartitionFetchThreshold) + } +} \ No newline at end of file
