This is an automated email from the ASF dual-hosted git repository.

pmaheshwari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 6e1301b  SAMZA-2259: Change boolean gauges to integer
6e1301b is described below

commit 6e1301bd0c282500a436011335e5d58a5314802c
Author: Daniel Chen <dch...@linkedin.com>
AuthorDate: Mon Jun 24 10:00:30 2019 -0700

    SAMZA-2259: Change boolean gauges to integer
    
    Changing boolean gauges to integer gauges, since integer is a more widely
    supported gauge type across metrics reporters.
    
    prateekm Please take a look
    
    Author: Daniel Chen <dch...@linkedin.com>
    
    Reviewers: Prateek Maheshwari <pmaheshw...@apache.org>
    
    Closes #1088 from dxichen/samza-2259
---
 docs/learn/documentation/versioned/container/metrics-table.html   | 4 ++--
 docs/learn/documentation/versioned/operations/monitoring.md       | 2 +-
 .../src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java  | 8 ++++----
 .../src/main/java/org/apache/samza/zk/ZkJobCoordinator.java       | 4 ++--
 .../main/java/org/apache/samza/zk/ZkJobCoordinatorMetrics.java    | 4 ++--
 5 files changed, 11 insertions(+), 11 deletions(-)

diff --git a/docs/learn/documentation/versioned/container/metrics-table.html 
b/docs/learn/documentation/versioned/container/metrics-table.html
index f35d597..c895fca 100644
--- a/docs/learn/documentation/versioned/container/metrics-table.html
+++ b/docs/learn/documentation/versioned/container/metrics-table.html
@@ -555,7 +555,7 @@
     </tr>
     <tr>
         <td>no-more-messages-SystemStreamPartition [<span 
class="system">system</span>, <span class="stream">stream</span>, <span 
class="partition">partition</span>]</td>
-        <td>Indicates if kafka consumer is at head for particular 
partition</td>
+        <td>Indicates if kafka consumer is at head for particular partition. 1 
if it is caught up, 0 otherwise.</td>
     </tr>
     <tr>
         <td>blocking-poll-count-SystemStreamPartition [<span 
class="system">system</span>, <span class="stream">stream</span>, <span 
class="partition">partition</span>]</td>
@@ -938,7 +938,7 @@
     </tr>
     <tr>
         <td>is-leader</td>
-        <td>Denotes if the processor is a leader or not</td>
+        <td>Denotes if the processor is a leader or not. 1 if it is a leader, 
0 otherwise.</td>
     </tr>
     <tr>
         <td>barrier-creation</td>
diff --git a/docs/learn/documentation/versioned/operations/monitoring.md 
b/docs/learn/documentation/versioned/operations/monitoring.md
index bbad335..c11fbe0 100644
--- a/docs/learn/documentation/versioned/operations/monitoring.md
+++ b/docs/learn/documentation/versioned/operations/monitoring.md
@@ -478,7 +478,7 @@ All \<system\>, \<stream\>, \<partition\>, \<store-name\>, 
\<topic\>, are popula
 |   | \<system\>-<host\>-<port\>-skipped-fetch-requests | Number of times the 
fetchMessage method is called but no topic/partitions needed new messages. |
 |   | \<system\>-<host\>-<port\>-topic-partitions | Number of broker&#39;s 
topic partitions which are being consumed. |
 |   | poll-count | Number of polls the KafkaSystemConsumer performed to get 
new messages. |
-|   | no-more-messages-SystemStreamPartition [\<system\>, \<stream\>, 
\<partition\>] | Indicates if the Kafka consumer is at the head for particular 
partition. |
+|   | no-more-messages-SystemStreamPartition [\<system\>, \<stream\>, 
\<partition\>] | Indicates if the Kafka consumer is at the head for particular 
partition. 1 if it is caught up, 0 otherwise. |
 |   | blocking-poll-count-SystemStreamPartition [\<system\>, \<stream\>, 
\<partition\>] | Number of times a blocking poll is executed (polling until we 
get at least one message, or until we catch up to the head of the stream) (per 
partition). |
 |   | blocking-poll-timeout-count-SystemStreamPartition [\<system\>, 
\<stream\>, \<partition\>] | Number of times a blocking poll has timed out 
(polling until we get at least one message within a timeout period) (per 
partition). |
 |   | buffered-message-count-SystemStreamPartition [\<system\>, \<stream\>, 
\<partition\>] | Current number of messages in queue (per partition). |
diff --git 
a/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java 
b/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
index 79f340f..8b792b4 100644
--- a/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
+++ b/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
@@ -244,7 +244,7 @@ public abstract class BlockingEnvelopeMap implements 
SystemConsumer {
   public class BlockingEnvelopeMapMetrics {
     private final String group;
     private final MetricsRegistry metricsRegistry;
-    private final ConcurrentHashMap<SystemStreamPartition, Gauge<Boolean>> 
noMoreMessageGaugeMap;
+    private final ConcurrentHashMap<SystemStreamPartition, Gauge<Integer>> 
noMoreMessageGaugeMap;
     private final ConcurrentHashMap<SystemStreamPartition, Counter> 
blockingPollCountMap;
     private final ConcurrentHashMap<SystemStreamPartition, Counter> 
blockingPollTimeoutCountMap;
     private final Counter pollCount;
@@ -252,14 +252,14 @@ public abstract class BlockingEnvelopeMap implements 
SystemConsumer {
     public BlockingEnvelopeMapMetrics(String group, MetricsRegistry 
metricsRegistry) {
       this.group = group;
       this.metricsRegistry = metricsRegistry;
-      this.noMoreMessageGaugeMap = new 
ConcurrentHashMap<SystemStreamPartition, Gauge<Boolean>>();
+      this.noMoreMessageGaugeMap = new 
ConcurrentHashMap<SystemStreamPartition, Gauge<Integer>>();
       this.blockingPollCountMap = new ConcurrentHashMap<SystemStreamPartition, 
Counter>();
       this.blockingPollTimeoutCountMap = new 
ConcurrentHashMap<SystemStreamPartition, Counter>();
       this.pollCount = metricsRegistry.newCounter(group, "poll-count");
     }
 
     public void initMetrics(SystemStreamPartition systemStreamPartition) {
-      this.noMoreMessageGaugeMap.putIfAbsent(systemStreamPartition, 
metricsRegistry.<Boolean>newGauge(group, "no-more-messages-" + 
systemStreamPartition, false));
+      this.noMoreMessageGaugeMap.putIfAbsent(systemStreamPartition, 
metricsRegistry.newGauge(group, "no-more-messages-" + systemStreamPartition, 
0));
       this.blockingPollCountMap.putIfAbsent(systemStreamPartition, 
metricsRegistry.newCounter(group, "blocking-poll-count-" + 
systemStreamPartition));
       this.blockingPollTimeoutCountMap.putIfAbsent(systemStreamPartition, 
metricsRegistry.newCounter(group, "blocking-poll-timeout-count-" + 
systemStreamPartition));
 
@@ -268,7 +268,7 @@ public abstract class BlockingEnvelopeMap implements 
SystemConsumer {
     }
 
     public void setNoMoreMessages(SystemStreamPartition systemStreamPartition, 
boolean noMoreMessages) {
-      
this.noMoreMessageGaugeMap.get(systemStreamPartition).set(noMoreMessages);
+      this.noMoreMessageGaugeMap.get(systemStreamPartition).set(noMoreMessages 
? 1 : 0);
     }
 
     public void incBlockingPoll(SystemStreamPartition systemStreamPartition) {
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java 
b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index b0b9836..bcf0f11 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -171,7 +171,7 @@ public class ZkJobCoordinator implements JobCoordinator {
 
       // Notify the metrics about abandoning the leadership. Moving it up the 
chain in the shutdown sequence so that
       // in case of unclean shutdown, we get notified about lack of leader and 
we can set up some alerts around the absence of leader.
-      metrics.isLeader.set(false);
+      metrics.isLeader.set(0);
 
       try {
         // todo: what does it mean for coordinator listener to be null? why 
not have it part of constructor?
@@ -424,7 +424,7 @@ public class ZkJobCoordinator implements JobCoordinator {
     @Override
     public void onBecomingLeader() {
       LOG.info("ZkJobCoordinator::onBecomeLeader - I became the leader");
-      metrics.isLeader.set(true);
+      metrics.isLeader.set(1);
       zkUtils.subscribeToProcessorChange(new ProcessorChangeHandler(zkUtils));
       if (!new StorageConfig(config).hasDurableStores()) {
         // 1. Stop if there's a existing StreamPartitionCountMonitor running.
diff --git 
a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorMetrics.java 
b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorMetrics.java
index 3d00897..e39e058 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorMetrics.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorMetrics.java
@@ -34,7 +34,7 @@ public class ZkJobCoordinatorMetrics extends MetricsBase {
   /**
    * Denotes if the processor is a leader or not
    */
-  public final Gauge<Boolean> isLeader;
+  public final Gauge<Integer> isLeader;
 
   /**
    * Number of times a barrier was created by the leader
@@ -60,7 +60,7 @@ public class ZkJobCoordinatorMetrics extends MetricsBase {
   public ZkJobCoordinatorMetrics(MetricsRegistry metricsRegistry) {
     super(metricsRegistry);
     this.metricsRegistry = metricsRegistry;
-    this.isLeader = newGauge("is-leader", false);
+    this.isLeader = newGauge("is-leader", 0);
     this.barrierCreation = newCounter("barrier-creation");
     this.barrierStateChange = newCounter("barrier-state-change");
     this.barrierError = newCounter("barrier-error");

Reply via email to