Updated Branches: refs/heads/master fe8df6373 -> 48b946c0d
SAMZA-134; container/stream-level metrics of messages per second

Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/48b946c0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/48b946c0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/48b946c0

Branch: refs/heads/master
Commit: 48b946c0d1470c51ea61ea5d509c1b35c949752c
Parents: fe8df63
Author: Chris Riccomini <[email protected]>
Authored: Thu Jan 23 13:59:34 2014 -0800
Committer: Chris Riccomini <[email protected]>
Committed: Thu Jan 23 13:59:34 2014 -0800

----------------------------------------------------------------------
 .../main/scala/org/apache/samza/system/SystemConsumers.scala | 3 +++
 .../org/apache/samza/system/SystemConsumersMetrics.scala | 5 +++++
 .../scala/org/apache/samza/system/kafka/BrokerProxy.scala | 2 +-
 .../samza/system/kafka/KafkaSystemConsumerMetrics.scala | 8 ++++----
 .../org/apache/samza/system/kafka/TestBrokerProxy.scala | 2 +-
 5 files changed, 14 insertions(+), 6 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/48b946c0/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
index dd7d357..cdba7fe 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
@@ -168,6 +168,7 @@ class SystemConsumers(
   def register(systemStreamPartition: SystemStreamPartition, lastReadOffset: String) {
     debug("Registering stream: %s, %s" format (systemStreamPartition, lastReadOffset))
+    metrics.registerSystemStream(systemStreamPartition.getSystemStream)
     neededByChooser += systemStreamPartition
     updateFetchMap(systemStreamPartition, maxMsgsPerStreamPartition)
     unprocessedMessages += systemStreamPartition -> Queue[IncomingMessageEnvelope]()
@@ -195,6 +196,8 @@ class SystemConsumers(
         // Ok to give the chooser a new message from this stream.
         neededByChooser += envelopeFromChooser.getSystemStreamPartition
+
+        metrics.systemStreamMessagesChosen(envelopeFromChooser.getSystemStreamPartition.getSystemStream).inc
       }
     refresh

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/48b946c0/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala
index 9b3160d..d632314 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/SystemConsumersMetrics.scala
@@ -30,6 +30,7 @@ class SystemConsumersMetrics(val registry: MetricsRegistry = new MetricsRegistry
   val systemPolls = scala.collection.mutable.Map[String, Counter]()
   val systemStreamPartitionFetchesPerPoll = scala.collection.mutable.Map[String, Counter]()
   val systemMessagesPerPoll = scala.collection.mutable.Map[String, Counter]()
+  val systemStreamMessagesChosen = scala.collection.mutable.Map[SystemStream, Counter]()
   def setUnprocessedMessages(getValue: () => Int) {
     newGauge("unprocessed-messages", getValue)
@@ -58,4 +59,8 @@ class SystemConsumersMetrics(val registry: MetricsRegistry = new MetricsRegistry
       systemMessagesPerPoll += systemName -> newCounter("%s-messages-per-poll" format systemName)
     }
   }
+
+  def registerSystemStream(systemStream: SystemStream) {
+    systemStreamMessagesChosen += systemStream -> newCounter("%s-%s-messages-chosen" format (systemStream.getSystem, systemStream.getStream))
+  }
 }
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/48b946c0/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
index 5e3b7cb..89730db 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
@@ -148,7 +148,7 @@ abstract class BrokerProxy(
         } else {
           debug("No topic/partitions need to be fetched for %s:%s right now. Sleeping %sms." format (host, port, sleepMSWhileNoTopicPartitions))
-          metrics.brokerSkippedReads(host, port).inc
+          metrics.brokerSkippedFetchRequests(host, port).inc
           Thread.sleep(sleepMSWhileNoTopicPartitions)
         }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/48b946c0/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
index 143be68..cf0dd22 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
@@ -40,14 +40,14 @@ class KafkaSystemConsumerMetrics(val systemName: String = "unknown", val registr
   val reconnects = new ConcurrentHashMap[(String, Int), Counter]
   val brokerBytesRead = new ConcurrentHashMap[(String, Int), Counter]
   val brokerReads = new ConcurrentHashMap[(String, Int), Counter]
-  val brokerSkippedReads = new ConcurrentHashMap[(String, Int), Counter]
+  val brokerSkippedFetchRequests = new ConcurrentHashMap[(String, Int),
Counter]
   val topicPartitions = new ConcurrentHashMap[(String, Int), Gauge[Int]]
   def registerTopicAndPartition(tp: TopicAndPartition) = {
     if (!offsets.contains(tp)) {
       offsets.put(tp, newCounter("%s-%s-offset-change" format (tp.topic, tp.partition)))
       bytesRead.put(tp, newCounter("%s-%s-bytes-read" format (tp.topic, tp.partition)))
-      reads.put(tp, newCounter("%s-%s-reads" format (tp.topic, tp.partition)))
+      reads.put(tp, newCounter("%s-%s-messages-read" format (tp.topic, tp.partition)))
       lag.put(tp, newGauge("%s-%s-messages-behind-high-watermark" format (tp.topic, tp.partition), 0L))
     }
   }
@@ -55,8 +55,8 @@ class KafkaSystemConsumerMetrics(val systemName: String = "unknown", val registr
   def registerBrokerProxy(host: String, port: Int) {
     reconnects.put((host, port), newCounter("%s-%s-reconnects" format (host, port)))
     brokerBytesRead.put((host, port), newCounter("%s-%s-bytes-read" format (host, port)))
-    brokerReads.put((host, port), newCounter("%s-%s-reads" format (host, port)))
-    brokerSkippedReads.put((host, port), newCounter("%s-%s-skipped-reads" format (host, port)))
+    brokerReads.put((host, port), newCounter("%s-%s-messages-read" format (host, port)))
+    brokerSkippedFetchRequests.put((host, port), newCounter("%s-%s-skipped-fetch-requests" format (host, port)))
     topicPartitions.put((host, port), newGauge("%s-%s-topic-partitions" format (host, port), 0))
   }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/48b946c0/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
index 36445df..bcd8998 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
@@ -177,7 +177,7 @@ class
TestBrokerProxy extends Logging {
       bp.addTopicPartition(tp2, Option("0"))
       Thread.sleep(1000)
       assertEquals(0, sink.receivedMessages.size)
-      assertTrue(bp.metrics.brokerSkippedReads(bp.host, bp.port).getCount > 0)
+      assertTrue(bp.metrics.brokerSkippedFetchRequests(bp.host, bp.port).getCount > 0)
       assertTrue(bp.metrics.brokerReads(bp.host, bp.port).getCount == 0)
     }
