Updated Branches:
  refs/heads/master 8ee3abb64 -> 4dec9e736

SAMZA-115; allowing custom metric group name in blocking envelope map.


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/4dec9e73
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/4dec9e73
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/4dec9e73

Branch: refs/heads/master
Commit: 4dec9e736cf6dbff48ab80fc48d8a6de1d6bc7c3
Parents: 8ee3abb
Author: Chris Riccomini <[email protected]>
Authored: Tue Jan 7 09:03:06 2014 -0800
Committer: Chris Riccomini <[email protected]>
Committed: Tue Jan 7 09:03:06 2014 -0800

----------------------------------------------------------------------
 .../main/java/org/apache/samza/util/BlockingEnvelopeMap.java  | 7 ++++++-
 .../org/apache/samza/system/kafka/KafkaSystemConsumer.scala   | 2 +-
 2 files changed, 7 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4dec9e73/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java 
b/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
index d4db126..a7c32bc 100644
--- a/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
+++ b/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
@@ -76,7 +76,12 @@ public abstract class BlockingEnvelopeMap implements 
SystemConsumer {
   }
 
   public BlockingEnvelopeMap(MetricsRegistry metricsRegistry, Clock clock) {
-    this.metrics = new BlockingEnvelopeMapMetrics(this.getClass().getName(), 
metricsRegistry);
+    this(metricsRegistry, clock, null);
+  }
+
+  public BlockingEnvelopeMap(MetricsRegistry metricsRegistry, Clock clock, 
String metricsGroupName) {
+    metricsGroupName = (metricsGroupName == null) ? this.getClass().getName() 
: metricsGroupName;
+    this.metrics = new BlockingEnvelopeMapMetrics(metricsGroupName, 
metricsRegistry);
     this.bufferedMessages = new ConcurrentHashMap<SystemStreamPartition, 
BlockingQueue<IncomingMessageEnvelope>>();
     this.noMoreMessage = new ConcurrentHashMap<SystemStreamPartition, 
Boolean>();
     this.clock = clock;

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4dec9e73/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
index 5dbcd94..3ee4068 100644
--- 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
+++ 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
@@ -59,7 +59,7 @@ private[kafka] class KafkaSystemConsumer(
   keyDeserializer: Decoder[Object] = new 
DefaultDecoder().asInstanceOf[Decoder[Object]],
   clock: () => Long = { System.currentTimeMillis }) extends 
BlockingEnvelopeMap(metrics.registry, new Clock {
   def currentTimeMillis = clock()
-}) with Toss with Logging {
+}, classOf[KafkaSystemConsumerMetrics].getName) with Toss with Logging {
 
   type HostPort = (String, Int)
   val brokerProxies = scala.collection.mutable.Map[HostPort, BrokerProxy]()

Reply via email to