added test
Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/89f79829 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/89f79829 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/89f79829 Branch: refs/heads/NewKafkaSystemConsumer Commit: 89f79829107ed21dd88058922b6038835af1cfbd Parents: 34ae8ba Author: Boris S <[email protected]> Authored: Thu Aug 30 10:30:55 2018 -0700 Committer: Boris S <[email protected]> Committed: Thu Aug 30 10:30:55 2018 -0700 ---------------------------------------------------------------------- .../clients/consumer/KafkaConsumerConfig.java | 22 ++ .../apache/samza/system/kafka/BrokerProxy.scala | 332 ------------------- .../samza/system/kafka/KafkaConsumerProxy.java | 6 +- .../system/kafka/KafkaSystemConsumer.scala | 309 ----------------- .../kafka/KafkaSystemConsumerMetrics.scala | 1 + .../system/kafka/NewKafkaSystemConsumer.java | 19 +- .../kafka/TestKafkaCheckpointManager.scala | 3 +- .../samza/system/kafka/TestBrokerProxy.scala | 3 + .../system/kafka/TestKafkaSystemConsumer.scala | 191 ----------- .../kafka/TestNewKafkaSystemConsumer.java | 203 ++++++++++++ 10 files changed, 237 insertions(+), 852 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/89f79829/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java b/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java index b29a041..88437ee 100644 --- a/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java +++ b/samza-kafka/src/main/scala/org/apache/kafka/clients/consumer/KafkaConsumerConfig.java @@ -23,9 +23,14 @@ package org.apache.kafka.clients.consumer; import java.util.Map; 
import java.util.Properties; +import org.apache.commons.lang3.StringUtils; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.samza.SamzaException; import org.apache.samza.config.Config; import org.apache.samza.config.ConfigException; import org.apache.samza.config.JobConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import scala.Option; @@ -34,6 +39,8 @@ import scala.Option; */ public class KafkaConsumerConfig extends ConsumerConfig { + public static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerConfig.class); + private static final String PRODUCER_CLIENT_ID_PREFIX = "kafka-producer"; private static final String CONSUMER_CLIENT_ID_PREFIX = "kafka-consumer"; private static final String SAMZA_OFFSET_LARGEST = "largest"; @@ -76,6 +83,9 @@ public class KafkaConsumerConfig extends ConsumerConfig { if (! subConf.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) { // get it from the producer config String bootstrapServer = config.get(String.format("systems.%s.producer.%s", systemName, ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)); + if (StringUtils.isEmpty(bootstrapServer)) { + throw new SamzaException("Missing " + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + " config for " + systemName); + } consumerProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); } @@ -85,6 +95,18 @@ public class KafkaConsumerConfig extends ConsumerConfig { RangeAssignor.class.getName()); + // the consumer is fully typed, and deserialization can be too. 
But in case it is not provided we should + // default to byte[] + if ( !config.containsKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)) { + LOG.info("default key serialization for the consumer(for {}) to ByteArrayDeserializer", systemName); + consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); + } + if ( !config.containsKey(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)) { + LOG.info("default value serialization for the consumer(for {}) to ByteArrayDeserializer", systemName); + consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); + } + + // NOT SURE THIS IS NEEDED TODO String maxPollRecords = subConf.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, KAFKA_CONSUMER_MAX_POLL_RECORDS_DEFAULT);; consumerProps.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords); http://git-wip-us.apache.org/repos/asf/samza/blob/89f79829/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala deleted file mode 100644 index 423b68a..0000000 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala +++ /dev/null @@ -1,332 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.samza.system.kafka - -import java.lang.Thread.UncaughtExceptionHandler -import java.nio.channels.ClosedByInterruptException -import java.util.concurrent.{ConcurrentHashMap, CountDownLatch} - -import kafka.api._ -import kafka.common.{ErrorMapping, NotLeaderForPartitionException, TopicAndPartition, UnknownTopicOrPartitionException} -import kafka.consumer.ConsumerConfig -import kafka.message.MessageSet -import org.apache.samza.SamzaException -import org.apache.samza.util.ExponentialSleepStrategy -import org.apache.samza.util.KafkaUtil -import org.apache.samza.util.Logging - -import scala.collection.JavaConverters._ -import scala.collection.concurrent - -/** - * A BrokerProxy consolidates Kafka fetches meant for a particular broker and retrieves them all at once, providing - * a way for consumers to retrieve those messages by topic and partition. 
- */ -class BrokerProxy( - val host: String, - val port: Int, - val system: String, - val clientID: String, - val metrics: KafkaSystemConsumerMetrics, - val messageSink: MessageSink, - val timeout: Int = ConsumerConfig.SocketTimeout, - val bufferSize: Int = ConsumerConfig.SocketBufferSize, - val fetchSize: StreamFetchSizes = new StreamFetchSizes, - val consumerMinSize:Int = ConsumerConfig.MinFetchBytes, - val consumerMaxWait:Int = ConsumerConfig.MaxFetchWaitMs, - offsetGetter: GetOffset = new GetOffset("fail")) extends Toss with Logging { - - /** - * How long should the fetcher thread sleep before checking if any TopicPartitions has been added to its purview - */ - val sleepMSWhileNoTopicPartitions = 100 - - /** What's the next offset for a particular partition? **/ - val nextOffsets:concurrent.Map[TopicAndPartition, Long] = new ConcurrentHashMap[TopicAndPartition, Long]().asScala - - /** Block on the first call to get message if the fetcher has not yet returned its initial results **/ - // TODO: It should be sufficient to just use the count down latch and await on it for each of the calls, but - // VisualVM was showing the consumer thread spending all its time in the await method rather than returning - // immediately, even though the process was proceeding normally. Hence the extra boolean. Should be investigated. 
- val firstCallBarrier = new CountDownLatch(1) - var firstCall = true - - var simpleConsumer = createSimpleConsumer() - - metrics.registerBrokerProxy(host, port) - - def createSimpleConsumer() = { - val hostString = "%s:%d" format (host, port) - info("Creating new SimpleConsumer for host %s for system %s" format (hostString, system)) - - val sc = new DefaultFetchSimpleConsumer(host, port, timeout, bufferSize, clientID, fetchSize, consumerMinSize, consumerMaxWait) - sc - } - - def addTopicPartition(tp: TopicAndPartition, nextOffset: Option[String]) = { - debug("Adding new topic and partition %s to queue for %s" format (tp, host)) - - if (nextOffsets.asJava.containsKey(tp)) { - toss("Already consuming TopicPartition %s" format tp) - } - - val offset = if (nextOffset.isDefined && offsetGetter.isValidOffset(simpleConsumer, tp, nextOffset.get)) { - nextOffset - .get - .toLong - } else { - warn("It appears that we received an invalid or empty offset %s for %s. Attempting to use Kafka's auto.offset.reset setting. This can result in data loss if processing continues." format (nextOffset, tp)) - - offsetGetter.getResetOffset(simpleConsumer, tp) - } - - debug("Got offset %s for new topic and partition %s." 
format (offset, tp)) - - nextOffsets += tp -> offset - - metrics.topicPartitions.get((host, port)).set(nextOffsets.size) - } - - def removeTopicPartition(tp: TopicAndPartition) = { - if (nextOffsets.asJava.containsKey(tp)) { - val offset = nextOffsets.remove(tp) - metrics.topicPartitions.get((host, port)).set(nextOffsets.size) - debug("Removed %s" format tp) - offset - } else { - warn("Asked to remove topic and partition %s, but not in map (keys = %s)" format (tp, nextOffsets.keys.mkString(","))) - None - } - } - - val thread = new Thread(new Runnable { - def run { - var reconnect = false - - try { - (new ExponentialSleepStrategy).run( - loop => { - if (reconnect) { - metrics.reconnects.get((host, port)).inc - simpleConsumer.close() - simpleConsumer = createSimpleConsumer() - } - - while (!Thread.currentThread.isInterrupted) { - messageSink.refreshDropped - if (nextOffsets.size == 0) { - debug("No TopicPartitions to fetch. Sleeping.") - Thread.sleep(sleepMSWhileNoTopicPartitions) - } else { - fetchMessages - - // If we got here, fetchMessages didn't throw an exception, i.e. it was successful. - // In that case, reset the loop delay, so that the next time an error occurs, - // we start with a short retry delay. - loop.reset - } - } - }, - - (exception, loop) => { - warn("Restarting consumer due to %s. Releasing ownership of all partitions, and restarting consumer. Turn on debugging to get a full stack trace." 
format exception) - debug("Exception detail:", exception) - abdicateAll - reconnect = true - }) - } catch { - case e: InterruptedException => info("Got interrupt exception in broker proxy thread.") - case e: ClosedByInterruptException => info("Got closed by interrupt exception in broker proxy thread.") - case e: OutOfMemoryError => throw new SamzaException("Got out of memory error in broker proxy thread.") - case e: StackOverflowError => throw new SamzaException("Got stack overflow error in broker proxy thread.") - } - - if (Thread.currentThread.isInterrupted) info("Shutting down due to interrupt.") - } - }, "BrokerProxy thread pointed at %s:%d for client %s" format (host, port, clientID)) - - private def fetchMessages(): Unit = { - val topicAndPartitionsToFetch = nextOffsets.filterKeys(messageSink.needsMoreMessages(_)).toList - - if (topicAndPartitionsToFetch.size > 0) { - metrics.brokerReads.get((host, port)).inc - val response: FetchResponse = simpleConsumer.defaultFetch(topicAndPartitionsToFetch: _*) - firstCall = false - firstCallBarrier.countDown() - - // Split response into errors and non errors, processing the errors first - val (nonErrorResponses, errorResponses) = response.data.toSet.partition(_._2.error.code() == ErrorMapping.NoError) - - handleErrors(errorResponses, response) - - nonErrorResponses.foreach { case (tp, data) => moveMessagesToTheirQueue(tp, data) } - } else { - refreshLatencyMetrics - - debug("No topic/partitions need to be fetched for %s:%s right now. Sleeping %sms." format (host, port, sleepMSWhileNoTopicPartitions)) - - metrics.brokerSkippedFetchRequests.get((host, port)).inc - - Thread.sleep(sleepMSWhileNoTopicPartitions) - } - } - - /** - * Releases ownership for a single TopicAndPartition. The - * KafkaSystemConsumer will try and find a new broker for the - * TopicAndPartition. 
- */ - def abdicate(tp: TopicAndPartition) = removeTopicPartition(tp) match { - // Need to be mindful of a tp that was removed by another thread - case Some(offset) => messageSink.abdicate(tp, offset) - case None => warn("Tried to abdicate for topic partition not in map. Removed in interim?") - } - - /** - * Releases all TopicAndPartition ownership for this BrokerProxy thread. The - * KafkaSystemConsumer will try and find a new broker for the - * TopicAndPartition. - */ - def abdicateAll { - info("Abdicating all topic partitions.") - val immutableNextOffsetsCopy = nextOffsets.toMap - immutableNextOffsetsCopy.keySet.foreach(abdicate(_)) - } - - def handleErrors(errorResponses: Set[(TopicAndPartition, FetchResponsePartitionData)], response: FetchResponse) = { - // FetchResponse should really return Option and a list of the errors so we don't have to find them ourselves - case class Error(tp: TopicAndPartition, code: Short, exception: Exception) - - // Now subdivide the errors into three types: non-recoverable, not leader (== abdicate) and offset out of range (== get new offset) - - // Convert FetchResponse into easier-to-work-with Errors - val errors = for ( - (topicAndPartition, responseData) <- errorResponses; - error <- Option(response.error(topicAndPartition.topic, topicAndPartition.partition)) // Scala's being cranky about referring to error.getKey values... 
- ) yield new Error(topicAndPartition, error.code(), error.exception()) - - val (notLeaderOrUnknownTopic, otherErrors) = errors.partition { case (e) => e.code == ErrorMapping.NotLeaderForPartitionCode || e.code == ErrorMapping.UnknownTopicOrPartitionCode } - val (offsetOutOfRangeErrors, remainingErrors) = otherErrors.partition(_.code == ErrorMapping.OffsetOutOfRangeCode) - - // Can recover from two types of errors: not leader (go find the new leader) and offset out of range (go get the new offset) - // However, we want to bail as quickly as possible if there are non recoverable errors so that the state of the other - // topic-partitions remains the same. That way, when we've rebuilt the simple consumer, we can come around and - // handle the recoverable errors. - remainingErrors.foreach(e => { - warn("Got non-recoverable error codes during multifetch. Throwing an exception to trigger reconnect. Errors: %s" format remainingErrors.mkString(",")) - KafkaUtil.maybeThrowException(e.exception) }) - - notLeaderOrUnknownTopic.foreach(e => { - warn("Received (UnknownTopicOr|NotLeaderFor)Partition exception %s for %s. Abdicating" format(e.code, e.tp)) - abdicate(e.tp) - }) - - offsetOutOfRangeErrors.foreach(e => { - warn("Received OffsetOutOfRange exception for %s. Current offset = %s" format (e.tp, nextOffsets.getOrElse(e.tp, "not found in map, likely removed in the interim"))) - - try { - val newOffset = offsetGetter.getResetOffset(simpleConsumer, e.tp) - // Put the new offset into the map (if the tp still exists). Will catch it on the next go-around - nextOffsets.replace(e.tp, newOffset) - } catch { - // UnknownTopic or NotLeader are routine events and handled via abdication. All others, bail. - case _ @ (_:UnknownTopicOrPartitionException | _: NotLeaderForPartitionException) => warn("Received (UnknownTopicOr|NotLeaderFor)Partition exception %s for %s. 
Abdicating" format(e.code, e.tp)) - abdicate(e.tp) - } - }) - } - - def moveMessagesToTheirQueue(tp: TopicAndPartition, data: FetchResponsePartitionData) = { - val messageSet: MessageSet = data.messages - var nextOffset = nextOffsets(tp) - - messageSink.setIsAtHighWatermark(tp, data.hw == 0 || data.hw == nextOffset) - require(messageSet != null) - for (message <- messageSet.iterator) { - messageSink.addMessage(tp, message, data.hw) // TODO: Verify this is correct - - nextOffset = message.nextOffset - - val bytesSize = message.message.payloadSize + message.message.keySize - metrics.reads.get(tp).inc - metrics.bytesRead.get(tp).inc(bytesSize) - metrics.brokerBytesRead.get((host, port)).inc(bytesSize) - metrics.offsets.get(tp).set(nextOffset) - } - - nextOffsets.replace(tp, nextOffset) // use replace rather than put in case this tp was removed while we were fetching. - - // Update high water mark - val hw = data.hw - if (hw >= 0) { - metrics.highWatermark.get(tp).set(hw) - metrics.lag.get(tp).set(hw - nextOffset) - } else { - debug("Got a high water mark less than 0 (%d) for %s, so skipping." format (hw, tp)) - } - } - override def toString() = "BrokerProxy for %s:%d" format (host, port) - - def start { - if (!thread.isAlive) { - info("Starting " + toString) - thread.setDaemon(true) - thread.setName("Samza BrokerProxy " + thread.getName) - thread.setUncaughtExceptionHandler(new UncaughtExceptionHandler { - override def uncaughtException(t: Thread, e: Throwable) = error("Uncaught exception in broker proxy:", e) - }) - thread.start - } else { - debug("Tried to start an already started broker proxy (%s). Ignoring." 
format toString) - } - } - - def stop { - info("Shutting down " + toString) - - if (simpleConsumer != null) { - info("closing simple consumer...") - simpleConsumer.close - } - - thread.interrupt - thread.join - } - - private def refreshLatencyMetrics { - nextOffsets.foreach{ - case (topicAndPartition, offset) => { - val latestOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, -1, Request.OrdinaryConsumerId) - trace("latest offset of %s is %s" format (topicAndPartition, latestOffset)) - if (latestOffset >= 0) { - // only update the registered topicAndpartitions - if(metrics.highWatermark.containsKey(topicAndPartition)) { - metrics.highWatermark.get(topicAndPartition).set(latestOffset) - } - if(metrics.lag.containsKey(topicAndPartition)) { - metrics.lag.get(topicAndPartition).set(latestOffset - offset) - } - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/89f79829/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java index 01b345a..e61e0ff 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaConsumerProxy.java @@ -47,8 +47,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Separate thread that reads messages from kafka and puts them int the BlockingEnvelopeMap - * This class is not thread safe. There will be only one instance of this class per LiKafkaSystemConsumer object + * Separate thread that reads messages from kafka and puts them into the BlockingEnvelopeMap. + * This class is not thread safe. There will be only one instance of this class per LiKafkaSystemConsumer object. * We still need some synchronization around kafkaConsumer. 
See pollConsumer() method for details. */ public class KafkaConsumerProxy<K, V> { @@ -65,7 +65,7 @@ public class KafkaConsumerProxy<K, V> { private final String clientId; private final Map<TopicPartition, SystemStreamPartition> topicPartitions2SSP = new HashMap<>(); private final Map<SystemStreamPartition, MetricName> ssp2MetricName = new HashMap<>(); - // list of all the SSPs we poll from with their next offsets correspondingly. + // list of all the SSPs we poll from, with their next offsets correspondingly. private final Map<SystemStreamPartition, Long> nextOffsets = new ConcurrentHashMap<>(); // lags behind the high water mark, as reported by the Kafka consumer. private final Map<SystemStreamPartition, Long> latestLags = new HashMap<>(); http://git-wip-us.apache.org/repos/asf/samza/blob/89f79829/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala deleted file mode 100644 index fd84c4a..0000000 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala +++ /dev/null @@ -1,309 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.system.kafka - -import kafka.common.TopicAndPartition -import org.apache.samza.util.Logging -import kafka.message.Message -import kafka.message.MessageAndOffset -import org.apache.samza.Partition -import org.apache.kafka.common.utils.Utils -import org.apache.samza.util.Clock -import kafka.serializer.DefaultDecoder -import kafka.serializer.Decoder -import org.apache.samza.util.BlockingEnvelopeMap -import org.apache.samza.system.SystemStreamPartition -import org.apache.samza.system.IncomingMessageEnvelope -import kafka.consumer.ConsumerConfig -import org.apache.samza.util.TopicMetadataStore -import kafka.api.PartitionMetadata -import kafka.api.TopicMetadata -import org.apache.samza.util.ExponentialSleepStrategy -import java.util.concurrent.ConcurrentHashMap -import scala.collection.JavaConverters._ -import org.apache.samza.system.SystemAdmin - -object KafkaSystemConsumer { - - // Approximate additional shallow heap overhead per message in addition to the raw bytes - // received from Kafka 4 + 64 + 4 + 4 + 4 = 80 bytes overhead. - // As this overhead is a moving target, and not very large - // compared to the message size its being ignore in the computation for now. 
- val MESSAGE_SIZE_OVERHEAD = 4 + 64 + 4 + 4 + 4; - - def toTopicAndPartition(systemStreamPartition: SystemStreamPartition) = { - val topic = systemStreamPartition.getStream - val partitionId = systemStreamPartition.getPartition.getPartitionId - TopicAndPartition(topic, partitionId) - } -} - -/** - * Maintain a cache of BrokerProxies, returning the appropriate one for the - * requested topic and partition. - */ -private[kafka] class KafkaSystemConsumer( - systemName: String, - systemAdmin: SystemAdmin, - metrics: KafkaSystemConsumerMetrics, - metadataStore: TopicMetadataStore, - clientId: String, - timeout: Int = ConsumerConfig.ConsumerTimeoutMs, - bufferSize: Int = ConsumerConfig.SocketBufferSize, - fetchSize: StreamFetchSizes = new StreamFetchSizes, - consumerMinSize: Int = ConsumerConfig.MinFetchBytes, - consumerMaxWait: Int = ConsumerConfig.MaxFetchWaitMs, - - /** - * Defines a low water mark for how many messages we buffer before we start - * executing fetch requests against brokers to get more messages. This value - * is divided equally among all registered SystemStreamPartitions. For - * example, if fetchThreshold is set to 50000, and there are 50 - * SystemStreamPartitions registered, then the per-partition threshold is - * 1000. As soon as a SystemStreamPartition's buffered message count drops - * below 1000, a fetch request will be executed to get more data for it. - * - * Increasing this parameter will decrease the latency between when a queue - * is drained of messages and when new messages are enqueued, but also leads - * to an increase in memory usage since more messages will be held in memory. - */ - fetchThreshold: Int = 50000, - /** - * Defines a low water mark for how many bytes we buffer before we start - * executing fetch requests against brokers to get more messages. This - * value is divided by 2 because the messages are buffered twice, once in - * KafkaConsumer and then in SystemConsumers. 
This value - * is divided equally among all registered SystemStreamPartitions. - * However this is a soft limit per partition, as the - * bytes are cached at the message boundaries, and the actual usage can be - * 1000 bytes + size of max message in the partition for a given stream. - * The bytes if the size of the bytebuffer in Message. Hence, the - * Object overhead is not taken into consideration. In this codebase - * it seems to be quite small. Hence, even for 500000 messages this is around 4MB x 2 = 8MB, - * which is not considerable. - * - * For example, - * if fetchThresholdBytes is set to 100000 bytes, and there are 50 - * SystemStreamPartitions registered, then the per-partition threshold is - * (100000 / 2) / 50 = 1000 bytes. - * As this is a soft limit, the actual usage can be 1000 bytes + size of max message. - * As soon as a SystemStreamPartition's buffered messages bytes drops - * below 1000, a fetch request will be executed to get more data for it. - * - * Increasing this parameter will decrease the latency between when a queue - * is drained of messages and when new messages are enqueued, but also leads - * to an increase in memory usage since more messages will be held in memory. - * - * The default value is -1, which means this is not used. When the value - * is > 0, then the fetchThreshold which is count based is ignored. 
- */ - fetchThresholdBytes: Long = -1, - /** - * if(fetchThresholdBytes > 0) true else false - */ - fetchLimitByBytesEnabled: Boolean = false, - offsetGetter: GetOffset = new GetOffset("fail"), - deserializer: Decoder[Object] = new DefaultDecoder().asInstanceOf[Decoder[Object]], - keyDeserializer: Decoder[Object] = new DefaultDecoder().asInstanceOf[Decoder[Object]], - retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy, - clock: () => Long = { System.currentTimeMillis }) extends BlockingEnvelopeMap( - metrics.registry, - new Clock { - def currentTimeMillis = clock() - }, - classOf[KafkaSystemConsumerMetrics].getName) with Toss with Logging { - - type HostPort = (String, Int) - val brokerProxies = scala.collection.mutable.Map[HostPort, BrokerProxy]() - val topicPartitionsAndOffsets: scala.collection.concurrent.Map[TopicAndPartition, String] = new ConcurrentHashMap[TopicAndPartition, String]().asScala - var perPartitionFetchThreshold = fetchThreshold - var perPartitionFetchThresholdBytes = 0L - - def start() { - if (topicPartitionsAndOffsets.size > 0) { - perPartitionFetchThreshold = fetchThreshold / topicPartitionsAndOffsets.size - // messages get double buffered, hence divide by 2 - if(fetchLimitByBytesEnabled) { - perPartitionFetchThresholdBytes = (fetchThresholdBytes / 2) / topicPartitionsAndOffsets.size - } - } - - systemAdmin.start() - refreshBrokers - } - - override def register(systemStreamPartition: SystemStreamPartition, offset: String) { - super.register(systemStreamPartition, offset) - - val topicAndPartition = KafkaSystemConsumer.toTopicAndPartition(systemStreamPartition) - val existingOffset = topicPartitionsAndOffsets.getOrElseUpdate(topicAndPartition, offset) - // register the older offset in the consumer - if (systemAdmin.offsetComparator(existingOffset, offset) >= 0) { - topicPartitionsAndOffsets.replace(topicAndPartition, offset) - } - - 
metrics.registerTopicAndPartition(KafkaSystemConsumer.toTopicAndPartition(systemStreamPartition)) - } - - def stop() { - systemAdmin.stop() - brokerProxies.values.foreach(_.stop) - } - - protected def createBrokerProxy(host: String, port: Int): BrokerProxy = { - info("Creating new broker proxy for host: %s and port: %s" format(host, port)) - new BrokerProxy(host, port, systemName, clientId, metrics, sink, timeout, bufferSize, fetchSize, consumerMinSize, consumerMaxWait, offsetGetter) - } - - protected def getPartitionMetadata(topicMetadata: TopicMetadata, partition: Int): Option[PartitionMetadata] = { - topicMetadata.partitionsMetadata.find(_.partitionId == partition) - } - - protected def getLeaderHostPort(partitionMetadata: Option[PartitionMetadata]): Option[(String, Int)] = { - // Whatever we do, we can't say Broker, even though we're - // manipulating it here. Broker is a private type and Scala doesn't seem - // to care about that as long as you don't explicitly declare its type. - val brokerOption = partitionMetadata.flatMap(_.leader) - - brokerOption match { - case Some(broker) => Some(broker.host, broker.port) - case _ => None - } - } - - def refreshBrokers { - var tpToRefresh = topicPartitionsAndOffsets.keySet.toList - info("Refreshing brokers for: %s" format topicPartitionsAndOffsets) - retryBackoff.run( - loop => { - val topics = tpToRefresh.map(_.topic).toSet - val topicMetadata = TopicMetadataCache.getTopicMetadata(topics, systemName, (topics: Set[String]) => metadataStore.getTopicInfo(topics)) - - // addTopicPartition one at a time, leaving the to-be-done list intact in case of exceptions. - // This avoids trying to re-add the same topic partition repeatedly - def refresh() = { - val head = tpToRefresh.head - // refreshBrokers can be called from abdicate and refreshDropped, - // both of which are triggered from BrokerProxy threads. 
To prevent - // accidentally creating multiple objects for the same broker, or - // accidentally not updating the topicPartitionsAndOffsets variable, - // we need to lock. - this.synchronized { - // Check if we still need this TopicAndPartition inside the - // critical section. If we don't, then notAValidEvent it. - topicPartitionsAndOffsets.get(head) match { - case Some(nextOffset) => - val partitionMetadata = getPartitionMetadata(topicMetadata(head.topic), head.partition) - getLeaderHostPort(partitionMetadata) match { - case Some((host, port)) => - debug("Got partition metadata for %s: %s" format(head, partitionMetadata.get)) - val brokerProxy = brokerProxies.getOrElseUpdate((host, port), createBrokerProxy(host, port)) - brokerProxy.addTopicPartition(head, Option(nextOffset)) - brokerProxy.start - debug("Claimed topic-partition (%s) for (%s)".format(head, brokerProxy)) - topicPartitionsAndOffsets -= head - case None => info("No metadata available for: %s. Will try to refresh and add to a consumer thread later." format head) - } - case _ => debug("Ignoring refresh for %s because we already added it from another thread." format head) - } - } - tpToRefresh.tail - } - - while (!tpToRefresh.isEmpty) { - tpToRefresh = refresh() - } - - loop.done - }, - - (exception, loop) => { - warn("While refreshing brokers for %s: %s. Retrying." 
format (tpToRefresh.head, exception)) - debug("Exception detail:", exception) - }) - } - - val sink = new MessageSink { - var lastDroppedRefresh = clock() - - def refreshDropped() { - if (topicPartitionsAndOffsets.size > 0 && clock() - lastDroppedRefresh > 10000) { - refreshBrokers - lastDroppedRefresh = clock() - } - } - - def setIsAtHighWatermark(tp: TopicAndPartition, isAtHighWatermark: Boolean) { - setIsAtHead(toSystemStreamPartition(tp), isAtHighWatermark) - } - - def needsMoreMessages(tp: TopicAndPartition) = { - if(fetchLimitByBytesEnabled) { - getMessagesSizeInQueue(toSystemStreamPartition(tp)) < perPartitionFetchThresholdBytes - } else { - getNumMessagesInQueue(toSystemStreamPartition(tp)) < perPartitionFetchThreshold - } - } - - def getMessageSize(message: Message): Integer = { - message.size + KafkaSystemConsumer.MESSAGE_SIZE_OVERHEAD - } - - def addMessage(tp: TopicAndPartition, msg: MessageAndOffset, highWatermark: Long) = { - trace("Incoming message %s: %s." format (tp, msg)) - - val systemStreamPartition = toSystemStreamPartition(tp) - val isAtHead = highWatermark == msg.offset - val offset = msg.offset.toString - val key = if (msg.message.key != null) { - keyDeserializer.fromBytes(Utils.readBytes(msg.message.key)) - } else { - null - } - val message = if (!msg.message.isNull) { - deserializer.fromBytes(Utils.readBytes(msg.message.payload)) - } else { - null - } - - if(fetchLimitByBytesEnabled ) { - val ime = new IncomingMessageEnvelope(systemStreamPartition, offset, key, message, getMessageSize(msg.message)) - ime.setTimestamp(if (!msg.message.isNull) msg.message.timestamp else 0L) - put(systemStreamPartition, ime) - } else { - val ime = new IncomingMessageEnvelope(systemStreamPartition, offset, key, message) - ime.setTimestamp(if (!msg.message.isNull) msg.message.timestamp else 0L) - put(systemStreamPartition, ime) - } - - setIsAtHead(systemStreamPartition, isAtHead) - } - - def abdicate(tp: TopicAndPartition, nextOffset: Long) { - info("Abdicating 
for %s" format (tp)) - topicPartitionsAndOffsets += tp -> nextOffset.toString - refreshBrokers - } - - private def toSystemStreamPartition(tp: TopicAndPartition) = { - new SystemStreamPartition(systemName, tp.topic, new Partition(tp.partition)) - } - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/89f79829/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala index 51545a0..1aa66dc 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala @@ -35,6 +35,7 @@ class KafkaSystemConsumerMetrics(val systemName: String = "unknown", val registr val highWatermark = new ConcurrentHashMap[TopicAndPartition, Gauge[Long]] /* + TODO Fix * (String, Int) = (host, port) of BrokerProxy. 
*/ http://git-wip-us.apache.org/repos/asf/samza/blob/89f79829/samza-kafka/src/main/scala/org/apache/samza/system/kafka/NewKafkaSystemConsumer.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/NewKafkaSystemConsumer.java b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/NewKafkaSystemConsumer.java index dd7e584..b745628 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/NewKafkaSystemConsumer.java +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/NewKafkaSystemConsumer.java @@ -66,14 +66,14 @@ public class NewKafkaSystemConsumer<K,V> extends BlockingEnvelopeMap implements private final KafkaSystemConsumerMetrics samzaConsumerMetrics; private final String clientId; private final String metricName; - private final Map<TopicPartition, SystemStreamPartition> topicPartitions2SSP = new HashMap<>(); + /* package private */final Map<TopicPartition, SystemStreamPartition> topicPartitions2SSP = new HashMap<>(); private final AtomicBoolean stopped = new AtomicBoolean(false); private final AtomicBoolean started = new AtomicBoolean(false); private final Config config; private final boolean fetchThresholdBytesEnabled; // This sink is used to transfer the messages from the proxy/consumer to the BlockingEnvelopeMap. - private KafkaConsumerMessageSink messageSink; + /* package private */ KafkaConsumerMessageSink messageSink; // proxy is doing the actual reading private KafkaConsumerProxy proxy; @@ -142,17 +142,6 @@ public class NewKafkaSystemConsumer<K,V> extends BlockingEnvelopeMap implements Map<String, String> injectProps = new HashMap<>(); - // the consumer is fully typed, and deserialization can be too. 
But in case it is not provided we should - // default to byte[] - if ( !config.containsKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)) { - LOG.info("default key serialization for the consumer(for {}) to ByteArrayDeserializer", systemName); - injectProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); - } - if ( !config.containsKey(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)) { - LOG.info("default value serialization for the consumer(for {}) to ByteArrayDeserializer", systemName); - injectProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName()); - } - // extract kafka consumer configs KafkaConsumerConfig consumerConfig = KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, systemName, clientId, injectProps); @@ -203,7 +192,7 @@ public class NewKafkaSystemConsumer<K,V> extends BlockingEnvelopeMap implements } } - private void createConsumerProxy() { + void createConsumerProxy() { // create a sink for passing the messages between the proxy and the consumer messageSink = new KafkaConsumerMessageSink(); @@ -219,7 +208,7 @@ public class NewKafkaSystemConsumer<K,V> extends BlockingEnvelopeMap implements Add the TopicPartitions to the proxy. Start the proxy thread. 
*/ - private void startConsumer() { + void startConsumer() { //set the offset for each TopicPartition topicPartitions2Offset.forEach((tp, startingOffsetString) -> { long startingOffset = Long.valueOf(startingOffsetString); http://git-wip-us.apache.org/repos/asf/samza/blob/89f79829/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala index 8544dbf..8d92f4d 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala @@ -92,8 +92,8 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness { assertNull(readCp) writeCheckpoint(checkpointTopic, taskName, checkpoint1) - assertEquals(checkpoint1, readCheckpoint(checkpointTopic, taskName)) + // writing a second message and reading it returns a more recent checkpoint writeCheckpoint(checkpointTopic, taskName, checkpoint2) assertEquals(checkpoint2, readCheckpoint(checkpointTopic, taskName)) @@ -194,7 +194,6 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness { val systemFactory = Util.getObj(systemFactoryClassName, classOf[SystemFactory]) val spec = new KafkaStreamSpec("id", cpTopic, checkpointSystemName, 1, 1, props) - System.out.println("CONFIG = " + config) new KafkaCheckpointManager(spec, systemFactory, failOnTopicValidation, config, new NoOpMetricsRegistry, serde) } http://git-wip-us.apache.org/repos/asf/samza/blob/89f79829/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala ---------------------------------------------------------------------- diff --git 
a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala index d510076..a3f76e7 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala @@ -41,6 +41,7 @@ import org.mockito.{Matchers, Mockito} import scala.collection.JavaConverters._ class TestBrokerProxy extends Logging { + /* val tp2 = new TopicAndPartition("Redbird", 2013) var fetchTp1 = true // control whether fetching tp1 messages or not @@ -305,6 +306,7 @@ class TestBrokerProxy extends Logging { } /** + * TODO fix * Test that makes sure that BrokerProxy abdicates all TopicAndPartitions * that it owns when a consumer failure occurs. */ @@ -431,4 +433,5 @@ class TestBrokerProxy extends Logging { bp.stop verify(mockSimpleConsumer).close } + */ } http://git-wip-us.apache.org/repos/asf/samza/blob/89f79829/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala deleted file mode 100644 index 8656d10..0000000 --- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala +++ /dev/null @@ -1,191 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.system.kafka - -import kafka.api.TopicMetadata -import kafka.api.PartitionMetadata -import kafka.cluster.Broker -import kafka.common.TopicAndPartition -import kafka.message.Message -import kafka.message.MessageAndOffset -import org.apache.kafka.common.protocol.Errors -import org.apache.samza.system.IncomingMessageEnvelope -import org.apache.samza.system.SystemStreamPartition -import org.apache.samza.Partition -import org.apache.samza.util.TopicMetadataStore -import org.junit.Test -import org.junit.Assert._ -import org.apache.samza.system.SystemAdmin -import org.mockito.Mockito._ -import org.mockito.Matchers._ - -class TestKafkaSystemConsumer { - val systemAdmin: SystemAdmin = mock(classOf[KafkaSystemAdmin]) - private val SSP: SystemStreamPartition = new SystemStreamPartition("test", "test", new Partition(0)) - private val envelope: IncomingMessageEnvelope = new IncomingMessageEnvelope(SSP, null, null, null) - private val envelopeWithSize: IncomingMessageEnvelope = new IncomingMessageEnvelope(SSP, null, null, null, 100) - private val clientId = "TestClientId" - - @Test - def testFetchThresholdShouldDivideEvenlyAmongPartitions { - val metadataStore = new MockMetadataStore - val consumer = new KafkaSystemConsumer("", systemAdmin, new KafkaSystemConsumerMetrics, metadataStore, clientId, fetchThreshold = 50000) { - override def refreshBrokers { - } - } - - for (i <- 0 until 50) { - consumer.register(new SystemStreamPartition("test-system", "test-stream", new Partition(i)), "0") - } - - consumer.start - - 
assertEquals(1000, consumer.perPartitionFetchThreshold) - } - - @Test - def testBrokerCreationShouldTriggerStart { - val systemName = "test-system" - val streamName = "test-stream" - val metrics = new KafkaSystemConsumerMetrics - // Lie and tell the store that the partition metadata is empty. We can't - // use partition metadata because it has Broker in its constructor, which - // is package private to Kafka. - val metadataStore = new MockMetadataStore(Map(streamName -> TopicMetadata(streamName, Seq.empty, Errors.NONE))) - var hosts = List[String]() - var getHostPortCount = 0 - val consumer = new KafkaSystemConsumer(systemName, systemAdmin, metrics, metadataStore, clientId) { - override def getLeaderHostPort(partitionMetadata: Option[PartitionMetadata]): Option[(String, Int)] = { - // Generate a unique host every time getHostPort is called. - getHostPortCount += 1 - Some("localhost-%s" format getHostPortCount, 0) - } - - override def createBrokerProxy(host: String, port: Int): BrokerProxy = { - new BrokerProxy(host, port, systemName, "", metrics, sink) { - override def addTopicPartition(tp: TopicAndPartition, nextOffset: Option[String]) = { - // Skip this since we normally do verification of offsets, which - // tries to connect to Kafka. Rather than mock that, just forget it. - nextOffsets.size - } - - override def start { - hosts :+= host - } - } - } - } - - consumer.register(new SystemStreamPartition(systemName, streamName, new Partition(0)), "1") - assertEquals(0, hosts.size) - consumer.start - assertEquals(List("localhost-1"), hosts) - // Should trigger a refresh with a new host. 
- consumer.sink.abdicate(new TopicAndPartition(streamName, 0), 2) - assertEquals(List("localhost-1", "localhost-2"), hosts) - } - - @Test - def testConsumerRegisterOlderOffsetOfTheSamzaSSP { - when(systemAdmin.offsetComparator(anyString, anyString)).thenCallRealMethod() - - val metadataStore = new MockMetadataStore - val consumer = new KafkaSystemConsumer("", systemAdmin, new KafkaSystemConsumerMetrics, metadataStore, clientId, fetchThreshold = 50000) - val ssp0 = new SystemStreamPartition("test-system", "test-stream", new Partition(0)) - val ssp1 = new SystemStreamPartition("test-system", "test-stream", new Partition(1)) - val ssp2 = new SystemStreamPartition("test-system", "test-stream", new Partition(2)) - - consumer.register(ssp0, "0") - consumer.register(ssp0, "5") - consumer.register(ssp1, "2") - consumer.register(ssp1, "3") - consumer.register(ssp2, "0") - - assertEquals("0", consumer.topicPartitionsAndOffsets(KafkaSystemConsumer.toTopicAndPartition(ssp0))) - assertEquals("2", consumer.topicPartitionsAndOffsets(KafkaSystemConsumer.toTopicAndPartition(ssp1))) - assertEquals("0", consumer.topicPartitionsAndOffsets(KafkaSystemConsumer.toTopicAndPartition(ssp2))) - } - - @Test - def testFetchThresholdBytesShouldDivideEvenlyAmongPartitions { - val metadataStore = new MockMetadataStore - val consumer = new KafkaSystemConsumer("", systemAdmin, new KafkaSystemConsumerMetrics, metadataStore, clientId, - fetchThreshold = 50000, fetchThresholdBytes = 60000L, fetchLimitByBytesEnabled = true) { - override def refreshBrokers { - } - } - - for (i <- 0 until 10) { - consumer.register(new SystemStreamPartition("test-system", "test-stream", new Partition(i)), "0") - } - - consumer.start - - assertEquals(5000, consumer.perPartitionFetchThreshold) - assertEquals(3000, consumer.perPartitionFetchThresholdBytes) - } - - @Test - def testFetchThresholdBytes { - val metadataStore = new MockMetadataStore - val consumer = new KafkaSystemConsumer("test-system", systemAdmin, new 
KafkaSystemConsumerMetrics, metadataStore, clientId, - fetchThreshold = 50000, fetchThresholdBytes = 60000L, fetchLimitByBytesEnabled = true) { - override def refreshBrokers { - } - } - - for (i <- 0 until 10) { - consumer.register(new SystemStreamPartition("test-system", "test-stream", new Partition(i)), "0") - } - - consumer.start - - val msg = Array[Byte](5, 112, 9, 126) - val msgAndOffset: MessageAndOffset = MessageAndOffset(new Message(msg), 887654) - // 4 data + 18 Message overhead + 80 IncomingMessageEnvelope overhead - consumer.sink.addMessage(new TopicAndPartition("test-stream", 0), msgAndOffset, 887354) - - assertEquals(106, consumer.getMessagesSizeInQueue(new SystemStreamPartition("test-system", "test-stream", new Partition(0)))) - } - - @Test - def testFetchThresholdBytesDisabled { - val metadataStore = new MockMetadataStore - val consumer = new KafkaSystemConsumer("", systemAdmin, new KafkaSystemConsumerMetrics, metadataStore, clientId, - fetchThreshold = 50000, fetchThresholdBytes = 60000L) { - override def refreshBrokers { - } - } - - for (i <- 0 until 10) { - consumer.register(new SystemStreamPartition("test-system", "test-stream", new Partition(i)), "0") - } - - consumer.start - - assertEquals(5000, consumer.perPartitionFetchThreshold) - assertEquals(0, consumer.perPartitionFetchThresholdBytes) - assertEquals(0, consumer.getMessagesSizeInQueue(new SystemStreamPartition("test-system", "test-stream", new Partition(0)))) - } -} - -class MockMetadataStore(var metadata: Map[String, TopicMetadata] = Map()) extends TopicMetadataStore { - def getTopicInfo(topics: Set[String]): Map[String, TopicMetadata] = metadata -} http://git-wip-us.apache.org/repos/asf/samza/blob/89f79829/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestNewKafkaSystemConsumer.java ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestNewKafkaSystemConsumer.java 
b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestNewKafkaSystemConsumer.java new file mode 100644 index 0000000..f7f63f3 --- /dev/null +++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestNewKafkaSystemConsumer.java @@ -0,0 +1,203 @@ +package org.apache.samza.system.kafka; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.KafkaConsumerConfig; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.samza.Partition; +import org.apache.samza.config.Config; +import org.apache.samza.config.KafkaConfig; +import org.apache.samza.config.MapConfig; +import org.apache.samza.system.IncomingMessageEnvelope; +import org.apache.samza.system.SystemStreamPartition; +import org.apache.samza.util.Clock; +import org.apache.samza.util.NoOpMetricsRegistry; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.*; + + +public class TestNewKafkaSystemConsumer { + public final String TEST_SYSTEM = "test-system"; + public final String TEST_STREAM = "test-stream"; + public final String TEST_CLIENT_ID = "testClientId"; + public final String BOOTSTRAP_SERVER = "127.0.0.1:8888"; + public final String FETCH_THRESHOLD_MSGS = "50000"; + public final String FETCH_THRESHOLD_BYTES = "100000"; + + @Before + public void setUp() { + + } + + private NewKafkaSystemConsumer setupConsumer(String fetchMsg, String fetchBytes) { + final Map<String, String> map = new HashMap<>(); + + map.put(String.format(KafkaConfig.CONSUMER_FETCH_THRESHOLD(), TEST_SYSTEM), fetchMsg); + map.put(String.format(KafkaConfig.CONSUMER_FETCH_THRESHOLD_BYTES(), TEST_SYSTEM), fetchBytes); + map.put(String.format("systems.%s.consumer.%s", TEST_SYSTEM, 
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG), + BOOTSTRAP_SERVER); + + Config config = new MapConfig(map); + KafkaConsumerConfig consumerConfig = + KafkaConsumerConfig.getKafkaSystemConsumerConfig(config, TEST_SYSTEM, TEST_CLIENT_ID, Collections.emptyMap()); + final KafkaConsumer<byte[], byte[]> kafkaConsumer = new MockKafkaConsumer(consumerConfig.originals()); + + MockNewKafkaSystmeCosumer newKafkaSystemConsumer = + new MockNewKafkaSystmeCosumer(kafkaConsumer, TEST_SYSTEM, config, TEST_CLIENT_ID, + new KafkaSystemConsumerMetrics(TEST_SYSTEM, new NoOpMetricsRegistry()), System::currentTimeMillis); + + return newKafkaSystemConsumer; + } + + @Test + public void testConfigValidations() { + + final NewKafkaSystemConsumer consumer = setupConsumer(FETCH_THRESHOLD_MSGS, FETCH_THRESHOLD_BYTES); + + consumer.start(); + // should be no failures + } + + @Test + public void testFetchThresholdShouldDivideEvenlyAmongPartitions() { + final NewKafkaSystemConsumer consumer = setupConsumer(FETCH_THRESHOLD_MSGS, FETCH_THRESHOLD_BYTES); + final int partitionsNum = 50; + for (int i = 0; i < partitionsNum; i++) { + consumer.register(new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(i)), "0"); + } + + consumer.start(); + + Assert.assertEquals(Long.valueOf(FETCH_THRESHOLD_MSGS) / partitionsNum, consumer.perPartitionFetchThreshold); + Assert.assertEquals(Long.valueOf(FETCH_THRESHOLD_BYTES) / 2 / partitionsNum, + consumer.perPartitionFetchThresholdBytes); + } + + @Test + public void testConsumerRegisterOlderOffsetOfTheSamzaSSP() { + + NewKafkaSystemConsumer consumer = setupConsumer(FETCH_THRESHOLD_MSGS, FETCH_THRESHOLD_BYTES); + + SystemStreamPartition ssp0 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(0)); + SystemStreamPartition ssp1 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(1)); + SystemStreamPartition ssp2 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(2)); + + consumer.register(ssp0, "0"); + 
consumer.register(ssp0, "5"); + consumer.register(ssp1, "2"); + consumer.register(ssp1, "3"); + consumer.register(ssp2, "0"); + + assertEquals("0", consumer.topicPartitions2Offset.get(NewKafkaSystemConsumer.toTopicPartition(ssp0))); + assertEquals("2", consumer.topicPartitions2Offset.get(NewKafkaSystemConsumer.toTopicPartition(ssp1))); + assertEquals("0", consumer.topicPartitions2Offset.get(NewKafkaSystemConsumer.toTopicPartition(ssp2))); + } + + @Test + public void testFetchThresholdBytes() { + + SystemStreamPartition ssp0 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(0)); + SystemStreamPartition ssp1 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(1)); + int partitionsNum = 2; + int ime0Size = Integer.valueOf(FETCH_THRESHOLD_MSGS) / partitionsNum; // fake size + int ime1Size = Integer.valueOf(FETCH_THRESHOLD_MSGS) / partitionsNum - 1; // fake size + int ime11Size = 20; + ByteArraySerializer bytesSerde = new ByteArraySerializer(); + IncomingMessageEnvelope ime0 = new IncomingMessageEnvelope(ssp0, "0", bytesSerde.serialize("", "key0".getBytes()), + bytesSerde.serialize("", "value0".getBytes()), ime0Size); + IncomingMessageEnvelope ime1 = new IncomingMessageEnvelope(ssp1, "0", bytesSerde.serialize("", "key1".getBytes()), + bytesSerde.serialize("", "value1".getBytes()), ime1Size); + IncomingMessageEnvelope ime11 = new IncomingMessageEnvelope(ssp1, "0", bytesSerde.serialize("", "key11".getBytes()), + bytesSerde.serialize("", "value11".getBytes()), ime11Size); + NewKafkaSystemConsumer consumer = setupConsumer(FETCH_THRESHOLD_MSGS, FETCH_THRESHOLD_BYTES); + + consumer.register(ssp0, "0"); + consumer.register(ssp1, "0"); + consumer.start(); + consumer.messageSink.addMessage(ssp0, ime0); + // queue for ssp0 should be full now, because we added message of size FETCH_THRESHOLD_MSGS/partitionsNum + Assert.assertEquals(false, consumer.messageSink.needsMoreMessages(ssp0)); + consumer.messageSink.addMessage(ssp1, ime1); + // queue for 
ssp1 should be less than full now, because we added message of size (FETCH_THRESHOLD_MSGS/partitionsNum - 1) + Assert.assertEquals(true, consumer.messageSink.needsMoreMessages(ssp1)); + consumer.messageSink.addMessage(ssp1, ime11); + // queue for ssp1 should be full now, because we added message of size 20 on top + Assert.assertEquals(false, consumer.messageSink.needsMoreMessages(ssp1)); + + Assert.assertEquals(1, consumer.getNumMessagesInQueue(ssp0)); + Assert.assertEquals(2, consumer.getNumMessagesInQueue(ssp1)); + Assert.assertEquals(ime0Size, consumer.getMessagesSizeInQueue(ssp0)); + Assert.assertEquals(ime1Size + ime11Size, consumer.getMessagesSizeInQueue(ssp1)); + } + + @Test + public void testFetchThresholdBytesDiabled() { + // Pass 0 as fetchThresholdByBytes, which disables checking for limit by size + + SystemStreamPartition ssp0 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(0)); + SystemStreamPartition ssp1 = new SystemStreamPartition(TEST_SYSTEM, TEST_STREAM, new Partition(1)); + int partitionsNum = 2; + int ime0Size = Integer.valueOf(FETCH_THRESHOLD_MSGS) / partitionsNum; // fake size, up to the limit + int ime1Size = Integer.valueOf(FETCH_THRESHOLD_MSGS) / partitionsNum - 100; // fake size, below the limit + int ime11Size = 20;// even with the second message still below the size limit + ByteArraySerializer bytesSerde = new ByteArraySerializer(); + IncomingMessageEnvelope ime0 = new IncomingMessageEnvelope(ssp0, "0", bytesSerde.serialize("", "key0".getBytes()), + bytesSerde.serialize("", "value0".getBytes()), ime0Size); + IncomingMessageEnvelope ime1 = new IncomingMessageEnvelope(ssp1, "0", bytesSerde.serialize("", "key1".getBytes()), + bytesSerde.serialize("", "value1".getBytes()), ime1Size); + IncomingMessageEnvelope ime11 = new IncomingMessageEnvelope(ssp1, "0", bytesSerde.serialize("", "key11".getBytes()), + bytesSerde.serialize("", "value11".getBytes()), ime11Size); + + // limit by number of messages 4/2 = 2 per partition + // 
limit by number of bytes - disabled + NewKafkaSystemConsumer consumer = setupConsumer("4", "0"); // should disable + + consumer.register(ssp0, "0"); + consumer.register(ssp1, "0"); + consumer.start(); + consumer.messageSink.addMessage(ssp0, ime0); + // should be full by size, but not full by number of messages (1 of 2) + Assert.assertEquals(true, consumer.messageSink.needsMoreMessages(ssp0)); + consumer.messageSink.addMessage(ssp1, ime1); + // not full neither by size nor by messages + Assert.assertEquals(true, consumer.messageSink.needsMoreMessages(ssp1)); + consumer.messageSink.addMessage(ssp1, ime11); + // not full by size, but should be full by messages + Assert.assertEquals(false, consumer.messageSink.needsMoreMessages(ssp1)); + + Assert.assertEquals(1, consumer.getNumMessagesInQueue(ssp0)); + Assert.assertEquals(2, consumer.getNumMessagesInQueue(ssp1)); + Assert.assertEquals(ime0Size, consumer.getMessagesSizeInQueue(ssp0)); + Assert.assertEquals(ime1Size + ime11Size, consumer.getMessagesSizeInQueue(ssp1)); + } + + // mock kafkaConsumer and SystemConsumer + static class MockKafkaConsumer extends KafkaConsumer { + public MockKafkaConsumer(Map<String, Object> configs) { + super(configs); + } + } + + static class MockNewKafkaSystmeCosumer extends NewKafkaSystemConsumer { + public MockNewKafkaSystmeCosumer(Consumer kafkaConsumer, String systemName, Config config, String clientId, + KafkaSystemConsumerMetrics metrics, Clock clock) { + super(kafkaConsumer, systemName, config, clientId, metrics, clock); + } + + @Override + void createConsumerProxy() { + this.messageSink = new KafkaConsumerMessageSink(); + } + + @Override + void startConsumer() { + } + } +}
