Updated Branches: refs/heads/master 8ee3abb64 -> 4dec9e736
SAMZA-115; allowing custom metric group name in blocking envelope map.

Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/4dec9e73
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/4dec9e73
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/4dec9e73

Branch: refs/heads/master
Commit: 4dec9e736cf6dbff48ab80fc48d8a6de1d6bc7c3
Parents: 8ee3abb
Author: Chris Riccomini <[email protected]>
Authored: Tue Jan 7 09:03:06 2014 -0800
Committer: Chris Riccomini <[email protected]>
Committed: Tue Jan 7 09:03:06 2014 -0800

----------------------------------------------------------------------
 .../main/java/org/apache/samza/util/BlockingEnvelopeMap.java | 7 ++++++-
 .../org/apache/samza/system/kafka/KafkaSystemConsumer.scala | 2 +-
 2 files changed, 7 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4dec9e73/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java b/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
index d4db126..a7c32bc 100644
--- a/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
+++ b/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
@@ -76,7 +76,12 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer {
   }
 
   public BlockingEnvelopeMap(MetricsRegistry metricsRegistry, Clock clock) {
-    this.metrics = new BlockingEnvelopeMapMetrics(this.getClass().getName(), metricsRegistry);
+    this(metricsRegistry, clock, null);
+  }
+
+  public BlockingEnvelopeMap(MetricsRegistry metricsRegistry, Clock clock, String metricsGroupName) {
+    metricsGroupName = (metricsGroupName == null) ? this.getClass().getName() : metricsGroupName;
+    this.metrics = new BlockingEnvelopeMapMetrics(metricsGroupName, metricsRegistry);
     this.bufferedMessages = new ConcurrentHashMap<SystemStreamPartition, BlockingQueue<IncomingMessageEnvelope>>();
     this.noMoreMessage = new ConcurrentHashMap<SystemStreamPartition, Boolean>();
     this.clock = clock;

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4dec9e73/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
index 5dbcd94..3ee4068 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
@@ -59,7 +59,7 @@ private[kafka] class KafkaSystemConsumer(
   keyDeserializer: Decoder[Object] = new DefaultDecoder().asInstanceOf[Decoder[Object]],
   clock: () => Long = { System.currentTimeMillis }) extends BlockingEnvelopeMap(metrics.registry, new Clock {
   def currentTimeMillis = clock()
-}) with Toss with Logging {
+}, classOf[KafkaSystemConsumerMetrics].getName) with Toss with Logging {
 
   type HostPort = (String, Int)
   val brokerProxies = scala.collection.mutable.Map[HostPort, BrokerProxy]()
