Repository: incubator-samza
Updated Branches:
  refs/heads/0.7.0 6e981b43c -> a3a368893


SAMZA-296: Configuration cleanup


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/a3a36889
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/a3a36889
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/a3a36889

Branch: refs/heads/0.7.0
Commit: a3a3688936fdc700a9a8b8728f913aa1c63ec803
Parents: 6e981b4
Author: Martin Kleppmann <[email protected]>
Authored: Wed Jun 18 10:07:57 2014 -0700
Committer: Martin Kleppmann <[email protected]>
Committed: Wed Jun 18 11:47:28 2014 -0700

----------------------------------------------------------------------
 .../0.7.0/yarn/application-master.md            |  2 +-
 .../org/apache/samza/config/SystemConfig.scala  |  2 +-
 .../org/apache/samza/config/TaskConfig.scala    |  3 --
 .../org/apache/samza/config/KafkaConfig.scala   |  6 ---
 samza-test/src/main/resources/common.properties | 42 --------------------
 .../org/apache/samza/config/YarnConfig.scala    | 34 ++++------------
 .../yarn/TestSamzaAppMasterTaskManager.scala    |  2 +-
 7 files changed, 11 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/a3a36889/docs/learn/documentation/0.7.0/yarn/application-master.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/0.7.0/yarn/application-master.md b/docs/learn/documentation/0.7.0/yarn/application-master.md
index 30b14cd..6b81805 100644
--- a/docs/learn/documentation/0.7.0/yarn/application-master.md
+++ b/docs/learn/documentation/0.7.0/yarn/application-master.md
@@ -41,7 +41,7 @@ From this point on, the ApplicationMaster just reacts to events from the RM.
 
 ### Fault Tolerance
 
-Whenever a container is allocated, the AM will work with the YARN NM to start a SamzaContainer (with appropriate partitions assigned to it) in the container. If a container fails with a non-zero return code, the AM will request a new container, and restart the SamzaContainer. If a SamzaContainer fails too many times, too quickly, the ApplicationMaster will fail the whole Samza job with a non-zero return code. See the yarn.countainer.retry.count and yarn.container.retry.window.ms [configuration](../jobs/configuration.html) parameters for details.
+Whenever a container is allocated, the AM will work with the YARN NM to start a SamzaContainer (with appropriate partitions assigned to it) in the container. If a container fails with a non-zero return code, the AM will request a new container, and restart the SamzaContainer. If a SamzaContainer fails too many times, too quickly, the ApplicationMaster will fail the whole Samza job with a non-zero return code. See the yarn.container.retry.count and yarn.container.retry.window.ms [configuration](../jobs/configuration.html) parameters for details.
 
 When the AM receives a reboot signal from YARN, it will throw a SamzaException. This will trigger a clean and successful shutdown of the AM (YARN won't think the AM failed).
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/a3a36889/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala
index 5bb17c7..4cfdcc2 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala
@@ -49,7 +49,7 @@ class SystemConfig(config: Config) extends ScalaMapConfig(config) with Logging {
    */
   def getSystemNames() = {
     val subConf = config.subset("systems.", true)
-    // find all .samza.partition.manager keys, and strip the suffix
+    // find all .samza.factory keys, and strip the suffix
     subConf.keys.filter(k => k.endsWith(".samza.factory")).map(_.replace(".samza.factory", ""))
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/a3a36889/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
index 3510f1f..18a9510 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
@@ -32,7 +32,6 @@ object TaskConfig {
   val LIFECYCLE_LISTENERS = "task.lifecycle.listeners" // li-generator,foo
   val LIFECYCLE_LISTENER = "task.lifecycle.listener.%s.class" // task.lifecycle.listener.li-generator.class
   val CHECKPOINT_MANAGER_FACTORY = "task.checkpoint.factory" // class name to use when sending offset checkpoints
-  val TASK_JMX_ENABLED = "task.jmx.enabled" // Start up a JMX server for this task?
   val MESSAGE_CHOOSER_CLASS_NAME = "task.chooser.class"
 
   implicit def Config2Task(config: Config) = new TaskConfig(config)
@@ -70,7 +69,5 @@ class TaskConfig(config: Config) extends ScalaMapConfig(config) {
 
   def getCheckpointManagerFactory() = getOption(TaskConfig.CHECKPOINT_MANAGER_FACTORY)
 
-  def getJmxServerEnabled = getBoolean(TaskConfig.TASK_JMX_ENABLED, true)
-  
   def getMessageChooserClass = getOption(TaskConfig.MESSAGE_CHOOSER_CLASS_NAME)
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/a3a36889/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
index 4deabd3..91e8e49 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
@@ -33,9 +33,6 @@ object KafkaConfig {
   val CHECKPOINT_SYSTEM = "task.checkpoint.system"
   val CHECKPOINT_REPLICATION_FACTOR = "task.checkpoint.replication.factor"
 
-  val CONSUMER_KEY_DESERIALIZER = SystemConfig.SYSTEM_PREFIX + "consumer.key.deserializer.class"
-  val CONSUMER_MSG_DESERIALIZER = SystemConfig.SYSTEM_PREFIX + "consumer.deserializer.class"
-
   /**
    * Defines how low a queue can get for a single system/stream/partition
    * combination before trying to fetch more messages for it.
@@ -51,8 +48,6 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
   def getCheckpointReplicationFactor() = getOption(KafkaConfig.CHECKPOINT_REPLICATION_FACTOR)
 
   // custom consumer config
-  def getConsumerKeyDeserializerClass(name: String) = getOption(KafkaConfig.CONSUMER_KEY_DESERIALIZER format name)
-  def getConsumerMsgDeserializerClass(name: String) = getOption(KafkaConfig.CONSUMER_MSG_DESERIALIZER format name)
   def getConsumerFetchThreshold(name: String) = getOption(KafkaConfig.CONSUMER_FETCH_THRESHOLD format name)
 
   /**
@@ -61,7 +56,6 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
    */
   def getAutoOffsetResetTopics(systemName: String) = {
     val subConf = config.subset("systems.%s.streams." format systemName, true)
-    // find all .samza.partition.manager keys, and strip the suffix
     subConf
       .filterKeys(k => k.endsWith(".consumer.auto.offset.reset"))
       .map {

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/a3a36889/samza-test/src/main/resources/common.properties
----------------------------------------------------------------------
diff --git a/samza-test/src/main/resources/common.properties b/samza-test/src/main/resources/common.properties
deleted file mode 100644
index 6e0c061..0000000
--- a/samza-test/src/main/resources/common.properties
+++ /dev/null
@@ -1,42 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-job.factory.class=samza.job.local.LocalJobFactory
-
-task.checkpoint.factory=samza.checkpoint.kafka.KafkaCheckpointManagerFactory
-task.checkpoint.system=kafka-checkpoints
-task.checkpoint.replication.factor=1
-
-serializers.registry.string.class=samza.serializers.StringSerdeFactory
-
-# Kafka System
-systems.kafka.samza.factory=samza.system.kafka.KafkaSystemFactory
-systems.kafka.samza.partition.manager=samza.stream.kafka.KafkaPartitionManager
-systems.kafka.consumer.zookeeper.connect=localhost:2181
-systems.kafka.consumer.auto.offset.reset=smallest
-systems.kafka.producer.metadata.broker.list=localhost:9092
-systems.kafka.samza.msg.serde=string
-
-# Checkpoints System
-systems.kafka-checkpoints.samza.factory=samza.system.kafka.KafkaSystemFactory
-systems.kafka-checkpoints.serializer.class=samza.task.state.KafkaCheckpointEncoder
-systems.kafka-checkpoints.partitioner.class=samza.task.state.KafkaCheckpointPartitioner
-systems.kafka-checkpoints.key.serializer.class=kafka.serializer.NullEncoder
-systems.kafka-checkpoints.producer.metadata.broker.list=localhost:9092
-systems.kafka-checkpoints.consumer.zookeeper.connect=localhost:2181
-systems.kafka-checkpoints.producer.type=sync
-

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/a3a36889/samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala b/samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala
index 6c3aa92..e7868a6 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala
@@ -24,7 +24,7 @@ object YarnConfig {
   val PACKAGE_PATH = "yarn.package.path"
   val CONTAINER_MAX_MEMORY_MB = "yarn.container.memory.mb"
   val CONTAINER_MAX_CPU_CORES = "yarn.container.cpu.cores"
-  val CONTAINER_RETRY_COUNT = "yarn.countainer.retry.count"
+  val CONTAINER_RETRY_COUNT = "yarn.container.retry.count"
   val CONTAINER_RETRY_WINDOW_MS = "yarn.container.retry.window.ms"
   val TASK_COUNT = "yarn.container.count"
   val AM_JVM_OPTIONS = "yarn.am.opts"
@@ -35,39 +35,21 @@ object YarnConfig {
 }
 
 class YarnConfig(config: Config) extends ScalaMapConfig(config) {
-  def getContainerMaxMemoryMb: Option[Int] = getOption(YarnConfig.CONTAINER_MAX_MEMORY_MB) match {
-    case Some(mem) => Some(mem.toInt)
-    case _ => None
-  }
+  def getContainerMaxMemoryMb: Option[Int] = getOption(YarnConfig.CONTAINER_MAX_MEMORY_MB).map(_.toInt)
 
-  def getContainerMaxCpuCores: Option[Int] = getOption(YarnConfig.CONTAINER_MAX_CPU_CORES) match {
-    case Some(cpu) => Some(cpu.toInt)
-    case _ => None
-  }
+  def getContainerMaxCpuCores: Option[Int] = getOption(YarnConfig.CONTAINER_MAX_CPU_CORES).map(_.toInt)
 
-  def getContainerRetryCount: Option[Int] = getOption(YarnConfig.CONTAINER_RETRY_COUNT) match {
-    case Some(count) => Some(count.toInt)
-    case _ => None
-  }
+  def getContainerRetryCount: Option[Int] = getOption(YarnConfig.CONTAINER_RETRY_COUNT).map(_.toInt)
 
-  def getContainerRetryWindowMs: Option[Int] = getOption(YarnConfig.CONTAINER_RETRY_WINDOW_MS) match {
-    case Some(retryWindowMs) => Some(retryWindowMs.toInt)
-    case _ => None
-  }
+  def getContainerRetryWindowMs: Option[Int] = getOption(YarnConfig.CONTAINER_RETRY_WINDOW_MS).map(_.toInt)
 
   def getPackagePath = getOption(YarnConfig.PACKAGE_PATH)
 
-  def getTaskCount: Option[Int] = getOption(YarnConfig.TASK_COUNT) match {
-    case Some(tc) => Some(tc.toInt)
-    case _ => None
-  }
+  def getTaskCount: Option[Int] = getOption(YarnConfig.TASK_COUNT).map(_.toInt)
 
   def getAmOpts = getOption(YarnConfig.AM_JVM_OPTIONS)
-  
-  def getAMContainerMaxMemoryMb: Option[Int] = getOption(YarnConfig.AM_CONTAINER_MAX_MEMORY_MB) match {
-    case Some(mem) => Some(mem.toInt)
-    case _ => None
-  }
+
+  def getAMContainerMaxMemoryMb: Option[Int] = getOption(YarnConfig.AM_CONTAINER_MAX_MEMORY_MB).map(_.toInt)
 
   def getJmxServerEnabled = getBoolean(YarnConfig.AM_JMX_ENABLED, true)
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/a3a36889/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
index 9d832ae..4cf2d49 100644
--- a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
+++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
@@ -139,7 +139,7 @@ class TestSamzaAppMasterTaskManager {
     "task.inputs" -> "test-system.test-stream",
     "systems.test-system.samza.key.serde" -> "org.apache.samza.serializers.JsonSerde",
     "systems.test-system.samza.msg.serde" -> "org.apache.samza.serializers.JsonSerde",
-    "yarn.countainer.retry.count" -> "1",
+    "yarn.container.retry.count" -> "1",
     "yarn.container.retry.window.ms" -> "1999999999"))
 
   @Test

Reply via email to