Updated Branches:
  refs/heads/master fe8df6373 -> 48b946c0d

SAMZA-134; container/stream-level metrics of messages per second


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/48b946c0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/48b946c0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/48b946c0

Branch: refs/heads/master
Commit: 48b946c0d1470c51ea61ea5d509c1b35c949752c
Parents: fe8df63
Author: Chris Riccomini <[email protected]>
Authored: Thu Jan 23 13:59:34 2014 -0800
Committer: Chris Riccomini <[email protected]>
Committed: Thu Jan 23 13:59:34 2014 -0800

----------------------------------------------------------------------
 .../main/scala/org/apache/samza/system/SystemConsumers.scala | 3 +++
 .../org/apache/samza/system/SystemConsumersMetrics.scala     | 5 +++++
 .../scala/org/apache/samza/system/kafka/BrokerProxy.scala    | 2 +-
 .../samza/system/kafka/KafkaSystemConsumerMetrics.scala      | 8 ++++----
 .../org/apache/samza/system/kafka/TestBrokerProxy.scala      | 2 +-
 5 files changed, 14 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/48b946c0/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala 
b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
index dd7d357..cdba7fe 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
@@ -168,6 +168,7 @@ class SystemConsumers(
   def register(systemStreamPartition: SystemStreamPartition, lastReadOffset: 
String) {
     debug("Registering stream: %s, %s" format (systemStreamPartition, 
lastReadOffset))
 
+    metrics.registerSystemStream(systemStreamPartition.getSystemStream)
     neededByChooser += systemStreamPartition
     updateFetchMap(systemStreamPartition, maxMsgsPerStreamPartition)
     unprocessedMessages += systemStreamPartition -> 
Queue[IncomingMessageEnvelope]()
@@ -195,6 +196,8 @@ class SystemConsumers(
 
       // Ok to give the chooser a new message from this stream.
       neededByChooser += envelopeFromChooser.getSystemStreamPartition
+
+      
metrics.systemStreamMessagesChosen(envelopeFromChooser.getSystemStreamPartition.getSystemStream).inc
     }
 
     refresh

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/48b946c0/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala
 
b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala
index 9b3160d..d632314 100644
--- 
a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala
+++ 
b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala
@@ -30,6 +30,7 @@ class SystemConsumersMetrics(val registry: MetricsRegistry = 
new MetricsRegistry
   val systemPolls = scala.collection.mutable.Map[String, Counter]()
   val systemStreamPartitionFetchesPerPoll = 
scala.collection.mutable.Map[String, Counter]()
   val systemMessagesPerPoll = scala.collection.mutable.Map[String, Counter]()
+  val systemStreamMessagesChosen = scala.collection.mutable.Map[SystemStream, 
Counter]()
 
   def setUnprocessedMessages(getValue: () => Int) {
     newGauge("unprocessed-messages", getValue)
@@ -58,4 +59,8 @@ class SystemConsumersMetrics(val registry: MetricsRegistry = 
new MetricsRegistry
       systemMessagesPerPoll += systemName -> newCounter("%s-messages-per-poll" 
format systemName)
     }
   }
+
+  def registerSystemStream(systemStream: SystemStream) {
+    systemStreamMessagesChosen += systemStream -> 
newCounter("%s-%s-messages-chosen" format (systemStream.getSystem, 
systemStream.getStream))
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/48b946c0/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
index 5e3b7cb..89730db 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
@@ -148,7 +148,7 @@ abstract class BrokerProxy(
     } else {
       debug("No topic/partitions need to be fetched for %s:%s right now. 
Sleeping %sms." format (host, port, sleepMSWhileNoTopicPartitions))
 
-      metrics.brokerSkippedReads(host, port).inc
+      metrics.brokerSkippedFetchRequests(host, port).inc
 
       Thread.sleep(sleepMSWhileNoTopicPartitions)
     }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/48b946c0/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
index 143be68..cf0dd22 100644
--- 
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
+++ 
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
@@ -40,14 +40,14 @@ class KafkaSystemConsumerMetrics(val systemName: String = 
"unknown", val registr
   val reconnects = new ConcurrentHashMap[(String, Int), Counter]
   val brokerBytesRead = new ConcurrentHashMap[(String, Int), Counter]
   val brokerReads = new ConcurrentHashMap[(String, Int), Counter]
-  val brokerSkippedReads = new ConcurrentHashMap[(String, Int), Counter]
+  val brokerSkippedFetchRequests = new ConcurrentHashMap[(String, Int), 
Counter]
   val topicPartitions = new ConcurrentHashMap[(String, Int), Gauge[Int]]
 
   def registerTopicAndPartition(tp: TopicAndPartition) = {
     if (!offsets.contains(tp)) {
       offsets.put(tp, newCounter("%s-%s-offset-change" format (tp.topic, 
tp.partition)))
       bytesRead.put(tp, newCounter("%s-%s-bytes-read" format (tp.topic, 
tp.partition)))
-      reads.put(tp, newCounter("%s-%s-reads" format (tp.topic, tp.partition)))
+      reads.put(tp, newCounter("%s-%s-messages-read" format (tp.topic, 
tp.partition)))
       lag.put(tp, newGauge("%s-%s-messages-behind-high-watermark" format 
(tp.topic, tp.partition), 0L))
     }
   }
@@ -55,8 +55,8 @@ class KafkaSystemConsumerMetrics(val systemName: String = 
"unknown", val registr
   def registerBrokerProxy(host: String, port: Int) {
     reconnects.put((host, port), newCounter("%s-%s-reconnects" format (host, 
port)))
     brokerBytesRead.put((host, port), newCounter("%s-%s-bytes-read" format 
(host, port)))
-    brokerReads.put((host, port), newCounter("%s-%s-reads" format (host, 
port)))
-    brokerSkippedReads.put((host, port), newCounter("%s-%s-skipped-reads" 
format (host, port)))
+    brokerReads.put((host, port), newCounter("%s-%s-messages-read" format 
(host, port)))
+    brokerSkippedFetchRequests.put((host, port), 
newCounter("%s-%s-skipped-fetch-requests" format (host, port)))
     topicPartitions.put((host, port), newGauge("%s-%s-topic-partitions" format 
(host, port), 0))
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/48b946c0/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
 
b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
index 36445df..bcd8998 100644
--- 
a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
+++ 
b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
@@ -177,7 +177,7 @@ class TestBrokerProxy extends Logging {
     bp.addTopicPartition(tp2, Option("0"))
     Thread.sleep(1000)
     assertEquals(0, sink.receivedMessages.size)
-    assertTrue(bp.metrics.brokerSkippedReads(bp.host, bp.port).getCount > 0)
+    assertTrue(bp.metrics.brokerSkippedFetchRequests(bp.host, 
bp.port).getCount > 0)
     assertTrue(bp.metrics.brokerReads(bp.host, bp.port).getCount == 0)
   }
 

Reply via email to