Repository: samza Updated Branches: refs/heads/master 8a22e05c9 -> 0dc9dd24f
Minor fix to some config variable names and accessor methods. Author: Prateek Maheshwari <pmaheshw...@apache.org> Reviewers: Jagadish<jagad...@apache.org> Closes #840 from prateekm/fix-config-names Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/0dc9dd24 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/0dc9dd24 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/0dc9dd24 Branch: refs/heads/master Commit: 0dc9dd24f8839cf5b261b2b42edb27d0b01b162c Parents: 8a22e05 Author: Prateek Maheshwari <pmaheshw...@apache.org> Authored: Tue Dec 4 14:08:57 2018 -0800 Committer: Jagadish <jvenkatra...@linkedin.com> Committed: Tue Dec 4 14:08:57 2018 -0800 ---------------------------------------------------------------------- .../main/scala/org/apache/samza/config/TaskConfig.scala | 12 ++++++------ .../org/apache/samza/container/SamzaContainer.scala | 11 ++--------- .../apache/samza/system/kafka/KafkaSystemFactory.scala | 2 +- .../system/kafka_deprecated/KafkaSystemFactory.scala | 2 +- .../org/apache/samza/logging/log4j/StreamAppender.java | 2 +- .../org/apache/samza/logging/log4j2/StreamAppender.java | 2 +- .../test/processor/TestZkLocalApplicationRunner.java | 2 +- 7 files changed, 13 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/0dc9dd24/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala index a64589f..ba5d932 100644 --- a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala +++ b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala @@ -36,9 +36,9 @@ object TaskConfig { val LIFECYCLE_LISTENER = "task.lifecycle.listener.%s.class" // task.lifecycle.listener.li-generator.class val CHECKPOINT_MANAGER_FACTORY = TaskConfigJava.CHECKPOINT_MANAGER_FACTORY // class name to use when sending offset checkpoints val MESSAGE_CHOOSER_CLASS_NAME = "task.chooser.class" - val DROP_DESERIALIZATION_ERROR = "task.drop.deserialization.errors" // define whether drop the messages or not when deserialization fails - val DROP_SERIALIZATION_ERROR = "task.drop.serialization.errors" // define whether drop the messages or not when serialization fails - val DROP_PRODUCER_ERROR = "task.drop.producer.errors" // whether to ignore producer errors and drop the messages that failed to send + val DROP_DESERIALIZATION_ERRORS = "task.drop.deserialization.errors" // define whether drop the messages or not when deserialization fails + val DROP_SERIALIZATION_ERRORS = "task.drop.serialization.errors" // define whether drop the messages or not when serialization fails + val DROP_PRODUCER_ERRORS = "task.drop.producer.errors" // whether to ignore producer errors and drop the messages that failed to send val IGNORED_EXCEPTIONS = "task.ignored.exceptions" // exceptions to ignore in process and window val GROUPER_FACTORY = "task.name.grouper.factory" // class name for task grouper val MAX_CONCURRENCY = "task.max.concurrency" // max number of concurrent process for a AsyncStreamTask @@ -115,11 +115,11 @@ class TaskConfig(config: Config) extends ScalaMapConfig(config) with Logging { def getMessageChooserClass = getOption(TaskConfig.MESSAGE_CHOOSER_CLASS_NAME) - def getDropDeserialization = getOption(TaskConfig.DROP_DESERIALIZATION_ERROR) + def getDropDeserializationErrors = getBoolean(TaskConfig.DROP_DESERIALIZATION_ERRORS, false) - def getDropSerialization = getOption(TaskConfig.DROP_SERIALIZATION_ERROR) + def getDropSerializationErrors = getBoolean(TaskConfig.DROP_SERIALIZATION_ERRORS, false) - def getDropProducerError = getBoolean(TaskConfig.DROP_PRODUCER_ERROR, false) + def getDropProducerErrors = getBoolean(TaskConfig.DROP_PRODUCER_ERRORS, false) def getPollIntervalMs = getOption(TaskConfig.POLL_INTERVAL_MS) http://git-wip-us.apache.org/repos/asf/samza/blob/0dc9dd24/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index 94bc138..03effe6 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -425,15 +425,8 @@ object SamzaContainer extends Logging { val offsetManager = OffsetManager(inputStreamMetadata, config, checkpointManager, systemAdmins, checkpointListeners, offsetManagerMetrics) info("Got offset manager: %s" format offsetManager) - val dropDeserializationError = config.getDropDeserialization match { - case Some(dropError) => dropError.toBoolean - case _ => false - } - - val dropSerializationError = config.getDropSerialization match { - case Some(dropError) => dropError.toBoolean - case _ => false - } + val dropDeserializationError = config.getDropDeserializationErrors + val dropSerializationError = config.getDropSerializationErrors val pollIntervalMs = config .getPollIntervalMs http://git-wip-us.apache.org/repos/asf/samza/blob/0dc9dd24/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala index f314f92..cca7a6b 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala @@ -83,7 +83,7 @@ class KafkaSystemFactory extends SystemFactory with Logging { new ExponentialSleepStrategy(initialDelayMs = producerConfig.reconnectIntervalMs), getProducer, metrics, - dropProducerExceptions = config.getDropProducerError) + dropProducerExceptions = config.getDropProducerErrors) } def getAdmin(systemName: String, config: Config): SystemAdmin = { http://git-wip-us.apache.org/repos/asf/samza/blob/0dc9dd24/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/KafkaSystemFactory.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/KafkaSystemFactory.scala index d588831..cf7c5e8 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/KafkaSystemFactory.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka_deprecated/KafkaSystemFactory.scala @@ -101,7 +101,7 @@ class KafkaSystemFactory extends SystemFactory with Logging { new ExponentialSleepStrategy(initialDelayMs = producerConfig.reconnectIntervalMs), getProducer, metrics, - dropProducerExceptions = config.getDropProducerError) + dropProducerExceptions = config.getDropProducerErrors) } def getAdmin(systemName: String, config: Config): SystemAdmin = { http://git-wip-us.apache.org/repos/asf/samza/blob/0dc9dd24/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java ---------------------------------------------------------------------- diff --git a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java index 5278284..b4a97f7 100644 --- a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java +++ b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java @@ -279,7 +279,7 @@ public class StreamAppender extends AppenderSkeleton { throw new SamzaException("can not read the config", e); } // Make system producer drop producer errors for StreamAppender - config = new MapConfig(config, ImmutableMap.of(TaskConfig.DROP_PRODUCER_ERROR(), "true")); + config = new MapConfig(config, ImmutableMap.of(TaskConfig.DROP_PRODUCER_ERRORS(), "true")); return config; } http://git-wip-us.apache.org/repos/asf/samza/blob/0dc9dd24/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java ---------------------------------------------------------------------- diff --git a/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java b/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java index e1d6dd3..28f759e 100644 --- a/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java +++ b/samza-log4j2/src/main/java/org/apache/samza/logging/log4j2/StreamAppender.java @@ -300,7 +300,7 @@ public class StreamAppender extends AbstractAppender { throw new SamzaException("can not read the config", e); } // Make system producer drop producer errors for StreamAppender - config = new MapConfig(config, ImmutableMap.of(TaskConfig.DROP_PRODUCER_ERROR(), "true")); + config = new MapConfig(config, ImmutableMap.of(TaskConfig.DROP_PRODUCER_ERRORS(), "true")); return config; } http://git-wip-us.apache.org/repos/asf/samza/blob/0dc9dd24/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java ---------------------------------------------------------------------- diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java index 7411318..78dad0d 100644 --- a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java +++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java @@ -206,7 +206,7 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne .put(JobConfig.JOB_NAME(), appName) .put(JobConfig.JOB_ID(), appId) .put(TaskConfigJava.TASK_SHUTDOWN_MS, TASK_SHUTDOWN_MS) - .put(TaskConfig.DROP_PRODUCER_ERROR(), "true") + .put(TaskConfig.DROP_PRODUCER_ERRORS(), "true") .put(JobConfig.JOB_DEBOUNCE_TIME_MS(), JOB_DEBOUNCE_TIME_MS) .put(JobConfig.MONITOR_PARTITION_CHANGE_FREQUENCY_MS(), "1000") .build();