This is an automated email from the ASF dual-hosted git repository.
showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2a41beb0f49 MINOR: Check the existence of AppInfo for the given ID
before creating a new mbean of the same name (#14287)
2a41beb0f49 is described below
commit 2a41beb0f49f947cfa7dfd99101c8b1ba89842cb
Author: Eaugene Thomas <[email protected]>
AuthorDate: Thu Sep 14 12:23:57 2023 +0530
MINOR: Check the existence of AppInfo for the given ID before creating a
new mbean of the same name (#14287)
When using the Kafka consumer in Apache Pinot, we saw a couple of WARN
messages because we were trying to create Kafka consumers with the same name.
We currently have to add a suffix to get a distinct mBean, as each new Kafka
consumer in the same process creates an mBean. This change adds support for
skipping creation of the mBean if one with the same name already exists.
Reviewers: Satish Duggana <[email protected]>, Luke Chen
<[email protected]>
---
.../java/org/apache/kafka/common/utils/AppInfoParser.java | 7 ++++++-
.../org/apache/kafka/common/utils/AppInfoParserTest.java | 14 ++++++++++++++
2 files changed, 20 insertions(+), 1 deletion(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/utils/AppInfoParser.java
b/clients/src/main/java/org/apache/kafka/common/utils/AppInfoParser.java
index 19f98d1b652..46ffdd67438 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/AppInfoParser.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/AppInfoParser.java
@@ -60,8 +60,13 @@ public class AppInfoParser {
public static synchronized void registerAppInfo(String prefix, String id,
Metrics metrics, long nowMs) {
try {
ObjectName name = new ObjectName(prefix + ":type=app-info,id=" +
Sanitizer.jmxSanitize(id));
+ MBeanServer server = ManagementFactory.getPlatformMBeanServer();
+ if (server.isRegistered(name)) {
+ log.info("The mbean of App info: [{}], id: [{}] already
exists, so skipping a new mbean creation.", prefix, id);
+ return;
+ }
AppInfo mBean = new AppInfo(nowMs);
- ManagementFactory.getPlatformMBeanServer().registerMBean(mBean,
name);
+ server.registerMBean(mBean, name);
registerMetrics(metrics, mBean); // prefix will be added later by
JmxReporter
} catch (JMException e) {
diff --git
a/clients/src/test/java/org/apache/kafka/common/utils/AppInfoParserTest.java
b/clients/src/test/java/org/apache/kafka/common/utils/AppInfoParserTest.java
index c33ef398029..0735aa1dce7 100644
--- a/clients/src/test/java/org/apache/kafka/common/utils/AppInfoParserTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/AppInfoParserTest.java
@@ -57,6 +57,7 @@ public class AppInfoParserTest {
@Test
public void testRegisterAppInfoRegistersMetrics() throws JMException {
registerAppInfo();
+ registerAppInfoMultipleTimes();
}
@Test
@@ -82,6 +83,19 @@ public class AppInfoParserTest {
assertEquals(EXPECTED_START_MS,
metrics.metric(metrics.metricName("start-time-ms", "app-info")).metricValue());
}
+ private void registerAppInfoMultipleTimes() throws JMException {
+ assertEquals(EXPECTED_COMMIT_VERSION, AppInfoParser.getCommitId());
+ assertEquals(EXPECTED_VERSION, AppInfoParser.getVersion());
+
+ AppInfoParser.registerAppInfo(METRICS_PREFIX, METRICS_ID, metrics,
EXPECTED_START_MS);
+ AppInfoParser.registerAppInfo(METRICS_PREFIX, METRICS_ID, metrics,
EXPECTED_START_MS); // We register it again
+
+ assertTrue(mBeanServer.isRegistered(expectedAppObjectName()));
+ assertEquals(EXPECTED_COMMIT_VERSION,
metrics.metric(metrics.metricName("commit-id", "app-info")).metricValue());
+ assertEquals(EXPECTED_VERSION,
metrics.metric(metrics.metricName("version", "app-info")).metricValue());
+ assertEquals(EXPECTED_START_MS,
metrics.metric(metrics.metricName("start-time-ms", "app-info")).metricValue());
+ }
+
private ObjectName expectedAppObjectName() throws
MalformedObjectNameException {
return new ObjectName(METRICS_PREFIX + ":type=app-info,id=" +
METRICS_ID);
}