Repository: incubator-samza Updated Branches: refs/heads/master 87f19fcd0 -> a62107786
Test fails with this patch. Reverting to fix. Revert "SAMZA-203; fix changelog restore performance by increasing flushThreshold in kafka system consumer" This reverts commit 87f19fcd0ba38124fd231ac314d056180643931b. Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/a6210778 Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/a6210778 Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/a6210778 Branch: refs/heads/master Commit: a6210778617c1cf5e23fbc2338cc01a5f82e465f Parents: 87f19fc Author: Chris Riccomini <[email protected]> Authored: Tue Mar 25 09:24:27 2014 -0700 Committer: Chris Riccomini <[email protected]> Committed: Tue Mar 25 09:24:27 2014 -0700 ---------------------------------------------------------------------- .../org/apache/samza/config/KafkaConfig.scala | 2 +- .../apache/samza/system/kafka/BrokerProxy.scala | 70 +++++++++----------- .../system/kafka/KafkaSystemConsumer.scala | 43 +++--------- .../samza/system/kafka/KafkaSystemFactory.scala | 4 +- .../system/kafka/TestKafkaSystemConsumer.scala | 19 ------ 5 files changed, 46 insertions(+), 92 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/a6210778/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala index 4deabd3..978620a 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala @@ -40,7 +40,7 @@ object KafkaConfig { * Defines how low a queue can get for a single system/stream/partition * combination before trying to fetch more messages for it. */ - val CONSUMER_FETCH_THRESHOLD = SystemConfig.SYSTEM_PREFIX + "samza.fetch.threshold" + val CONSUMER_FETCH_THRESHOLD = SystemConfig.SYSTEM_PREFIX + ".samza.fetch.threshold" implicit def Config2Kafka(config: Config) = new KafkaConfig(config) } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/a6210778/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala index 88817ef..bca2f86 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala @@ -38,7 +38,7 @@ import org.apache.samza.util.ExponentialSleepStrategy * Companion object for class JvmMetrics encapsulating various constants */ object BrokerProxy { - val BROKER_PROXY_THREAD_NAME_PREFIX = "BROKER-PROXY-" + val BROKER_PROXY_THREAD_NAME_PREFIX = "BROKER-PROXY" } /** @@ -62,7 +62,7 @@ class BrokerProxy( /** * How long should the fetcher thread sleep before checking if any TopicPartitions has been added to its purview */ - val sleepMSWhileNoTopicPartitions = 100 + val sleepMSWhileNoTopicPartitions = 1000 /** What's the next offset for a particular partition? **/ val nextOffsets:mutable.ConcurrentMap[TopicAndPartition, Long] = new ConcurrentHashMap[TopicAndPartition, Long]() @@ -125,42 +125,36 @@ class BrokerProxy( val thread = new Thread(new Runnable { def run { var reconnect = false - - try { - (new ExponentialSleepStrategy).run( - loop => { - if (reconnect) { - metrics.reconnects(host, port).inc - simpleConsumer.close() - simpleConsumer = createSimpleConsumer() - } - - while (!Thread.currentThread.isInterrupted) { - if (nextOffsets.size == 0) { - debug("No TopicPartitions to fetch. Sleeping.") - Thread.sleep(sleepMSWhileNoTopicPartitions) - } else { - fetchMessages - - // If we got here, fetchMessages didn't throw an exception, i.e. it was successful. - // In that case, reset the loop delay, so that the next time an error occurs, - // we start with a short retry delay. - loop.reset - } + (new ExponentialSleepStrategy).run( + loop => { + if (reconnect) { + metrics.reconnects(host, port).inc + simpleConsumer.close() + simpleConsumer = createSimpleConsumer() + } + + while (!Thread.currentThread.isInterrupted) { + if (nextOffsets.size == 0) { + debug("No TopicPartitions to fetch. Sleeping.") + Thread.sleep(sleepMSWhileNoTopicPartitions) + } else { + fetchMessages + + // If we got here, fetchMessages didn't throw an exception, i.e. it was successful. + // In that case, reset the loop delay, so that the next time an error occurs, + // we start with a short retry delay. + loop.reset } - }, - - (exception, loop) => { - warn("Restarting consumer due to %s. Turn on debugging to get a full stack trace." format exception) - debug(exception) - reconnect = true - }) - } catch { - case e: InterruptedException => info("Got interrupt exception in broker proxy thread.") - case e: ClosedByInterruptException => info("Got closed by interrupt exception in broker proxy thread.") - } - - if (Thread.currentThread.isInterrupted) info("Shutting down due to interrupt.") + } + }, + + (exception, loop) => { + warn("Restarting consumer due to %s. Turn on debugging to get a full stack trace." format exception) + debug(exception) + reconnect = true + } + ) + if (Thread.currentThread.isInterrupted) info("Shutting down due to interrupt") } }, "BrokerProxy thread pointed at %s:%d for client %s" format (host, port, clientID)) @@ -269,7 +263,7 @@ class BrokerProxy( info("Starting " + toString) thread.setDaemon(true) - thread.setName(SAMZA_THREAD_NAME_PREFIX + BrokerProxy.BROKER_PROXY_THREAD_NAME_PREFIX + thread.getName) + thread.setName(SAMZA_THREAD_NAME_PREFIX+BrokerProxy.BROKER_PROXY_THREAD_NAME_PREFIX) thread.start } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/a6210778/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala index e1ea2ff..8ad97df 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala @@ -34,7 +34,6 @@ import org.apache.samza.system.SystemStreamPartition import org.apache.samza.system.IncomingMessageEnvelope import kafka.consumer.ConsumerConfig import org.apache.samza.util.ExponentialSleepStrategy -import org.apache.samza.SamzaException object KafkaSystemConsumer { def toTopicAndPartition(systemStreamPartition: SystemStreamPartition) = { @@ -55,24 +54,10 @@ private[kafka] class KafkaSystemConsumer( clientId: String = "undefined-client-id-%s" format UUID.randomUUID.toString, timeout: Int = ConsumerConfig.ConsumerTimeoutMs, bufferSize: Int = ConsumerConfig.SocketBufferSize, - fetchSize: Int = ConsumerConfig.MaxFetchSize, - consumerMinSize: Int = ConsumerConfig.MinFetchBytes, - consumerMaxWait: Int = ConsumerConfig.MaxFetchWaitMs, - - /** - * Defines a low water mark for how many messages we buffer before we start - * executing fetch requests against brokers to get more messages. This value - * is divided equally among all registered SystemStreamPartitions. For - * example, if fetchThreshold is set to 50000, and there are 50 - * SystemStreamPartitions registered, then the per-partition threshold is - * 1000. As soon as a SystemStreamPartition's buffered message count drops - * below 1000, a fetch request will be executed to get more data for it. - * - * Increasing this parameter will decrease the latency between when a queue - * is drained of messages and when new messages are enqueued, but also leads - * to an increase in memory usage since more messages will be held in memory. - */ - fetchThreshold: Int = 50000, + fetchSize:Int = ConsumerConfig.MaxFetchSize, + consumerMinSize:Int = ConsumerConfig.MinFetchBytes, + consumerMaxWait:Int = ConsumerConfig.MaxFetchWaitMs, + fetchThreshold: Int = 0, offsetGetter: GetOffset = new GetOffset("fail"), deserializer: Decoder[Object] = new DefaultDecoder().asInstanceOf[Decoder[Object]], keyDeserializer: Decoder[Object] = new DefaultDecoder().asInstanceOf[Decoder[Object]], @@ -84,15 +69,8 @@ private[kafka] class KafkaSystemConsumer( type HostPort = (String, Int) val brokerProxies = scala.collection.mutable.Map[HostPort, BrokerProxy]() var nextOffsets = Map[SystemStreamPartition, String]() - var perPartitionFetchThreshold = fetchThreshold def start() { - if (nextOffsets.size <= 0) { - throw new SamzaException("No SystemStreamPartitions registered. Must register at least one SystemStreamPartition before starting the consumer.") - } - - perPartitionFetchThreshold = fetchThreshold / nextOffsets.size - val topicPartitionsAndOffsets = nextOffsets.map { case (systemStreamPartition, offset) => val topicAndPartition = KafkaSystemConsumer.toTopicAndPartition(systemStreamPartition) @@ -128,16 +106,16 @@ private[kafka] class KafkaSystemConsumer( // addTopicPartition one at a time, leaving the to-be-done list intact in case of exceptions. // This avoids trying to re-add the same topic partition repeatedly - def refresh(tp: List[TopicAndPartition]) = { + def refresh(tp:List[TopicAndPartition]) = { val head :: rest = tpToRefresh val nextOffset = topicPartitionsAndOffsets.get(head).get // Whatever we do, we can't say Broker, even though we're // manipulating it here. Broker is a private type and Scala doesn't seem // to care about that as long as you don't explicitly declare its type. val brokerOption = partitionMetadata(head.topic) - .partitionsMetadata - .find(_.partitionId == head.partition) - .flatMap(_.leader) + .partitionsMetadata + .find(_.partitionId == head.partition) + .flatMap(_.leader) brokerOption match { case Some(broker) => @@ -160,7 +138,8 @@ private[kafka] class KafkaSystemConsumer( (loop, exception) => { warn("While refreshing brokers for %s: %s. Retrying." format (tpToRefresh.head, exception)) debug(exception) - }) + } + ) } val sink = new MessageSink { @@ -169,7 +148,7 @@ private[kafka] class KafkaSystemConsumer( } def needsMoreMessages(tp: TopicAndPartition) = { - getNumMessagesInQueue(toSystemStreamPartition(tp)) <= perPartitionFetchThreshold + getNumMessagesInQueue(toSystemStreamPartition(tp)) <= fetchThreshold } def addMessage(tp: TopicAndPartition, msg: MessageAndOffset, highWatermark: Long) = { http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/a6210778/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala index d6e3a52..feecc58 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala @@ -47,9 +47,9 @@ class KafkaSystemFactory extends SystemFactory { val consumerMaxWait = consumerConfig.fetchWaitMaxMs val autoOffsetResetDefault = consumerConfig.autoOffsetReset val autoOffsetResetTopics = config.getAutoOffsetResetTopics(systemName) - val fetchThreshold = config.getConsumerFetchThreshold(systemName).getOrElse("50000").toInt + val fetchThreshold = config.getConsumerFetchThreshold(systemName).getOrElse("0").toInt val offsetGetter = new GetOffset(autoOffsetResetDefault, autoOffsetResetTopics) - + new KafkaSystemConsumer( systemName = systemName, brokerListString = brokerListString, http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/a6210778/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala deleted file mode 100644 index 8bd51a1..0000000 --- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala +++ /dev/null @@ -1,19 +0,0 @@ -package org.apache.samza.system.kafka - -import org.junit.Test -import org.junit.Assert._ -import org.apache.samza.system.SystemStreamPartition -import org.apache.samza.Partition - -class TestKafkaSystemConsumer { - @Test - def testFetchThresholdShouldDivideEvenlyAmongPartitions { - val consumer = new KafkaSystemConsumer("", "", new KafkaSystemConsumerMetrics, fetchThreshold = 50000) - - for (i <- 0 until 50) { - consumer.register(new SystemStreamPartition("test-system", "test-stream", new Partition(i)), "0") - } - - assertEquals(1000, consumer.perPartitionFetchThreshold) - } -} \ No newline at end of file
