Repository: samza
Updated Branches:
  refs/heads/master 8a22e05c9 -> 0dc9dd24f


Minor fix to some config variable names and accessor methods.

Author: Prateek Maheshwari <pmaheshw...@apache.org>

Reviewers: Jagadish<jagad...@apache.org>

Closes #840 from prateekm/fix-config-names


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/0dc9dd24
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/0dc9dd24
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/0dc9dd24

Branch: refs/heads/master
Commit: 0dc9dd24f8839cf5b261b2b42edb27d0b01b162c
Parents: 8a22e05
Author: Prateek Maheshwari <pmaheshw...@apache.org>
Authored: Tue Dec 4 14:08:57 2018 -0800
Committer: Jagadish <jvenkatra...@linkedin.com>
Committed: Tue Dec 4 14:08:57 2018 -0800

----------------------------------------------------------------------
 .../main/scala/org/apache/samza/config/TaskConfig.scala | 12 ++++++------
 .../org/apache/samza/container/SamzaContainer.scala     | 11 ++---------
 .../apache/samza/system/kafka/KafkaSystemFactory.scala  |  2 +-
 .../system/kafka_deprecated/KafkaSystemFactory.scala    |  2 +-
 .../org/apache/samza/logging/log4j/StreamAppender.java  |  2 +-
 .../org/apache/samza/logging/log4j2/StreamAppender.java |  2 +-
 .../test/processor/TestZkLocalApplicationRunner.java    |  2 +-
 7 files changed, 13 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/0dc9dd24/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala 
b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
index a64589f..ba5d932 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
@@ -36,9 +36,9 @@ object TaskConfig {
   val LIFECYCLE_LISTENER = "task.lifecycle.listener.%s.class" // 
task.lifecycle.listener.li-generator.class
   val CHECKPOINT_MANAGER_FACTORY = TaskConfigJava.CHECKPOINT_MANAGER_FACTORY 
// class name to use when sending offset checkpoints
   val MESSAGE_CHOOSER_CLASS_NAME = "task.chooser.class"
-  val DROP_DESERIALIZATION_ERROR = "task.drop.deserialization.errors" // 
define whether drop the messages or not when deserialization fails
-  val DROP_SERIALIZATION_ERROR = "task.drop.serialization.errors" // define 
whether drop the messages or not when serialization fails
-  val DROP_PRODUCER_ERROR = "task.drop.producer.errors" // whether to ignore 
producer errors and drop the messages that failed to send
+  val DROP_DESERIALIZATION_ERRORS = "task.drop.deserialization.errors" // 
define whether drop the messages or not when deserialization fails
+  val DROP_SERIALIZATION_ERRORS = "task.drop.serialization.errors" // define 
whether drop the messages or not when serialization fails
+  val DROP_PRODUCER_ERRORS = "task.drop.producer.errors" // whether to ignore 
producer errors and drop the messages that failed to send
   val IGNORED_EXCEPTIONS = "task.ignored.exceptions" // exceptions to ignore 
in process and window
   val GROUPER_FACTORY = "task.name.grouper.factory" // class name for task 
grouper
   val MAX_CONCURRENCY = "task.max.concurrency" // max number of concurrent 
process for a AsyncStreamTask
@@ -115,11 +115,11 @@ class TaskConfig(config: Config) extends 
ScalaMapConfig(config) with Logging {
 
   def getMessageChooserClass = getOption(TaskConfig.MESSAGE_CHOOSER_CLASS_NAME)
 
-  def getDropDeserialization = getOption(TaskConfig.DROP_DESERIALIZATION_ERROR)
+  def getDropDeserializationErrors = 
getBoolean(TaskConfig.DROP_DESERIALIZATION_ERRORS, false)
 
-  def getDropSerialization = getOption(TaskConfig.DROP_SERIALIZATION_ERROR)
+  def getDropSerializationErrors = 
getBoolean(TaskConfig.DROP_SERIALIZATION_ERRORS, false)
 
-  def getDropProducerError = getBoolean(TaskConfig.DROP_PRODUCER_ERROR, false)
+  def getDropProducerErrors = getBoolean(TaskConfig.DROP_PRODUCER_ERRORS, 
false)
 
   def getPollIntervalMs = getOption(TaskConfig.POLL_INTERVAL_MS)
 

http://git-wip-us.apache.org/repos/asf/samza/blob/0dc9dd24/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 94bc138..03effe6 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -425,15 +425,8 @@ object SamzaContainer extends Logging {
     val offsetManager = OffsetManager(inputStreamMetadata, config, 
checkpointManager, systemAdmins, checkpointListeners, offsetManagerMetrics)
     info("Got offset manager: %s" format offsetManager)
 
-    val dropDeserializationError = config.getDropDeserialization match {
-      case Some(dropError) => dropError.toBoolean
-      case _ => false
-    }
-
-    val dropSerializationError = config.getDropSerialization match {
-      case Some(dropError) => dropError.toBoolean
-      case _ => false
-    }
+    val dropDeserializationError = config.getDropDeserializationErrors
+    val dropSerializationError = config.getDropSerializationErrors
 
     val pollIntervalMs = config
       .getPollIntervalMs

http://git-wip-us.apache.org/repos/asf/samza/blob/0dc9dd24/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
index f314f92..cca7a6b 100644
--- 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
+++ 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
@@ -83,7 +83,7 @@ class KafkaSystemFactory extends SystemFactory with Logging {
       new ExponentialSleepStrategy(initialDelayMs = 
producerConfig.reconnectIntervalMs),
       getProducer,
       metrics,
-      dropProducerExceptions = config.getDropProducerError)
+      dropProducerExceptions = config.getDropProducerErrors)
   }
 
   def getAdmin(systemName: String, config: Config): SystemAdmin = {

http://git-wip-us.apache.org/repos/asf/samza/blob/0dc9dd24/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/KafkaSystemFactory.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/KafkaSystemFactory.scala
 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/KafkaSystemFactory.scala
index d588831..cf7c5e8 100644
--- 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/KafkaSystemFactory.scala
+++ 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/KafkaSystemFactory.scala
@@ -101,7 +101,7 @@ class KafkaSystemFactory extends SystemFactory with Logging 
{
       new ExponentialSleepStrategy(initialDelayMs = 
producerConfig.reconnectIntervalMs),
       getProducer,
       metrics,
-      dropProducerExceptions = config.getDropProducerError)
+      dropProducerExceptions = config.getDropProducerErrors)
   }
 
   def getAdmin(systemName: String, config: Config): SystemAdmin = {

http://git-wip-us.apache.org/repos/asf/samza/blob/0dc9dd24/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
----------------------------------------------------------------------
diff --git 
a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java 
b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
index 5278284..b4a97f7 100644
--- 
a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
+++ 
b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
@@ -279,7 +279,7 @@ public class StreamAppender extends AppenderSkeleton {
       throw new SamzaException("can not read the config", e);
     }
     // Make system producer drop producer errors for StreamAppender
-    config = new MapConfig(config, 
ImmutableMap.of(TaskConfig.DROP_PRODUCER_ERROR(), "true"));
+    config = new MapConfig(config, 
ImmutableMap.of(TaskConfig.DROP_PRODUCER_ERRORS(), "true"));
 
     return config;
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/0dc9dd24/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
----------------------------------------------------------------------
diff --git 
a/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
 
b/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
index e1d6dd3..28f759e 100644
--- 
a/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
+++ 
b/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java
@@ -300,7 +300,7 @@ public class StreamAppender extends AbstractAppender {
       throw new SamzaException("can not read the config", e);
     }
     // Make system producer drop producer errors for StreamAppender
-    config = new MapConfig(config, 
ImmutableMap.of(TaskConfig.DROP_PRODUCER_ERROR(), "true"));
+    config = new MapConfig(config, 
ImmutableMap.of(TaskConfig.DROP_PRODUCER_ERRORS(), "true"));
 
     return config;
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/0dc9dd24/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
----------------------------------------------------------------------
diff --git 
a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
 
b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
index 7411318..78dad0d 100644
--- 
a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
+++ 
b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
@@ -206,7 +206,7 @@ public class TestZkLocalApplicationRunner extends 
StandaloneIntegrationTestHarne
         .put(JobConfig.JOB_NAME(), appName)
         .put(JobConfig.JOB_ID(), appId)
         .put(TaskConfigJava.TASK_SHUTDOWN_MS, TASK_SHUTDOWN_MS)
-        .put(TaskConfig.DROP_PRODUCER_ERROR(), "true")
+        .put(TaskConfig.DROP_PRODUCER_ERRORS(), "true")
         .put(JobConfig.JOB_DEBOUNCE_TIME_MS(), JOB_DEBOUNCE_TIME_MS)
         .put(JobConfig.MONITOR_PARTITION_CHANGE_FREQUENCY_MS(), "1000")
         .build();

Reply via email to