Updated Branches: refs/heads/master 15c4587b4 -> afc2fb7af
SAMZA-96; add names to daemon threads for JvmMetrics and MetricsSnapshotReporter Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/afc2fb7a Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/afc2fb7a Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/afc2fb7a Branch: refs/heads/master Commit: afc2fb7af974aa7780f8ea57a202fe7202780548 Parents: 15c4587 Author: Steve Yates <[email protected]> Authored: Thu Feb 6 09:51:33 2014 -0800 Committer: Chris Riccomini <[email protected]> Committed: Thu Feb 6 09:51:33 2014 -0800 ---------------------------------------------------------------------- .../org/apache/samza/metrics/JvmMetrics.scala | 10 ++++-- .../reporter/MetricsSnapshotReporter.scala | 9 ++++- .../apache/samza/util/DaemonThreadFactory.scala | 11 +++++- .../samza/util/TestDaemonThreadFactory.scala | 36 ++++++++++++++++++++ .../apache/samza/system/kafka/BrokerProxy.scala | 9 +++++ 5 files changed, 71 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/afc2fb7a/samza-core/src/main/scala/org/apache/samza/metrics/JvmMetrics.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/JvmMetrics.scala b/samza-core/src/main/scala/org/apache/samza/metrics/JvmMetrics.scala index 301a5a0..ed1e8af 100644 --- a/samza-core/src/main/scala/org/apache/samza/metrics/JvmMetrics.scala +++ b/samza-core/src/main/scala/org/apache/samza/metrics/JvmMetrics.scala @@ -26,10 +26,16 @@ import java.lang.Thread.State._ import java.util.concurrent.Executors import java.util.concurrent.TimeUnit import grizzled.slf4j.Logging -import org.apache.samza.util.Util import org.apache.samza.util.DaemonThreadFactory /** + * Companion object for class JvmMetrics encapsulating various constants + */ +object 
JvmMetrics { + val JVM_METRICS_THREAD_NAME_PREFIX = "JVM-METRICS" +} + +/** * Straight up ripoff of Hadoop's metrics2 JvmMetrics class. */ class JvmMetrics(val registry: MetricsRegistry) extends MetricsHelper with Runnable with Logging { @@ -39,7 +45,7 @@ class JvmMetrics(val registry: MetricsRegistry) extends MetricsHelper with Runna val gcBeans = ManagementFactory.getGarbageCollectorMXBeans() val threadMXBean = ManagementFactory.getThreadMXBean() var gcBeanCounters = Map[String, (Counter, Counter)]() - val executor = Executors.newScheduledThreadPool(1, new DaemonThreadFactory) + val executor = Executors.newScheduledThreadPool(1, new DaemonThreadFactory(JvmMetrics.JVM_METRICS_THREAD_NAME_PREFIX)) // jvm metrics val gMemNonHeapUsedM = newGauge("mem-non-heap-used-mb", 0.0F) http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/afc2fb7a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala index 15af7aa..9a56754 100644 --- a/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala +++ b/samza-core/src/main/scala/org/apache/samza/metrics/reporter/MetricsSnapshotReporter.scala @@ -37,6 +37,13 @@ import org.apache.samza.system.SystemStream import org.apache.samza.system.OutgoingMessageEnvelope /** + * Companion object for class MetricsSnapshotReporter encapsulating various constants + */ +object MetricsSnapshotReporter { + val METRIC_SNAPSHOT_REPORTER_THREAD_NAME_PREFIX = "METRIC-SNAPSHOT-REPORTER" +} + +/** * MetricsSnapshotReporter is a generic metrics reporter that sends metrics to a stream. 
* * jobName // my-samza-job @@ -57,7 +64,7 @@ class MetricsSnapshotReporter( serializer: Serializer[MetricsSnapshot] = null, clock: () => Long = () => { System.currentTimeMillis }) extends MetricsReporter with Runnable with Logging { - val executor = Executors.newScheduledThreadPool(1, new DaemonThreadFactory) + val executor = Executors.newScheduledThreadPool(1, new DaemonThreadFactory(MetricsSnapshotReporter.METRIC_SNAPSHOT_REPORTER_THREAD_NAME_PREFIX)) val resetTime = clock() var registries = List[(String, ReadableMetricsRegistry)]() http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/afc2fb7a/samza-core/src/main/scala/org/apache/samza/util/DaemonThreadFactory.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/util/DaemonThreadFactory.scala b/samza-core/src/main/scala/org/apache/samza/util/DaemonThreadFactory.scala index 04e67a2..d2015ab 100644 --- a/samza-core/src/main/scala/org/apache/samza/util/DaemonThreadFactory.scala +++ b/samza-core/src/main/scala/org/apache/samza/util/DaemonThreadFactory.scala @@ -21,10 +21,19 @@ package org.apache.samza.util import java.util.concurrent.ThreadFactory -class DaemonThreadFactory extends ThreadFactory { + +object ThreadNamePrefix { + val SAMZA_THREAD_NAME_PREFIX = "SAMZA-" +} + +class DaemonThreadFactory(name: String) extends ThreadFactory { + def newThread(r: Runnable) = { val thread = new Thread(r) thread.setDaemon(true) + if (name.nonEmpty) { + thread.setName(ThreadNamePrefix.SAMZA_THREAD_NAME_PREFIX+name) + } thread } } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/afc2fb7a/samza-core/src/test/scala/org/apache/samza/util/TestDaemonThreadFactory.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/util/TestDaemonThreadFactory.scala b/samza-core/src/test/scala/org/apache/samza/util/TestDaemonThreadFactory.scala new file mode 100644 index 
0000000..6353378 --- /dev/null +++ b/samza-core/src/test/scala/org/apache/samza/util/TestDaemonThreadFactory.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.util + +import org.junit.Assert._ +import org.junit.Test + +class TestDaemonThreadFactory { + @Test + def testDaemonThreadFactoryCanCreatThreadGivenName() { + val testThreadName = "JvmMetrics" + val dtf = new DaemonThreadFactory(testThreadName) + val threadWithName = dtf.newThread(new Runnable { + def run() { + //Not testing this particular method + } + }) + assertEquals(threadWithName.getName, ThreadNamePrefix.SAMZA_THREAD_NAME_PREFIX+testThreadName) + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/afc2fb7a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala index 89730db..5095e70 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala +++ 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala @@ -31,6 +31,14 @@ import java.nio.channels.ClosedByInterruptException import java.util.Map.Entry import scala.collection.mutable import kafka.consumer.ConsumerConfig +import org.apache.samza.util.ThreadNamePrefix.SAMZA_THREAD_NAME_PREFIX + +/** + * Companion object for class BrokerProxy encapsulating various constants + */ +object BrokerProxy { + val BROKER_PROXY_THREAD_NAME_PREFIX = "BROKER-PROXY" +} /** * A BrokerProxy consolidates Kafka fetches meant for a particular broker and retrieves them all at once, providing @@ -238,6 +246,7 @@ abstract class BrokerProxy( info("Starting " + toString) thread.setDaemon(true) + thread.setName(SAMZA_THREAD_NAME_PREFIX+BrokerProxy.BROKER_PROXY_THREAD_NAME_PREFIX) thread.start }
