This is an automated email from the ASF dual-hosted git repository.
zhangmeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new d5f5273 Change participant message monitor to use dynamic metric
(#1685)
d5f5273 is described below
commit d5f5273d483ba54c51c79d497dead190cf758bb6
Author: Meng Zhang <[email protected]>
AuthorDate: Thu Mar 25 16:49:48 2021 -0700
Change participant message monitor to use dynamic metric (#1685)
---
.../mbeans/ParticipantMessageMonitor.java | 91 ++++++++++++----------
.../mbeans/ParticipantMessageMonitorMBean.java | 3 +-
.../mbeans/ParticipantStatusMonitor.java | 3 +-
3 files changed, 53 insertions(+), 44 deletions(-)
diff --git
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantMessageMonitor.java
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantMessageMonitor.java
index 261790d..9c90295 100644
---
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantMessageMonitor.java
+++
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantMessageMonitor.java
@@ -19,10 +19,29 @@ package org.apache.helix.monitoring.mbeans;
* under the License.
*/
-public class ParticipantMessageMonitor implements
ParticipantMessageMonitorMBean {
+import java.util.ArrayList;
+import java.util.List;
+import javax.management.JMException;
+import javax.management.ObjectName;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMBeanProvider;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.SimpleDynamicMetric;
+
+
+public class ParticipantMessageMonitor extends DynamicMBeanProvider {
public static final String PARTICIPANT_KEY = "ParticipantName";
public static final String PARTICIPANT_STATUS_KEY =
"ParticipantMessageStatus";
+ // For registering dynamic metrics
+ private final ObjectName _initObjectName;
+ private final String _participantName;
+
+ private SimpleDynamicMetric<Long> _receivedMessages;
+ private SimpleDynamicMetric<Long> _discardedMessages;
+ private SimpleDynamicMetric<Long> _completedMessages;
+ private SimpleDynamicMetric<Long> _failedMessages;
+ private SimpleDynamicMetric<Long> _pendingMessages;
+
/**
* The current processed state of the message
*/
@@ -32,68 +51,42 @@ public class ParticipantMessageMonitor implements
ParticipantMessageMonitorMBean
COMPLETED
}
- private final String _participantName;
- private long _receivedMessages = 0;
- private long _discardedMessages = 0;
- private long _completedMessages = 0;
- private long _failedMessages = 0;
- private long _pendingMessages = 0;
-
- public ParticipantMessageMonitor(String participantName) {
+ public ParticipantMessageMonitor(String participantName, ObjectName
objectName) {
_participantName = participantName;
+ _initObjectName = objectName;
+ _receivedMessages = new SimpleDynamicMetric("ReceivedMessages", 0L);
+ _discardedMessages = new SimpleDynamicMetric("DiscardedMessages", 0L);
+ _completedMessages = new SimpleDynamicMetric("CompletedMessages", 0L);
+ _failedMessages = new SimpleDynamicMetric("FailedMessages", 0L);
+ _pendingMessages = new SimpleDynamicMetric("PendingMessages", 0L);
}
public String getParticipantBeanName() {
return String.format("%s=%s", PARTICIPANT_KEY, _participantName);
}
- public void incrementReceivedMessages(int count) {
- _receivedMessages += count;
+ public void incrementReceivedMessages(long count) {
+ incrementSimpleDynamicMetric(_receivedMessages, count);
}
public void incrementDiscardedMessages(int count) {
- _discardedMessages += count;
+ incrementSimpleDynamicMetric(_discardedMessages, count);
}
public void incrementCompletedMessages(int count) {
- _completedMessages += count;
+ incrementSimpleDynamicMetric(_completedMessages, count);
}
public void incrementFailedMessages(int count) {
- _failedMessages += count;
+ incrementSimpleDynamicMetric(_failedMessages, count);
}
public void incrementPendingMessages(int count) {
- _pendingMessages += count;
+ incrementSimpleDynamicMetric(_pendingMessages, count);
}
public void decrementPendingMessages(int count) {
- _pendingMessages -= count;
- }
-
- @Override
- public long getReceivedMessages() {
- return _receivedMessages;
- }
-
- @Override
- public long getDiscardedMessages() {
- return _discardedMessages;
- }
-
- @Override
- public long getCompletedMessages() {
- return _completedMessages;
- }
-
- @Override
- public long getFailedMessages() {
- return _failedMessages;
- }
-
- @Override
- public long getPendingMessages() {
- return _pendingMessages;
+ incrementSimpleDynamicMetric(_pendingMessages, -1 * count);
}
@Override
@@ -101,4 +94,20 @@ public class ParticipantMessageMonitor implements
ParticipantMessageMonitorMBean
return PARTICIPANT_STATUS_KEY;
}
+ /**
+ * This method registers the dynamic metrics.
+ * @return
+ * @throws JMException
+ */
+ @Override
+ public DynamicMBeanProvider register() throws JMException {
+ List<DynamicMetric<?, ?>> attributeList = new ArrayList<>();
+ attributeList.add(_receivedMessages);
+ attributeList.add(_discardedMessages);
+ attributeList.add(_completedMessages);
+ attributeList.add(_failedMessages);
+ attributeList.add(_pendingMessages);
+ doRegister(attributeList, _initObjectName);
+ return this;
+ }
}
diff --git
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantMessageMonitorMBean.java
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantMessageMonitorMBean.java
index d4a899f..f806d1b 100644
---
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantMessageMonitorMBean.java
+++
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantMessageMonitorMBean.java
@@ -8,7 +8,6 @@ package org.apache.helix.monitoring.mbeans;
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
@@ -21,7 +20,7 @@ package org.apache.helix.monitoring.mbeans;
import org.apache.helix.monitoring.SensorNameProvider;
-
+@Deprecated
public interface ParticipantMessageMonitorMBean extends SensorNameProvider {
public long getReceivedMessages();
public long getDiscardedMessages();
diff --git
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantStatusMonitor.java
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantStatusMonitor.java
index 6e5b346..5b91cff 100644
---
a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantStatusMonitor.java
+++
b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantStatusMonitor.java
@@ -49,7 +49,8 @@ public class ParticipantStatusMonitor {
try {
_beanServer = ManagementFactory.getPlatformMBeanServer();
if (isParticipant) {
- _messageMonitor = new ParticipantMessageMonitor(instanceName);
+ _messageMonitor =
+ new ParticipantMessageMonitor(instanceName,
getObjectName(_messageMonitor.getParticipantBeanName()));
_messageLatencyMonitor =
new
MessageLatencyMonitor(MonitorDomainNames.CLMParticipantReport.name(),
instanceName);
_messageLatencyMonitor.register();