Repository: samza
Updated Branches:
  refs/heads/master 34178a63f -> 925866d9b


SAMZA-1145: Provide Ability To Confgure The Default Number Of Changel…

…og Replicas

Author: James Lent <jl...@nc.rr.com>

Reviewers: Yi Pan <nickpa...@apache.org>, Jagadish <vjagadish1...@gmail.com>

Closes #86 from jwlent55/SAMZA-1145


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/925866d9
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/925866d9
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/925866d9

Branch: refs/heads/master
Commit: 925866d9b0a21fe49cd643544d8cad83f1c124bf
Parents: 34178a6
Author: James Lent <jl...@nc.rr.com>
Authored: Fri Apr 14 12:09:07 2017 -0700
Committer: Yi Pan (Data Infrastructure) <nickpa...@gmail.com>
Committed: Fri Apr 14 12:09:07 2017 -0700

----------------------------------------------------------------------
 .../versioned/jobs/configuration-table.html     | 15 +++++++++---
 .../org/apache/samza/config/KafkaConfig.scala   |  4 +++-
 .../samza/system/kafka/KafkaSystemFactory.scala |  2 +-
 .../apache/samza/config/TestKafkaConfig.scala   | 24 ++++++++++++++++++++
 4 files changed, 40 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/925866d9/docs/learn/documentation/versioned/jobs/configuration-table.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html 
b/docs/learn/documentation/versioned/jobs/configuration-table.html
index ba04139..df59547 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -1272,13 +1272,21 @@
 
                 <tr>
                     <td class="property" 
id="store-changelog-replication-factor">stores.<span 
class="store">store-name</span>.changelog.<br>replication.factor</td>
-                    <td class="default">2</td>
+                    <td 
class="default">stores.default.changelog.replication.factor</td>
                     <td class="description">
                         The property defines the number of replicas to use for 
the change log stream.
                     </td>
                 </tr>
 
                 <tr>
+                    <td class="property" 
id="store-default-changelog-replication-factor">stores.default.changelog.replication.factor</td>
+                    <td class="default">2</td>
+                    <td class="description">
+                        This property defines the default number of replicas 
to use for the change log stream.
+                    </td>
+                </tr>
+
+                <tr>
                     <td class="property" 
id="store-changelog-partitions">stores.<span 
class="store">store-name</span>.changelog.<br>kafka.topic-level-property</td>
                     <td class="default"></td>
                     <td class="description">
@@ -1343,8 +1351,9 @@
                     <td class="description">
                         This property defines a store, Samza's mechanism for 
efficient
                         <a href="../container/state-management.html">stateful 
stream processing</a>. You can give a
-                        store any <span class="store">store-name</span>, and 
use that name to get a reference to the
-                        store in your stream task (call
+                        store any <span class="store">store-name</span> except 
<em>default</em> (the <span class="store">store-name</span>
+                        <em>default</em> is reserved for defining default 
store parameters), and use that name to get a 
+                        reference to the store in your stream task (call
                         <a 
href="../api/javadocs/org/apache/samza/task/TaskContext.html#getStore(java.lang.String)">TaskContext.getStore()</a>
                         in your task's
                         <a 
href="../api/javadocs/org/apache/samza/task/InitableTask.html#init(org.apache.samza.config.Config,
 org.apache.samza.task.TaskContext)">init()</a>

http://git-wip-us.apache.org/repos/asf/samza/blob/925866d9/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala 
b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
index ae6330f..a8c1f3a 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
@@ -52,6 +52,7 @@ object KafkaConfig {
   val CHECKPOINT_SEGMENT_BYTES = "task.checkpoint.segment.bytes"
 
   val CHANGELOG_STREAM_REPLICATION_FACTOR = "stores.%s.changelog." + 
TOPIC_REPLICATION_FACTOR
+  val DEFAULT_CHANGELOG_STREAM_REPLICATION_FACTOR = 
CHANGELOG_STREAM_REPLICATION_FACTOR format "default"
   val CHANGELOG_STREAM_KAFKA_SETTINGS = "stores.%s.changelog.kafka."
   // The default segment size to use for changelog topics
   val CHANGELOG_DEFAULT_SEGMENT_SIZE = "536870912"
@@ -132,7 +133,8 @@ class KafkaConfig(config: Config) extends 
ScalaMapConfig(config) {
 
   def getRegexResolvedInheritedConfig(rewriterName: String) = 
config.subset((KafkaConfig.REGEX_INHERITED_CONFIG format rewriterName) + ".", 
true)
 
-  def getChangelogStreamReplicationFactor(name: String) = 
getOption(KafkaConfig.CHANGELOG_STREAM_REPLICATION_FACTOR format name)
+  def getChangelogStreamReplicationFactor(name: String) = 
getOption(KafkaConfig.CHANGELOG_STREAM_REPLICATION_FACTOR format 
name).getOrElse(getDefaultChangelogStreamReplicationFactor)
+  def getDefaultChangelogStreamReplicationFactor = 
getOption(KafkaConfig.DEFAULT_CHANGELOG_STREAM_REPLICATION_FACTOR).getOrElse("2")
 
   // The method returns a map of storenames to changelog topic names, which 
are configured to use kafka as the changelog stream
   def getKafkaChangelogEnabledStores() = {

http://git-wip-us.apache.org/repos/asf/samza/blob/925866d9/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
index d0e3089..638806b 100644
--- 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
+++ 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
@@ -118,7 +118,7 @@ class KafkaSystemFactory extends SystemFactory with Logging 
{
     // Construct the meta information for each topic, if the replication 
factor is not defined, we use 2 as the number of replicas for the change log 
stream.
     val topicMetaInformation = storeToChangelog.map{case (storeName, 
topicName) =>
     {
-       val replicationFactor = 
config.getChangelogStreamReplicationFactor(storeName).getOrElse("2").toInt
+       val replicationFactor = 
config.getChangelogStreamReplicationFactor(storeName).toInt
        val changelogInfo = ChangelogInfo(replicationFactor, 
config.getChangelogKafkaProperties(storeName))
        info("Creating topic meta information for topic: %s with replication 
factor: %s" format (topicName, replicationFactor))
        (topicName, changelogInfo)

http://git-wip-us.apache.org/repos/asf/samza/blob/925866d9/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala 
b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
index 555ab9f..106a0d5 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
@@ -192,4 +192,28 @@ class TestKafkaConfig {
     val kafkaProducerConfig = 
kafkaConfig.getKafkaSystemProducerConfig(SYSTEM_NAME, TEST_CLIENT_ID)
     kafkaProducerConfig.getProducerProperties
   }
+
+  @Test
+  def testChangeLogReplicationFactor() {
+    
props.setProperty("stores.store-with-override.changelog.replication.factor", 
"3")
+
+    val mapConfig = new MapConfig(props.asScala.asJava)
+    val kafkaConfig = new KafkaConfig(mapConfig)
+    
assertEquals(kafkaConfig.getChangelogStreamReplicationFactor("store-with-override"),
 "3")
+    
assertEquals(kafkaConfig.getChangelogStreamReplicationFactor("store-without-override"),
 "2")
+    assertEquals(kafkaConfig.getDefaultChangelogStreamReplicationFactor , "2")
+  }
+
+  @Test
+  def testChangeLogReplicationFactorWithOverriddenDefault() {
+    
props.setProperty("stores.store-with-override.changelog.replication.factor", 
"4")
+    // Override the "default" default value
+    props.setProperty("stores.default.changelog.replication.factor", "5")
+
+    val mapConfig = new MapConfig(props.asScala.asJava)
+    val kafkaConfig = new KafkaConfig(mapConfig)
+    
assertEquals(kafkaConfig.getChangelogStreamReplicationFactor("store-with-override"),
 "4")
+    
assertEquals(kafkaConfig.getChangelogStreamReplicationFactor("store-without-override"),
 "5")
+    assertEquals(kafkaConfig.getDefaultChangelogStreamReplicationFactor , "5")
+  }
 }

Reply via email to