Repository: incubator-samza Updated Branches: refs/heads/master 811f2897c -> 3e0b3a2b6
SAMZA-407; add ability to drop exceptions rather than fail container and add metrics to track Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/3e0b3a2b Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/3e0b3a2b Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/3e0b3a2b Branch: refs/heads/master Commit: 3e0b3a2b6c146b8fab13aaf4b6798a696b2e52f5 Parents: 811f289 Author: David Chen <[email protected]> Authored: Wed Sep 17 15:24:12 2014 -0700 Committer: Chris Riccomini <[email protected]> Committed: Wed Sep 17 15:24:12 2014 -0700 ---------------------------------------------------------------------- .../versioned/jobs/configuration-table.html | 10 ++ .../java/org/apache/samza/metrics/Gauge.java | 2 +- .../org/apache/samza/config/TaskConfig.scala | 3 + .../apache/samza/container/SamzaContainer.scala | 15 +- .../apache/samza/container/TaskInstance.scala | 27 +-- .../TaskInstanceExceptionHandler.scala | 103 ++++++++++++ .../apache/samza/container/TestRunLoop.scala | 2 +- .../samza/container/TestSamzaContainer.scala | 4 +- .../samza/container/TestTaskInstance.scala | 163 ++++++++++++++++++- .../samza/metrics/TestMetricsHelper.scala | 2 +- 10 files changed, 308 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/3e0b3a2b/docs/learn/documentation/versioned/jobs/configuration-table.html ---------------------------------------------------------------------- diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html index 526ca9f..069babe 100644 --- a/docs/learn/documentation/versioned/jobs/configuration-table.html +++ b/docs/learn/documentation/versioned/jobs/configuration-table.html @@ -403,6 +403,16 @@ </tr> <tr> + <td class="property" id="task-ignored-exceptions">task.ignored.exceptions</td> + <td class="default"></td> + <td class="description"> + This property specifies which exceptions should be ignored if thrown in a task's <code>process</code> or <code>window</code> + methods. The exceptions to be ignored should be a comma-separated list of fully-qualified class names of the exceptions or + <code>*</code> to ignore all exceptions. + </td> + </tr> + + <tr> <th colspan="3" class="section" id="streams"><a href="../container/streams.html">Systems (input and output streams)</a></th> </tr> http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/3e0b3a2b/samza-api/src/main/java/org/apache/samza/metrics/Gauge.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/metrics/Gauge.java b/samza-api/src/main/java/org/apache/samza/metrics/Gauge.java index c37bfbb..be1f01d 100644 --- a/samza-api/src/main/java/org/apache/samza/metrics/Gauge.java +++ b/samza-api/src/main/java/org/apache/samza/metrics/Gauge.java @@ -23,7 +23,7 @@ import java.util.concurrent.atomic.AtomicReference; /** * A Gauge is a {@link org.apache.samza.metrics.Metric} that wraps some instance of T in a thread-safe - * reference and allows it to be set or retrieved. Gauages record specific values over time. + * reference and allows it to be set or retrieved. Gauges record specific values over time. * For example, the current length of a queue or the size of a buffer. * * @param <T> Instance to be wrapped in the gauge for metering. http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/3e0b3a2b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala index 21d8903..d066ed8 100644 --- a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala +++ b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala @@ -35,6 +35,7 @@ object TaskConfig { val MESSAGE_CHOOSER_CLASS_NAME = "task.chooser.class" val DROP_DESERIALIZATION_ERROR = "task.drop.deserialization.errors" // define whether drop the messages or not when deserialization fails val DROP_SERIALIZATION_ERROR = "task.drop.serialization.errors" // define whether drop the messages or not when serialization fails + val IGNORED_EXCEPTIONS = "task.ignored.exceptions" // exceptions to ignore in process and window /** * Samza's container polls for more messages under two conditions. The first @@ -95,4 +96,6 @@ class TaskConfig(config: Config) extends ScalaMapConfig(config) { def getDropSerialization = getOption(TaskConfig.DROP_SERIALIZATION_ERROR) def getPollIntervalMs = getOption(TaskConfig.POLL_INTERVAL_MS) + + def getIgnoredExceptions = getOption(TaskConfig.IGNORED_EXCEPTIONS) } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/3e0b3a2b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index 3288bf7..d91d6d7 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -19,8 +19,8 @@ package org.apache.samza.container -import org.apache.samza.util.Logging import java.io.File + import org.apache.samza.Partition import org.apache.samza.SamzaException import org.apache.samza.checkpoint.{CheckpointManagerFactory, OffsetManager} @@ -56,11 +56,13 @@ import org.apache.samza.system.chooser.DefaultChooser import org.apache.samza.system.chooser.MessageChooserFactory import org.apache.samza.system.chooser.RoundRobinChooserFactory import org.apache.samza.task.StreamTask +import org.apache.samza.task.TaskInstanceCollector import org.apache.samza.task.TaskLifecycleListener import org.apache.samza.task.TaskLifecycleListenerFactory +import org.apache.samza.util.Logging import org.apache.samza.util.Util + import scala.collection.JavaConversions._ -import org.apache.samza.task.TaskInstanceCollector object SamzaContainer extends Logging { @@ -86,8 +88,8 @@ object SamzaContainer extends Logging { } def safeMain(jmxServer: JmxServer = new JmxServer) { - // Break out the main method to make the JmxServer injectable so we can - // validate that we don't leak JMX non-daemon threads if we have an + // Break out the main method to make the JmxServer injectable so we can + // validate that we don't leak JMX non-daemon threads if we have an // exception in the main method. try { val containerName = System.getenv(ShellCommandConfig.ENV_CONTAINER_NAME) @@ -374,7 +376,7 @@ object SamzaContainer extends Logging { } } - // TODO not sure how we should make this config based, or not. Kind of + // TODO not sure how we should make this config based, or not. Kind of // strange, since it has some dynamic directories when used with YARN. val storeBaseDir = new File(System.getProperty("user.dir"), "state") @@ -492,7 +494,8 @@ object SamzaContainer extends Logging { storageManager = storageManager, reporters = reporters, listeners = listeners, - systemStreamPartitions = systemStreamPartitions) + systemStreamPartitions = systemStreamPartitions, + exceptionHandler = TaskInstanceExceptionHandler(taskInstanceMetrics, config)) (taskName, taskInstance) }).toMap http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/3e0b3a2b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala index b86fb0d..66f7dbe 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala @@ -19,24 +19,26 @@ package org.apache.samza.container -import org.apache.samza.metrics.MetricsReporter +import org.apache.samza.SamzaException +import org.apache.samza.checkpoint.OffsetManager import org.apache.samza.config.Config -import org.apache.samza.util.Logging +import org.apache.samza.config.TaskConfig.Config2Task +import org.apache.samza.metrics.MetricsReporter import org.apache.samza.storage.TaskStorageManager +import org.apache.samza.system.IncomingMessageEnvelope import org.apache.samza.system.SystemStreamPartition +import org.apache.samza.system.SystemConsumers import org.apache.samza.task.TaskContext import org.apache.samza.task.ClosableTask import org.apache.samza.task.InitableTask -import org.apache.samza.system.IncomingMessageEnvelope import org.apache.samza.task.WindowableTask import org.apache.samza.task.TaskLifecycleListener import org.apache.samza.task.StreamTask -import org.apache.samza.system.SystemConsumers import org.apache.samza.task.ReadableCoordinator -import org.apache.samza.checkpoint.OffsetManager -import org.apache.samza.SamzaException -import scala.collection.JavaConversions._ import org.apache.samza.task.TaskInstanceCollector +import org.apache.samza.util.Logging + +import scala.collection.JavaConversions._ class TaskInstance( task: StreamTask, @@ -49,7 +51,8 @@ class TaskInstance( storageManager: TaskStorageManager = null, reporters: Map[String, MetricsReporter] = Map(), listeners: Seq[TaskLifecycleListener] = Seq(), - val systemStreamPartitions: Set[SystemStreamPartition] = Set()) extends Logging { + val systemStreamPartitions: Set[SystemStreamPartition] = Set(), + val exceptionHandler: TaskInstanceExceptionHandler = new TaskInstanceExceptionHandler) extends Logging { val isInitableTask = task.isInstanceOf[InitableTask] val isWindowableTask = task.isInstanceOf[WindowableTask] val isClosableTask = task.isInstanceOf[ClosableTask] @@ -130,7 +133,9 @@ class TaskInstance( trace("Processing incoming message envelope for taskName and SSP: %s, %s" format (taskName, envelope.getSystemStreamPartition)) - task.process(envelope, collector, coordinator) + exceptionHandler.maybeHandle { + task.process(envelope, collector, coordinator) + } listeners.foreach(_.afterProcess(envelope, config, context)) @@ -145,7 +150,9 @@ class TaskInstance( metrics.windows.inc - task.asInstanceOf[WindowableTask].window(collector, coordinator) + exceptionHandler.maybeHandle { + task.asInstanceOf[WindowableTask].window(collector, coordinator) + } } } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/3e0b3a2b/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceExceptionHandler.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceExceptionHandler.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceExceptionHandler.scala new file mode 100644 index 0000000..99b729f --- /dev/null +++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceExceptionHandler.scala @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.container + +import org.apache.samza.config.Config +import org.apache.samza.config.TaskConfig.Config2Task +import org.apache.samza.metrics.Counter +import org.apache.samza.metrics.MetricsHelper +import org.apache.samza.util.Logging + +/** + * Handles exceptions thrown in a {@link TaskInstance}'s process or window + * methods and provides metrics on the number of times ignored exceptions are + * thrown. The exceptions to ignore are specified using the + * `task.ignored.exceptions` configuration property. + * + * @param metrics The {@link TaskInstanceMetrics} used to track exception + * counts. + * @param ignoredExceptions Set of string names of exception classes to ignore + * and count. If the set contains the wildcard "*", then all exceptions + * are ignored and counted. + */ +class TaskInstanceExceptionHandler( + val metrics: MetricsHelper = new TaskInstanceMetrics, + val ignoredExceptions: Set[String] = Set[String]()) extends Logging { + + val ignoreAll: Boolean = ignoredExceptions.contains("*") + var counters: Map[String, Counter] = Map[String, Counter]() + + /** + * Takes a code block and handles any exception thrown in the code block. + * + * @param tryCodeBlock The code block to run and handle exceptions from. + */ + def maybeHandle(tryCodeBlock: => Unit) { + try { + tryCodeBlock + } catch { + case e: Exception => handle(e) + } + } + + /** + * Handles an exception. If the exception is in the set of exceptions to + * ignore or if the wildcard is used to ignore all exceptions, then the + * exception is counted and then ignored. Otherwise, the exception is thrown. + * + * @param exception The exception to handle. + */ + def handle(exception: Exception) { + val className = exception.getClass.getName + if (!ignoreAll && !ignoredExceptions.contains(className)) { + throw exception + } + + debug("Counting exception " + className) + + counters.get(className) match { + case Some(counter) => counter.inc() + case _ => { + val counter = metrics.newCounter("exception-ignored-" + className) + counter.inc() + counters += className -> counter + } + } + } +} + +object TaskInstanceExceptionHandler { + /** + * Creates a new TaskInstanceExceptionHandler using the provided + * configuration. + * + * @param metrics The {@link TaskInstanceMetrics} used to track exception + * counts. + * @param config The configuration to read the list of ignored exceptions + * from. + */ + def apply(metrics: MetricsHelper, config: Config) = + new TaskInstanceExceptionHandler( + metrics = metrics, + ignoredExceptions = config.getIgnoredExceptions match { + case Some(exceptions) => exceptions.split(",").toSet + case _ => Set[String]() + }) +} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/3e0b3a2b/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala b/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala index ff425da..ea48853 100644 --- a/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala +++ b/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala @@ -211,4 +211,4 @@ class TestRunLoop extends AssertionsForJUnit with MockitoSugar with ShouldMatche testMetrics.processMs.getSnapshot.getSize should equal(2) testMetrics.commitMs.getSnapshot.getSize should equal(2) } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/3e0b3a2b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala index b7a9569..c200601 100644 --- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala +++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala @@ -56,7 +56,7 @@ class TestSamzaContainer extends AssertionsForJUnit { } } intercept[Exception] { - // Calling main will trigger an NPE since the container checks for an + // Calling main will trigger an NPE since the container checks for an // isCompressed environment variable, which isn't set. SamzaContainer.safeMain(jmxServer) } @@ -128,7 +128,7 @@ class TestSamzaContainer extends AssertionsForJUnit { container.run fail("Expected exception to be thrown in run method.") } catch { - case e: Exception => // Expected + case e: Exception => // Expected } assertTrue(task.wasShutdown) } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/3e0b3a2b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala index c31a74e..11eab16 100644 --- a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala +++ b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala @@ -19,11 +19,16 @@ package org.apache.samza.container -import org.junit.Assert._ -import org.junit.Test +import java.util.concurrent.ConcurrentHashMap + +import org.apache.samza.SamzaException import org.apache.samza.Partition import org.apache.samza.checkpoint.OffsetManager +import org.apache.samza.config.Config import org.apache.samza.config.MapConfig +import org.apache.samza.metrics.Counter +import org.apache.samza.metrics.Metric +import org.apache.samza.metrics.MetricsRegistryMap import org.apache.samza.serializers.SerdeManager import org.apache.samza.system.IncomingMessageEnvelope import org.apache.samza.system.SystemConsumer @@ -40,6 +45,10 @@ import org.apache.samza.task.ReadableCoordinator import org.apache.samza.task.StreamTask import org.apache.samza.task.TaskCoordinator import org.apache.samza.task.TaskInstanceCollector +import org.apache.samza.task.WindowableTask +import org.junit.Assert._ +import org.junit.Test +import org.scalatest.Assertions.intercept import scala.collection.JavaConversions._ @@ -81,4 +90,154 @@ class TestTaskInstance { assertTrue(lastProcessedOffset.isDefined) assertEquals("2", lastProcessedOffset.get) } + + /** + * Mock exception used to test exception counts metrics. + */ + class TroublesomeException extends RuntimeException { + } + + /** + * Mock exception used to test exception counts metrics. + */ + class NonFatalException extends RuntimeException { + } + + /** + * Mock exception used to test exception counts metrics. + */ + class FatalException extends RuntimeException { + } + + /** + * Task used to test exception counts metrics. + */ + class TroublesomeTask extends StreamTask with WindowableTask { + def process( + envelope: IncomingMessageEnvelope, + collector: MessageCollector, + coordinator: TaskCoordinator) { + + envelope.getOffset().toInt match { + case offset if offset % 2 == 0 => throw new TroublesomeException + case _ => throw new NonFatalException + } + } + + def window(collector: MessageCollector, coordinator: TaskCoordinator) { + throw new FatalException + } + } + + /* + * Helper method used to retrieve the value of a counter from a group. + */ + private def getCount( + group: ConcurrentHashMap[String, Metric], + name: String): Long = { + group.get("exception-ignored-" + name.toLowerCase).asInstanceOf[Counter].getCount + } + + /** + * Test task instance exception metrics with two ignored exceptions and one + * exception not ignored. + */ + @Test + def testExceptionCounts { + val task = new TroublesomeTask + val ignoredExceptions = classOf[TroublesomeException].getName + "," + + classOf[NonFatalException].getName + val config = new MapConfig(Map[String, String]( + "task.ignored.exceptions" -> ignoredExceptions)) + + val partition = new Partition(0) + val consumerMultiplexer = new SystemConsumers( + new RoundRobinChooser, + Map[String, SystemConsumer]()) + val producerMultiplexer = new SystemProducers( + Map[String, SystemProducer](), + new SerdeManager) + val systemStream = new SystemStream("test-system", "test-stream") + val systemStreamPartition = new SystemStreamPartition(systemStream, partition) + // Pretend our last checkpointed (next) offset was 2. + val testSystemStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2"))) + val offsetManager = OffsetManager(Map(systemStream -> testSystemStreamMetadata), config) + val taskName = new TaskName("taskName") + val collector = new TaskInstanceCollector(producerMultiplexer) + + val registry = new MetricsRegistryMap + val taskMetrics = new TaskInstanceMetrics(registry = registry) + val taskInstance: TaskInstance = new TaskInstance( + task, + taskName, + config, + taskMetrics, + consumerMultiplexer, + collector, + offsetManager, + exceptionHandler = TaskInstanceExceptionHandler(taskMetrics, config)) + + val coordinator = new ReadableCoordinator(taskName) + taskInstance.process(new IncomingMessageEnvelope(systemStreamPartition, "1", null, null), coordinator) + taskInstance.process(new IncomingMessageEnvelope(systemStreamPartition, "2", null, null), coordinator) + taskInstance.process(new IncomingMessageEnvelope(systemStreamPartition, "3", null, null), coordinator) + + val group = registry.getGroup(taskMetrics.group) + assertEquals(1L, getCount(group, classOf[TroublesomeException].getName)) + assertEquals(2L, getCount(group, classOf[NonFatalException].getName)) + + intercept[FatalException] { + taskInstance.window(coordinator) + } + assertFalse(group.contains(classOf[FatalException].getName.toLowerCase)) + } + + /** + * Test task instance exception metrics with all exception ignored using a + * wildcard. + */ + @Test + def testIgnoreAllExceptions { + val task = new TroublesomeTask + val config = new MapConfig(Map[String, String]( + "task.ignored.exceptions" -> "*")) + + val partition = new Partition(0) + val consumerMultiplexer = new SystemConsumers( + new RoundRobinChooser, + Map[String, SystemConsumer]()) + val producerMultiplexer = new SystemProducers( + Map[String, SystemProducer](), + new SerdeManager) + val systemStream = new SystemStream("test-system", "test-stream") + val systemStreamPartition = new SystemStreamPartition(systemStream, partition) + // Pretend our last checkpointed (next) offset was 2. + val testSystemStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2"))) + val offsetManager = OffsetManager(Map(systemStream -> testSystemStreamMetadata), config) + val taskName = new TaskName("taskName") + val collector = new TaskInstanceCollector(producerMultiplexer) + + val registry = new MetricsRegistryMap + val taskMetrics = new TaskInstanceMetrics(registry = registry) + val taskInstance: TaskInstance = new TaskInstance( + task, + taskName, + config, + taskMetrics, + consumerMultiplexer, + collector, + offsetManager, + exceptionHandler = TaskInstanceExceptionHandler(taskMetrics, config)) + + val coordinator = new ReadableCoordinator(taskName) + taskInstance.process(new IncomingMessageEnvelope(systemStreamPartition, "1", null, null), coordinator) + taskInstance.process(new IncomingMessageEnvelope(systemStreamPartition, "2", null, null), coordinator) + taskInstance.process(new IncomingMessageEnvelope(systemStreamPartition, "3", null, null), coordinator) + taskInstance.window(coordinator) + + val group = registry.getGroup(taskMetrics.group) + assertEquals(1L, getCount(group, classOf[TroublesomeException].getName)) + assertEquals(2L, getCount(group, classOf[NonFatalException].getName)) + assertEquals(1L, getCount(group, classOf[FatalException].getName)) + } } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/3e0b3a2b/samza-core/src/test/scala/org/apache/samza/metrics/TestMetricsHelper.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/metrics/TestMetricsHelper.scala b/samza-core/src/test/scala/org/apache/samza/metrics/TestMetricsHelper.scala index 0a62bd0..bd3e5fe 100644 --- a/samza-core/src/test/scala/org/apache/samza/metrics/TestMetricsHelper.scala +++ b/samza-core/src/test/scala/org/apache/samza/metrics/TestMetricsHelper.scala @@ -25,7 +25,7 @@ import org.apache.samza.container.SamzaContainerMetrics class TestMetricsHelper { @Test - def testMetricsHelperGroupShouldBePAckageName { + def testMetricsHelperGroupShouldBePackageName { assertEquals(classOf[SamzaContainerMetrics].getName, new SamzaContainerMetrics().group) } }
