Repository: incubator-samza Updated Branches: refs/heads/master c72223f99 -> 868fc7ade
SAMZA-144; add stream level overrides for fetch size. Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/868fc7ad Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/868fc7ad Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/868fc7ad Branch: refs/heads/master Commit: 868fc7ade3a84a2bbef9970f9cebb9dd956cadd3 Parents: c72223f Author: Yan Fang <[email protected]> Authored: Mon Jun 9 15:59:10 2014 -0700 Committer: Chris Riccomini <[email protected]> Committed: Mon Jun 9 15:59:10 2014 -0700 ---------------------------------------------------------------------- .../org/apache/samza/config/KafkaConfig.scala | 14 ++++++++ .../apache/samza/system/kafka/BrokerProxy.scala | 2 +- .../kafka/DefaultFetchSimpleConsumer.scala | 16 ++++++--- .../system/kafka/KafkaSystemConsumer.scala | 2 +- .../samza/system/kafka/KafkaSystemFactory.scala | 3 +- .../apache/samza/config/TestKafkaConfig.scala | 38 +++++++++++++++++--- .../samza/system/kafka/TestBrokerProxy.scala | 4 +-- 7 files changed, 64 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/868fc7ad/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala index 4deabd3..b95e493 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala @@ -56,6 +56,20 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) { def getConsumerFetchThreshold(name: String) = getOption(KafkaConfig.CONSUMER_FETCH_THRESHOLD format name) /** + * Returns a map of topic -> fetch.message.max.bytes 
value for all streams that + * are defined with this property in the config. + */ + def getFetchMessageMaxBytesTopics(systemName: String) = { + val subConf = config.subset("systems.%s.streams." format systemName, true) + subConf + .filterKeys(k => k.endsWith(".consumer.fetch.message.max.bytes")) + .map { + case (fetchMessageMaxBytes, fetchSizeValue) => + (fetchMessageMaxBytes.replace(".consumer.fetch.message.max.bytes", ""), fetchSizeValue.toInt) + }.toMap + } + + /** * Returns a map of topic -> auto.offset.reset value for all streams that * are defined with this property in the config. */ http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/868fc7ad/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala index 561e990..f094fa0 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala @@ -54,7 +54,7 @@ class BrokerProxy( val messageSink: MessageSink, val timeout: Int = ConsumerConfig.SocketTimeout, val bufferSize: Int = ConsumerConfig.SocketBufferSize, - val fetchSize:Int = ConsumerConfig.FetchSize, + val fetchSize: StreamFetchSizes = new StreamFetchSizes, val consumerMinSize:Int = ConsumerConfig.MinFetchBytes, val consumerMaxWait:Int = ConsumerConfig.MaxFetchWaitMs, offsetGetter: GetOffset = new GetOffset("fail")) extends Toss with Logging { http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/868fc7ad/samza-kafka/src/main/scala/org/apache/samza/system/kafka/DefaultFetchSimpleConsumer.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/DefaultFetchSimpleConsumer.scala 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/DefaultFetchSimpleConsumer.scala index d90ca78..5b4886a 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/DefaultFetchSimpleConsumer.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/DefaultFetchSimpleConsumer.scala @@ -27,16 +27,16 @@ import kafka.common.TopicAndPartition import kafka.consumer.ConsumerConfig class DefaultFetchSimpleConsumer(host: scala.Predef.String, port: scala.Int, soTimeout: scala.Int, bufferSize: scala.Int, - clientId: scala.Predef.String, fetchSize: Int = ConsumerConfig.FetchSize, - minBytes:Int = ConsumerConfig.MinFetchBytes, maxWait:Int = ConsumerConfig.MaxFetchWaitMs) + clientId: scala.Predef.String, fetchSize: StreamFetchSizes = new StreamFetchSizes, + minBytes: Int = ConsumerConfig.MinFetchBytes, maxWait: Int = ConsumerConfig.MaxFetchWaitMs) extends SimpleConsumer(host, port, soTimeout, bufferSize, clientId) { - def defaultFetch(fetches:(TopicAndPartition, Long)*) = { + def defaultFetch(fetches: (TopicAndPartition, Long)*) = { val fbr = new FetchRequestBuilder().maxWait(maxWait) .minBytes(minBytes) .clientId(clientId) - fetches.foreach(f => fbr.addFetch(f._1.topic, f._1.partition, f._2, fetchSize)) + fetches.foreach(f => fbr.addFetch(f._1.topic, f._1.partition, f._2, fetchSize.streamValue.getOrElse(f._1.topic, fetchSize.defaultValue))) this.fetch(fbr.build()) } @@ -56,3 +56,11 @@ class DefaultFetchSimpleConsumer(host: scala.Predef.String, port: scala.Int, soT override def earliestOrLatestOffset(topicAndPartition: TopicAndPartition, earliestOrLatest: Long, consumerId: Int): Long = super.earliestOrLatestOffset(topicAndPartition, earliestOrLatest, consumerId) } +/** + * a simple class for holding values for the stream's fetch size (fetch.message.max.bytes). + * The stream-level fetch size values are put in the streamValue map streamName -> fetchSize. + * If stream-level fetch size is not defined, use the default value. 
The default value is + * Kafka's default fetch size value or the system-level fetch size value (if defined). + */ +case class StreamFetchSizes(defaultValue: Int = ConsumerConfig.MaxFetchSize, streamValue: Map[String, Int] = Map[String, Int]()) + http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/868fc7ad/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala index 105f6c6..2163d57 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala @@ -58,7 +58,7 @@ private[kafka] class KafkaSystemConsumer( clientId: String = "undefined-client-id-%s" format UUID.randomUUID.toString, timeout: Int = ConsumerConfig.ConsumerTimeoutMs, bufferSize: Int = ConsumerConfig.SocketBufferSize, - fetchSize: Int = ConsumerConfig.MaxFetchSize, + fetchSize: StreamFetchSizes = new StreamFetchSizes, consumerMinSize: Int = ConsumerConfig.MinFetchBytes, consumerMaxWait: Int = ConsumerConfig.MaxFetchWaitMs, http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/868fc7ad/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala index f4dc1c1..4ed5e88 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala @@ -40,10 +40,9 @@ class KafkaSystemFactory extends SystemFactory { .getOrElse(throw 
new SamzaException("No broker list defined in config for %s." format systemName)) val consumerConfig = config.getKafkaSystemConsumerConfig(systemName, clientId) - // TODO could add stream-level overrides for timeout and buffer size val timeout = consumerConfig.socketTimeoutMs val bufferSize = consumerConfig.socketReceiveBufferBytes - val fetchSize = consumerConfig.fetchMessageMaxBytes + val fetchSize = new StreamFetchSizes(consumerConfig.fetchMessageMaxBytes, config.getFetchMessageMaxBytesTopics(systemName)) val consumerMinSize = consumerConfig.fetchMinBytes val consumerMaxWait = consumerConfig.fetchWaitMaxMs val autoOffsetResetDefault = consumerConfig.autoOffsetReset http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/868fc7ad/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala index 93cf5a5..468aa3d 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala @@ -26,6 +26,7 @@ import java.io.File import java.util.Properties import scala.collection.JavaConversions._ import org.apache.samza.config.factories.PropertiesConfigFactory +import kafka.consumer.ConsumerConfig class TestKafkaConfig { @@ -34,19 +35,19 @@ class TestKafkaConfig { val factory = new PropertiesConfigFactory() val props = new Properties props.setProperty(" systems.kafka.samza.factory", "org.apache.samza.system.kafka.KafkaSystemFactory") - props.setProperty( "systems.kafka.consumer.zookeeper.connect", "localhost:2181/") - props.setProperty( "systems.kafka.producer.metadata.broker.list", "localhost:9092") + props.setProperty("systems.kafka.consumer.zookeeper.connect", "localhost:2181/") + 
props.setProperty("systems.kafka.producer.metadata.broker.list", "localhost:9092") val mapConfig = new MapConfig(props.toMap[String, String]) val kafkaConfig = new KafkaConfig(mapConfig) - + val consumerConfig1 = kafkaConfig.getKafkaSystemConsumerConfig("kafka") val consumerClientId1 = consumerConfig1.clientId val groupId1 = consumerConfig1.groupId val consumerConfig2 = kafkaConfig.getKafkaSystemConsumerConfig("kafka") val consumerClientId2 = consumerConfig2.clientId val groupId2 = consumerConfig2.groupId - assert( consumerClientId1.startsWith("undefined-samza-consumer-")) + assert(consumerClientId1.startsWith("undefined-samza-consumer-")) assert(consumerClientId2.startsWith("undefined-samza-consumer-")) assert(groupId1.startsWith("undefined-samza-consumer-group-")) assert(groupId2.startsWith("undefined-samza-consumer-group-")) @@ -64,7 +65,7 @@ class TestKafkaConfig { val producerConfig2 = kafkaConfig.getKafkaSystemProducerConfig("kafka") val producerClientId2 = producerConfig2.clientId - assert( producerClientId1.startsWith("undefined-samza-producer-")) + assert(producerClientId1.startsWith("undefined-samza-producer-")) assert(producerClientId2.startsWith("undefined-samza-producer-")) assert(producerClientId1 != producerClientId2) @@ -73,4 +74,31 @@ class TestKafkaConfig { assert(producerClientId3 == "TestClientId") } + + @Test + def testStreamLevelFetchSizeOverride() { + val props = new Properties + props.setProperty("systems.kafka.consumer.zookeeper.connect", "localhost:2181/") + props.setProperty("systems.kafka.producer.metadata.broker.list", "localhost:9092") + + val mapConfig = new MapConfig(props.toMap[String, String]) + val kafkaConfig = new KafkaConfig(mapConfig) + val consumerConfig = kafkaConfig.getKafkaSystemConsumerConfig("kafka") + // default fetch size + assertEquals(1024*1024, consumerConfig.fetchMessageMaxBytes) + + props.setProperty("systems.kafka.consumer.fetch.message.max.bytes", "262144") + val mapConfig1 = new MapConfig(props.toMap[String, 
String]) + val kafkaConfig1 = new KafkaConfig(mapConfig1) + val consumerConfig1 = kafkaConfig1.getKafkaSystemConsumerConfig("kafka") + // shared fetch size + assertEquals(512*512, consumerConfig1.fetchMessageMaxBytes) + + props.setProperty("systems.kafka.streams.topic1.consumer.fetch.message.max.bytes", "65536") + val mapConfig2 = new MapConfig(props.toMap[String, String]) + val kafkaConfig2 = new KafkaConfig(mapConfig2) + val consumerConfig2 = kafkaConfig2.getFetchMessageMaxBytesTopics("kafka") + // topic fetch size + assertEquals(256*256, consumerConfig2 getOrElse ("topic1", 1024*1024)) + } } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/868fc7ad/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala index 9c0ca60..b4e7178 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala @@ -86,7 +86,7 @@ class TestBrokerProxy extends Logging { } alreadyCreatedConsumer = true - new DefaultFetchSimpleConsumer("a", 1, 2, 3, "b", 42) { + new DefaultFetchSimpleConsumer("a", 1, 2, 3, "b", new StreamFetchSizes(42)) { val sc = Mockito.mock(classOf[SimpleConsumer]) val mockOffsetResponse = { val offsetResponse = Mockito.mock(classOf[OffsetResponse]) @@ -253,7 +253,7 @@ class TestBrokerProxy extends Logging { // So now we have a fetch response that will fail. 
Prime the mockGetOffset to send us to a new offset - val bp = new BrokerProxy("host", 423, "system", "clientID", doNothingMetrics, mockMessageSink, Int.MaxValue, 1024000, 256 * 1024, 524288, 1000, mockOffsetGetter) { + val bp = new BrokerProxy("host", 423, "system", "clientID", doNothingMetrics, mockMessageSink, Int.MaxValue, 1024000, new StreamFetchSizes(256 * 1024), 524288, 1000, mockOffsetGetter) { override def createSimpleConsumer() = { if(callsToCreateSimpleConsumer > 1) {
