Repository: samza Updated Branches: refs/heads/master 34178a63f -> 925866d9b
SAMZA-1145: Provide Ability To Configure The Default Number Of Changelog Replicas Author: James Lent <jl...@nc.rr.com> Reviewers: Yi Pan <nickpa...@apache.org>, Jagadish <vjagadish1...@gmail.com> Closes #86 from jwlent55/SAMZA-1145 Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/925866d9 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/925866d9 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/925866d9 Branch: refs/heads/master Commit: 925866d9b0a21fe49cd643544d8cad83f1c124bf Parents: 34178a6 Author: James Lent <jl...@nc.rr.com> Authored: Fri Apr 14 12:09:07 2017 -0700 Committer: Yi Pan (Data Infrastructure) <nickpa...@gmail.com> Committed: Fri Apr 14 12:09:07 2017 -0700 ---------------------------------------------------------------------- .../versioned/jobs/configuration-table.html | 15 +++++++++--- .../org/apache/samza/config/KafkaConfig.scala | 4 +++- .../samza/system/kafka/KafkaSystemFactory.scala | 2 +- .../apache/samza/config/TestKafkaConfig.scala | 24 ++++++++++++++++++++ 4 files changed, 40 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/925866d9/docs/learn/documentation/versioned/jobs/configuration-table.html ---------------------------------------------------------------------- diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html index ba04139..df59547 100644 --- a/docs/learn/documentation/versioned/jobs/configuration-table.html +++ b/docs/learn/documentation/versioned/jobs/configuration-table.html @@ -1272,13 +1272,21 @@ <tr> <td class="property" id="store-changelog-replication-factor">stores.<span class="store">store-name</span>.changelog.<br>replication.factor</td> - <td class="default">2</td> + <td class="default">stores.default.changelog.replication.factor</td> <td 
class="description"> The property defines the number of replicas to use for the change log stream. </td> </tr> <tr> + <td class="property" id="store-default-changelog-replication-factor">stores.default.changelog.replication.factor</td> + <td class="default">2</td> + <td class="description"> + This property defines the default number of replicas to use for the change log stream. + </td> + </tr> + + <tr> <td class="property" id="store-changelog-partitions">stores.<span class="store">store-name</span>.changelog.<br>kafka.topic-level-property</td> <td class="default"></td> <td class="description"> @@ -1343,8 +1351,9 @@ <td class="description"> This property defines a store, Samza's mechanism for efficient <a href="../container/state-management.html">stateful stream processing</a>. You can give a - store any <span class="store">store-name</span>, and use that name to get a reference to the - store in your stream task (call + store any <span class="store">store-name</span> except <em>default</em> (the <span class="store">store-name</span> + <em>default</em> is reserved for defining default store parameters), and use that name to get a + reference to the store in your stream task (call <a href="../api/javadocs/org/apache/samza/task/TaskContext.html#getStore(java.lang.String)">TaskContext.getStore()</a> in your task's <a href="../api/javadocs/org/apache/samza/task/InitableTask.html#init(org.apache.samza.config.Config, org.apache.samza.task.TaskContext)">init()</a> http://git-wip-us.apache.org/repos/asf/samza/blob/925866d9/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala index ae6330f..a8c1f3a 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala +++ 
b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala @@ -52,6 +52,7 @@ object KafkaConfig { val CHECKPOINT_SEGMENT_BYTES = "task.checkpoint.segment.bytes" val CHANGELOG_STREAM_REPLICATION_FACTOR = "stores.%s.changelog." + TOPIC_REPLICATION_FACTOR + val DEFAULT_CHANGELOG_STREAM_REPLICATION_FACTOR = CHANGELOG_STREAM_REPLICATION_FACTOR format "default" val CHANGELOG_STREAM_KAFKA_SETTINGS = "stores.%s.changelog.kafka." // The default segment size to use for changelog topics val CHANGELOG_DEFAULT_SEGMENT_SIZE = "536870912" @@ -132,7 +133,8 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) { def getRegexResolvedInheritedConfig(rewriterName: String) = config.subset((KafkaConfig.REGEX_INHERITED_CONFIG format rewriterName) + ".", true) - def getChangelogStreamReplicationFactor(name: String) = getOption(KafkaConfig.CHANGELOG_STREAM_REPLICATION_FACTOR format name) + def getChangelogStreamReplicationFactor(name: String) = getOption(KafkaConfig.CHANGELOG_STREAM_REPLICATION_FACTOR format name).getOrElse(getDefaultChangelogStreamReplicationFactor) + def getDefaultChangelogStreamReplicationFactor = getOption(KafkaConfig.DEFAULT_CHANGELOG_STREAM_REPLICATION_FACTOR).getOrElse("2") // The method returns a map of storenames to changelog topic names, which are configured to use kafka as the changelog stream def getKafkaChangelogEnabledStores() = { http://git-wip-us.apache.org/repos/asf/samza/blob/925866d9/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala index d0e3089..638806b 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala @@ -118,7 +118,7 
@@ class KafkaSystemFactory extends SystemFactory with Logging { // Construct the meta information for each topic, if the replication factor is not defined, we use 2 as the number of replicas for the change log stream. val topicMetaInformation = storeToChangelog.map{case (storeName, topicName) => { - val replicationFactor = config.getChangelogStreamReplicationFactor(storeName).getOrElse("2").toInt + val replicationFactor = config.getChangelogStreamReplicationFactor(storeName).toInt val changelogInfo = ChangelogInfo(replicationFactor, config.getChangelogKafkaProperties(storeName)) info("Creating topic meta information for topic: %s with replication factor: %s" format (topicName, replicationFactor)) (topicName, changelogInfo) http://git-wip-us.apache.org/repos/asf/samza/blob/925866d9/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala index 555ab9f..106a0d5 100644 --- a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala +++ b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala @@ -192,4 +192,28 @@ class TestKafkaConfig { val kafkaProducerConfig = kafkaConfig.getKafkaSystemProducerConfig(SYSTEM_NAME, TEST_CLIENT_ID) kafkaProducerConfig.getProducerProperties } + + @Test + def testChangeLogReplicationFactor() { + props.setProperty("stores.store-with-override.changelog.replication.factor", "3") + + val mapConfig = new MapConfig(props.asScala.asJava) + val kafkaConfig = new KafkaConfig(mapConfig) + assertEquals(kafkaConfig.getChangelogStreamReplicationFactor("store-with-override"), "3") + assertEquals(kafkaConfig.getChangelogStreamReplicationFactor("store-without-override"), "2") + assertEquals(kafkaConfig.getDefaultChangelogStreamReplicationFactor , "2") + } + + @Test + def 
testChangeLogReplicationFactorWithOverriddenDefault() { + props.setProperty("stores.store-with-override.changelog.replication.factor", "4") + // Override the "default" default value + props.setProperty("stores.default.changelog.replication.factor", "5") + + val mapConfig = new MapConfig(props.asScala.asJava) + val kafkaConfig = new KafkaConfig(mapConfig) + assertEquals(kafkaConfig.getChangelogStreamReplicationFactor("store-with-override"), "4") + assertEquals(kafkaConfig.getChangelogStreamReplicationFactor("store-without-override"), "5") + assertEquals(kafkaConfig.getDefaultChangelogStreamReplicationFactor , "5") + } }