This is an automated email from the ASF dual-hosted git repository.
gongchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hertzbeat.git
The following commit(s) were added to refs/heads/master by this push:
new ec195c1db [feature] support kafka admin monitor (#2733)
ec195c1db is described below
commit ec195c1dbff8b94bf4f89f6e65a8a45d07667c4b
Author: Jast <[email protected]>
AuthorDate: Sun Sep 22 23:16:04 2024 +0800
[feature] support kafka admin monitor (#2733)
Co-authored-by: shown <[email protected]>
Co-authored-by: tomsun28 <[email protected]>
---
.../collector/dispatch/DispatchConstants.java | 5 +
collector/collector-kafka/pom.xml | 50 +++++
.../collector/collect/kafka/KafkaCollectImpl.java | 210 +++++++++++++++++++++
.../collector/collect/kafka/KafkaConnect.java | 64 +++++++
.../collector/collect/kafka/SupportedCommand.java | 63 +++++++
.../collector/collect/kafka/KafkaCollectTest.java | 93 +++++++++
collector/collector/pom.xml | 9 +
...che.hertzbeat.collector.collect.AbstractCollect | 1 +
collector/pom.xml | 6 +
.../hertzbeat/common/entity/job/Metrics.java | 7 +
.../common/entity/job/protocol/KafkaProtocol.java | 53 ++++++
home/docs/help/kafka_client.md | 47 +++++
.../current/help/kafka_client.md | 47 +++++
.../src/main/resources/define/app-kafka_client.yml | 168 +++++++++++++++++
14 files changed, 823 insertions(+)
diff --git
a/collector/collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/DispatchConstants.java
b/collector/collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/DispatchConstants.java
index 8be5eab9b..c94016f0e 100644
---
a/collector/collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/DispatchConstants.java
+++
b/collector/collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/DispatchConstants.java
@@ -199,4 +199,9 @@ public interface DispatchConstants {
String PARSE_PROM_QL = "PromQL";
String PARSE_PROM_QL_VECTOR = "vector";
String PARSE_PROM_QL_MATRIX = "matrix";
+
+ /**
+ * protocol kafka
+ */
+ String PROTOCOL_KAFKA = "kclient";
}
diff --git a/collector/collector-kafka/pom.xml
b/collector/collector-kafka/pom.xml
new file mode 100644
index 000000000..9acd5c98d
--- /dev/null
+++ b/collector/collector-kafka/pom.xml
@@ -0,0 +1,50 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.hertzbeat</groupId>
+ <artifactId>hertzbeat-collector</artifactId>
+ <version>2.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>hertzbeat-collector-kafka</artifactId>
+ <name>${project.artifactId}</name>
+
+ <properties>
+ <maven.compiler.source>17</maven.compiler.source>
+ <maven.compiler.target>17</maven.compiler.target>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ </properties>
+
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hertzbeat</groupId>
+ <artifactId>hertzbeat-collector-common</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <!-- kafka -->
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ </dependency>
+ </dependencies>
+</project>
\ No newline at end of file
diff --git
a/collector/collector-kafka/src/main/java/org/apache/hertzbeat/collector/collect/kafka/KafkaCollectImpl.java
b/collector/collector-kafka/src/main/java/org/apache/hertzbeat/collector/collect/kafka/KafkaCollectImpl.java
new file mode 100644
index 000000000..ac48029bd
--- /dev/null
+++
b/collector/collector-kafka/src/main/java/org/apache/hertzbeat/collector/collect/kafka/KafkaCollectImpl.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hertzbeat.collector.collect.kafka;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.hertzbeat.collector.collect.AbstractCollect;
+import org.apache.hertzbeat.collector.dispatch.DispatchConstants;
+import org.apache.hertzbeat.common.entity.job.Metrics;
+import org.apache.hertzbeat.common.entity.job.protocol.KafkaProtocol;
+import org.apache.hertzbeat.common.entity.message.CollectRep;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.DescribeTopicsResult;
+import org.apache.kafka.clients.admin.ListTopicsOptions;
+import org.apache.kafka.clients.admin.ListTopicsResult;
+import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.springframework.util.Assert;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+@Slf4j
+public class KafkaCollectImpl extends AbstractCollect {
+
+ @Override
+ public void preCheck(Metrics metrics) throws IllegalArgumentException {
+ KafkaProtocol kafkaProtocol = metrics.getKclient();
+ // Ensure that metrics and kafkaProtocol are not null
+ Assert.isTrue(metrics != null && kafkaProtocol != null, "Kafka collect
must have kafkaProtocol params");
+ // Ensure that host and port are not empty
+ Assert.hasText(kafkaProtocol.getHost(), "Kafka Protocol host is
required.");
+ Assert.hasText(kafkaProtocol.getPort(), "Kafka Protocol port is
required.");
+ }
+
+ @Override
+ public void collect(CollectRep.MetricsData.Builder builder, long
monitorId, String app, Metrics metrics) {
+ try {
+ KafkaProtocol kafkaProtocol = metrics.getKclient();
+ String command = kafkaProtocol.getCommand();
+ boolean isKafkaCommand = SupportedCommand.isKafkaCommand(command);
+ if (!isKafkaCommand) {
+ log.error("Unsupported command: {}", command);
+ return;
+ }
+
+ // Create AdminClient with the provided host and port
+ AdminClient adminClient =
KafkaConnect.getAdminClient(kafkaProtocol.getHost() + ":" +
kafkaProtocol.getPort());
+
+ // Execute the appropriate collection method based on the command
+ switch (SupportedCommand.fromCommand(command)) {
+ case TOPIC_DESCRIBE:
+ collectTopicDescribe(builder, adminClient);
+ break;
+ case TOPIC_LIST:
+ collectTopicList(builder, adminClient);
+ break;
+ case TOPIC_OFFSET:
+ collectTopicOffset(builder, adminClient);
+ break;
+ default:
+ log.error("Unsupported command: {}", command);
+ break;
+ }
+ } catch (InterruptedException | ExecutionException e) {
+ log.error("Kafka collect error", e);
+ }
+ }
+
+ /**
+ * Collect the earliest and latest offsets for each topic
+ *
+ * @param builder The MetricsData builder
+ * @param adminClient The AdminClient
+ * @throws InterruptedException If the thread is interrupted
+ * @throws ExecutionException If an error occurs during execution
+ */
+ private void collectTopicOffset(CollectRep.MetricsData.Builder builder,
AdminClient adminClient) throws InterruptedException, ExecutionException {
+ ListTopicsResult listTopicsResult = adminClient.listTopics(new
ListTopicsOptions().listInternal(true));
+ Set<String> names = listTopicsResult.names().get();
+ names.forEach(name -> {
+ try {
+ Map<String, TopicDescription> map =
adminClient.describeTopics(Collections.singleton(name)).all().get(3L,
TimeUnit.SECONDS);
+ map.forEach((key, value) -> value.partitions().forEach(info ->
extractedOffset(builder, adminClient, name, value, info)));
+ } catch (TimeoutException | InterruptedException |
ExecutionException e) {
+ log.warn("Topic {} get offset fail", name);
+ }
+ });
+ }
+
+ private void extractedOffset(CollectRep.MetricsData.Builder builder,
AdminClient adminClient, String name, TopicDescription value,
TopicPartitionInfo info) {
+ try {
+ TopicPartition topicPartition = new TopicPartition(value.name(),
info.partition());
+ long earliestOffset = getEarliestOffset(adminClient,
topicPartition);
+ long latestOffset = getLatestOffset(adminClient, topicPartition);
+ CollectRep.ValueRow.Builder valueRowBuilder =
CollectRep.ValueRow.newBuilder();
+ valueRowBuilder.addColumns(value.name());
+ valueRowBuilder.addColumns(String.valueOf(info.partition()));
+ valueRowBuilder.addColumns(String.valueOf(earliestOffset));
+ valueRowBuilder.addColumns(String.valueOf(latestOffset));
+ builder.addValues(valueRowBuilder.build());
+ } catch (TimeoutException | InterruptedException | ExecutionException
e) {
+ log.warn("Topic {} get offset fail", name);
+ }
+ }
+
+ /**
+ * Get the earliest offset for a given topic partition
+ *
+ * @param adminClient The AdminClient
+ * @param topicPartition The TopicPartition
+ * @return The earliest offset
+ */
+ private long getEarliestOffset(AdminClient adminClient, TopicPartition
topicPartition)
+ throws InterruptedException, ExecutionException, TimeoutException {
+ return adminClient
+ .listOffsets(Collections.singletonMap(topicPartition,
OffsetSpec.earliest()))
+ .all()
+ .get(3L, TimeUnit.SECONDS)
+ .get(topicPartition)
+ .offset();
+ }
+
+ /**
+ * Get the latest offset for a given topic partition
+ *
+ * @param adminClient The AdminClient
+ * @param topicPartition The TopicPartition
+ * @return The latest offset
+ */
+ private long getLatestOffset(AdminClient adminClient, TopicPartition
topicPartition)
+ throws InterruptedException, ExecutionException, TimeoutException {
+ return adminClient
+ .listOffsets(Collections.singletonMap(topicPartition,
OffsetSpec.latest()))
+ .all()
+ .get(3L, TimeUnit.SECONDS)
+ .get(topicPartition)
+ .offset();
+ }
+
+ /**
+ * Collect the list of topics
+ *
+ * @param builder The MetricsData builder
+ * @param adminClient The AdminClient
+ */
+ private static void collectTopicList(CollectRep.MetricsData.Builder
builder, AdminClient adminClient) throws InterruptedException,
ExecutionException {
+ ListTopicsOptions options = new ListTopicsOptions().listInternal(true);
+ Set<String> names = adminClient.listTopics(options).names().get();
+ names.forEach(name -> {
+ CollectRep.ValueRow valueRow =
CollectRep.ValueRow.newBuilder().addColumns(name).build();
+ builder.addValues(valueRow);
+ });
+ }
+
+ /**
+ * Collect the description of each topic
+ *
+ * @param builder The MetricsData builder
+ * @param adminClient The AdminClient
+ */
+ private static void collectTopicDescribe(CollectRep.MetricsData.Builder
builder, AdminClient adminClient) throws InterruptedException,
ExecutionException {
+ ListTopicsOptions options = new ListTopicsOptions();
+ options.listInternal(true);
+ ListTopicsResult listTopicsResult = adminClient.listTopics(options);
+ Set<String> names = listTopicsResult.names().get();
+ DescribeTopicsResult describeTopicsResult =
adminClient.describeTopics(names);
+ Map<String, TopicDescription> map = describeTopicsResult.all().get();
+ map.forEach((key, value) -> {
+ List<TopicPartitionInfo> listp = value.partitions();
+ listp.forEach(info -> {
+ CollectRep.ValueRow.Builder valueRowBuilder =
CollectRep.ValueRow.newBuilder();
+ valueRowBuilder.addColumns(value.name());
+
valueRowBuilder.addColumns(String.valueOf(value.partitions().size()));
+ valueRowBuilder.addColumns(String.valueOf(info.partition()));
+ valueRowBuilder.addColumns(info.leader().host());
+
valueRowBuilder.addColumns(String.valueOf(info.leader().port()));
+
valueRowBuilder.addColumns(String.valueOf(info.replicas().size()));
+ valueRowBuilder.addColumns(String.valueOf(info.replicas()));
+ builder.addValues(valueRowBuilder.build());
+ });
+ });
+ }
+
+ @Override
+ public String supportProtocol() {
+ return DispatchConstants.PROTOCOL_KAFKA;
+ }
+}
\ No newline at end of file
diff --git
a/collector/collector-kafka/src/main/java/org/apache/hertzbeat/collector/collect/kafka/KafkaConnect.java
b/collector/collector-kafka/src/main/java/org/apache/hertzbeat/collector/collect/kafka/KafkaConnect.java
new file mode 100644
index 000000000..2d0bbb11b
--- /dev/null
+++
b/collector/collector-kafka/src/main/java/org/apache/hertzbeat/collector/collect/kafka/KafkaConnect.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hertzbeat.collector.collect.kafka;
+
+import org.apache.hertzbeat.collector.collect.common.cache.AbstractConnection;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.KafkaAdminClient;
+
+import java.util.Properties;
+
+/**
+ * Kafka connection
+ */
+public class KafkaConnect extends AbstractConnection<AdminClient> {
+
+
+ private static AdminClient adminClient;
+
+ public KafkaConnect(String brokerList) {
+ Properties properties = new Properties();
+ properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
+ properties.put(AdminClientConfig.RETRIES_CONFIG, 3);
+ properties.put(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, 500);
+ adminClient = KafkaAdminClient.create(properties);
+ }
+
+ @Override
+ public AdminClient getConnection() {
+ return adminClient;
+ }
+
+ @Override
+ public void closeConnection() throws Exception {
+ if (this.adminClient != null) {
+ this.adminClient.close();
+ }
+ }
+
+ public static synchronized AdminClient getAdminClient(String brokerList) {
+ if (adminClient == null) {
+ Properties properties = new Properties();
+ properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
brokerList);
+ adminClient = KafkaAdminClient.create(properties);
+ }
+ return adminClient;
+ }
+
+}
diff --git
a/collector/collector-kafka/src/main/java/org/apache/hertzbeat/collector/collect/kafka/SupportedCommand.java
b/collector/collector-kafka/src/main/java/org/apache/hertzbeat/collector/collect/kafka/SupportedCommand.java
new file mode 100644
index 000000000..911a3529d
--- /dev/null
+++
b/collector/collector-kafka/src/main/java/org/apache/hertzbeat/collector/collect/kafka/SupportedCommand.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hertzbeat.collector.collect.kafka;
+
+import java.util.HashSet;
+import java.util.Set;
+
/**
 * Enumeration of kafka monitoring commands supported by the collector.
 * The raw command string comes from the monitor's protocol configuration.
 */
public enum SupportedCommand {

    TOPIC_DESCRIBE("topic-describe"),
    TOPIC_LIST("topic-list"),
    TOPIC_OFFSET("topic-offset");

    // All command strings, built once at class load for O(1) membership checks.
    // Declared final so the lookup set cannot be reassigned or leaked mutable.
    private static final Set<String> SUPPORTED_COMMAND = new HashSet<>();

    static {
        for (SupportedCommand command : SupportedCommand.values()) {
            SUPPORTED_COMMAND.add(command.getCommand());
        }
    }

    // raw command string as configured by the user
    private final String key;

    SupportedCommand(String command) {
        this.key = command;
    }

    public String getCommand() {
        return key;
    }

    /**
     * Check whether a string is a supported kafka command.
     *
     * @param str candidate command string (null returns false)
     * @return true if the string matches a supported command
     */
    public static boolean isKafkaCommand(String str) {
        return SUPPORTED_COMMAND.contains(str);
    }

    /**
     * Resolve the enum constant for a command string.
     *
     * @param command the command string to resolve
     * @return the matching enum constant
     * @throws IllegalArgumentException if the command is unknown
     */
    public static SupportedCommand fromCommand(String command) {
        for (SupportedCommand supportedCommand : SupportedCommand.values()) {
            if (supportedCommand.getCommand().equals(command)) {
                return supportedCommand;
            }
        }
        throw new IllegalArgumentException("No enum constant for command: " + command);
    }
}
diff --git
a/collector/collector-kafka/src/test/java/org/apache/hertzbeat/collector/collect/kafka/KafkaCollectTest.java
b/collector/collector-kafka/src/test/java/org/apache/hertzbeat/collector/collect/kafka/KafkaCollectTest.java
new file mode 100644
index 000000000..04c7676b1
--- /dev/null
+++
b/collector/collector-kafka/src/test/java/org/apache/hertzbeat/collector/collect/kafka/KafkaCollectTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hertzbeat.collector.collect.kafka;
+
+import org.apache.hertzbeat.collector.dispatch.DispatchConstants;
+import org.apache.hertzbeat.common.entity.job.Metrics;
+import org.apache.hertzbeat.common.entity.job.protocol.KafkaProtocol;
+import org.apache.hertzbeat.common.entity.message.CollectRep;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/**
+ * Test case for {@link KafkaCollectImpl}
+ */
+public class KafkaCollectTest {
+ private KafkaCollectImpl collect;
+
+ @BeforeEach
+ public void setUp() throws Exception {
+ collect = new KafkaCollectImpl();
+ }
+
+ @Test
+ void preCheck() {
+ // metrics is null
+ assertThrows(NullPointerException.class, () -> {
+ collect.preCheck(null);
+ });
+
+ // kafka is null
+ assertThrows(IllegalArgumentException.class, () -> {
+ collect.preCheck(Metrics.builder().build());
+ });
+
+ // kafka srv host is null
+ assertThrows(IllegalArgumentException.class, () -> {
+ KafkaProtocol kafka = new KafkaProtocol();
+ collect.preCheck(Metrics.builder().kclient(kafka).build());
+ });
+
+ // kafka port is null
+ assertThrows(IllegalArgumentException.class, () -> {
+ KafkaProtocol kafka =
KafkaProtocol.builder().host("127.0.0.1").build();
+ collect.preCheck(Metrics.builder().kclient(kafka).build());
+ });
+
+ // no exception throw
+ assertDoesNotThrow(() -> {
+ KafkaProtocol kafka =
KafkaProtocol.builder().host("127.0.0.1").port("9092").build();
+ collect.preCheck(Metrics.builder().kclient(kafka).build());
+ });
+ }
+
+ @Test
+ void collect() {
+ // metrics is null
+ assertThrows(NullPointerException.class, () -> {
+ CollectRep.MetricsData.Builder builder =
CollectRep.MetricsData.newBuilder();
+ collect.collect(builder, 1L, "app", null);
+ });
+
+ assertDoesNotThrow(() -> {
+ CollectRep.MetricsData.Builder builder =
CollectRep.MetricsData.newBuilder();
+ KafkaProtocol kafka =
KafkaProtocol.builder().host("127.0.0.1").port("9092").build();
+ Metrics metrics = Metrics.builder().kclient(kafka).build();
+ collect.collect(builder, 1L, "app", metrics);
+ });
+ }
+
+ @Test
+ void supportProtocol() {
+ assertEquals(DispatchConstants.PROTOCOL_KAFKA,
collect.supportProtocol());
+ }
+}
diff --git a/collector/collector/pom.xml b/collector/collector/pom.xml
index e4e2801b1..172d3af83 100644
--- a/collector/collector/pom.xml
+++ b/collector/collector/pom.xml
@@ -42,6 +42,13 @@
<version>${hertzbeat.version}</version>
</dependency>
+ <!-- collector-kafka -->
+ <dependency>
+ <groupId>org.apache.hertzbeat</groupId>
+ <artifactId>hertzbeat-collector-kafka</artifactId>
+ <version>${hertzbeat.version}</version>
+ </dependency>
+
<!-- collector-mongodb -->
<dependency>
<groupId>org.apache.hertzbeat</groupId>
@@ -63,6 +70,8 @@
<version>${hertzbeat.version}</version>
</dependency>
+
+
<!-- spring -->
<dependency>
<groupId>org.springframework.boot</groupId>
diff --git
a/collector/collector/src/main/resources/META-INF/services/org.apache.hertzbeat.collector.collect.AbstractCollect
b/collector/collector/src/main/resources/META-INF/services/org.apache.hertzbeat.collector.collect.AbstractCollect
index 69b4076fe..1ece33cf3 100644
---
a/collector/collector/src/main/resources/META-INF/services/org.apache.hertzbeat.collector.collect.AbstractCollect
+++
b/collector/collector/src/main/resources/META-INF/services/org.apache.hertzbeat.collector.collect.AbstractCollect
@@ -26,3 +26,4 @@
org.apache.hertzbeat.collector.collect.nebulagraph.NgqlCollectImpl
org.apache.hertzbeat.collector.collect.imap.ImapCollectImpl
org.apache.hertzbeat.collector.collect.script.ScriptCollectImpl
org.apache.hertzbeat.collector.collect.mqtt.MqttCollectImpl
+org.apache.hertzbeat.collector.collect.kafka.KafkaCollectImpl
diff --git a/collector/pom.xml b/collector/pom.xml
index d1dccb9d6..039b37b67 100644
--- a/collector/pom.xml
+++ b/collector/pom.xml
@@ -40,6 +40,7 @@
<module>collector-mongodb</module>
<module>collector-nebulagraph</module>
<module>collector-rocketmq</module>
+ <module>collector-kafka</module>
</modules>
<dependencyManagement>
@@ -59,6 +60,11 @@
<artifactId>hertzbeat-collector-mongodb</artifactId>
<version>${hertzbeat.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.hertzbeat</groupId>
+ <artifactId>hertzbeat-collector-kafka</artifactId>
+ <version>${hertzbeat.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.hertzbeat</groupId>
<artifactId>hertzbeat-collector-nebulagraph</artifactId>
diff --git
a/common/src/main/java/org/apache/hertzbeat/common/entity/job/Metrics.java
b/common/src/main/java/org/apache/hertzbeat/common/entity/job/Metrics.java
index 37b96db33..086c366a4 100644
--- a/common/src/main/java/org/apache/hertzbeat/common/entity/job/Metrics.java
+++ b/common/src/main/java/org/apache/hertzbeat/common/entity/job/Metrics.java
@@ -23,6 +23,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
@@ -36,6 +37,7 @@ import
org.apache.hertzbeat.common.entity.job.protocol.IcmpProtocol;
import org.apache.hertzbeat.common.entity.job.protocol.ImapProtocol;
import org.apache.hertzbeat.common.entity.job.protocol.JdbcProtocol;
import org.apache.hertzbeat.common.entity.job.protocol.JmxProtocol;
+import org.apache.hertzbeat.common.entity.job.protocol.KafkaProtocol;
import org.apache.hertzbeat.common.entity.job.protocol.MemcachedProtocol;
import org.apache.hertzbeat.common.entity.job.protocol.MongodbProtocol;
import org.apache.hertzbeat.common.entity.job.protocol.MqttProtocol;
@@ -231,6 +233,11 @@ public class Metrics {
*/
private MqttProtocol mqtt;
+ /**
+ * Monitoring configuration information using the public kafka protocol
+ */
+ private KafkaProtocol kclient;
+
/**
* collector use - Temporarily store subTask metrics response data
*/
diff --git
a/common/src/main/java/org/apache/hertzbeat/common/entity/job/protocol/KafkaProtocol.java
b/common/src/main/java/org/apache/hertzbeat/common/entity/job/protocol/KafkaProtocol.java
new file mode 100644
index 000000000..9603e1ffd
--- /dev/null
+++
b/common/src/main/java/org/apache/hertzbeat/common/entity/job/protocol/KafkaProtocol.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hertzbeat.common.entity.job.protocol;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
/**
 * Kafka protocol parameters for the kafka client ("kclient") collector.
 * Populated from the monitor configuration and consumed by the collector module.
 */
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class KafkaProtocol {

    /**
     * IP address or domain name of the peer host
     */
    private String host;

    /**
     * Port number
     */
    private String port;

    /**
     * Timeout period
     */
    private String timeout;

    /**
     * Collection command to execute, e.g. "topic-list", "topic-describe",
     * "topic-offset" (matched against the collector's SupportedCommand enum)
     */
    private String command;
}
diff --git a/home/docs/help/kafka_client.md b/home/docs/help/kafka_client.md
new file mode 100644
index 000000000..baedd1c4a
--- /dev/null
+++ b/home/docs/help/kafka_client.md
@@ -0,0 +1,47 @@
+---
+id: kafka_client
+title: "Monitoring: Kafka Monitoring (Client-based)"
+sidebar_label: Kafka Monitoring (Client-based)
+keywords: [open-source monitoring system, open-source message middleware
monitoring, Kafka monitoring]
+---
+
+> Collect and monitor general metrics for Kafka.
+
+### Configuration Parameters
+
+| Parameter Name | Help Description
|
+|------------------|---------------------------------------------------------------|
+| Monitoring Host | The monitored peer's IPv4, IPv6, or domain name. Note: ⚠️
Do not include protocol headers (e.g., https://, http://). |
+| Monitoring Port | The monitored service port.
|
+| Task Name | The identifier for this monitoring task, which must be
unique. |
+| Collection Interval | The interval for periodic data collection, in seconds.
The minimum allowable interval is 30 seconds. |
+| Description/Remarks | Additional information to describe and identify this
monitoring task. Users can add remarks here. |
+
+### Collected Metrics
+
+#### Metric Set: topic_list
+
+| Metric Name | Unit | Help Description |
+|--------------|------|------------------|
+| TopicName | None | Topic Name |
+
+#### Metric Set: topic_detail
+
+| Metric Name | Unit | Help Description |
+|----------------------|------|------------------|
+| TopicName | None | Topic Name |
+| PartitionNum | None | Number of Partitions |
+| PartitionLeader | None | Partition Leader |
+| BrokerHost | None | Broker Host |
+| BrokerPort | None | Broker Port |
+| ReplicationFactorSize| None | Replication Factor Size |
+| ReplicationFactor | None | Replication Factor |
+
+#### Metric Set: topic_offset
+
+| Metric Name | Unit | Help Description |
+|---------------|------|------------------|
+| TopicName | None | Topic Name |
+| PartitionNum | None | Number of Partitions |
+| earliest | None | Earliest Offset |
+| latest | None | Latest Offset |
diff --git
a/home/i18n/zh-cn/docusaurus-plugin-content-docs/current/help/kafka_client.md
b/home/i18n/zh-cn/docusaurus-plugin-content-docs/current/help/kafka_client.md
new file mode 100644
index 000000000..1ae63e03b
--- /dev/null
+++
b/home/i18n/zh-cn/docusaurus-plugin-content-docs/current/help/kafka_client.md
@@ -0,0 +1,47 @@
+---
+id: kafka_client
+title: 监控:Kafka监控(基于客户端)
+sidebar_label: Kafka监控(基于客户端)
+keywords: [开源监控系统, 开源消息中间件监控, Kafka监控]
+---
+
+> 对Kafka的通用指标进行采集监控
+
+### 配置参数
+
+| 参数名称 | 参数帮助描述 |
+|--------|------------------------------------------------------|
+| 监控Host | 被监控的对端IPV4,IPV6或域名。注意⚠️不带协议头(eg: https://, http://)。 |
+| 监控Port | 被监控的服务端口。 |
+| 任务名称 | 标识此监控的名称,名称需要保证唯一性。 |
+| 采集间隔 | 监控周期性采集数据间隔时间,单位秒,可设置的最小间隔为30秒 |
+| 描述备注 | 更多标识和描述此监控的备注信息,用户可以在这里备注信息 |
+
+### 采集指标
+
+#### 指标集合:topic_list
+
+| 指标名称 | 指标单位 | 指标帮助描述 |
+|-------------|------|---------|
+| TopicName | 无 | 主题名称 |
+
+#### 指标集合:topic_detail
+
+| 指标名称 | 指标单位 | 指标帮助描述 |
+|-----------|------|--------|
+| TopicName | 无 | 主题名称 |
+| PartitionNum | 无 | 分区数量 |
+| PartitionLeader | 无 | 分区领导者 |
+| BrokerHost | 无 | Broker主机 |
+| BrokerPort | 无 | Broker端口 |
+| ReplicationFactorSize | 无 | 复制因子大小 |
+| ReplicationFactor | 无 | 复制因子 |
+
+#### 指标集合:topic_offset
+
+| 指标名称 | 指标单位 | 指标帮助描述 |
+|-------|---|---------|
+| TopicName | 无 | 主题名称 |
+| PartitionNum | 无 | 分区数量 |
+| earliest | 无 | 最早偏移量 |
+| latest | 无 | 最新偏移量 |
diff --git a/manager/src/main/resources/define/app-kafka_client.yml
b/manager/src/main/resources/define/app-kafka_client.yml
new file mode 100644
index 000000000..5d063aec9
--- /dev/null
+++ b/manager/src/main/resources/define/app-kafka_client.yml
@@ -0,0 +1,168 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# The monitoring type category:service-application service monitoring
db-database monitoring custom-custom monitoring os-operating system monitoring
+category: mid
+# The monitoring type eg: linux windows tomcat mysql aws...
+app: kafka_client
+# The monitoring i18n name
+name:
+ zh-CN: Kafka消息系统(客户端)
+ en-US: Kafka Message (Client)
+# The description and help of this monitoring type
+help:
+ zh-CN: HertzBeat 使用 <a
href="https://hertzbeat.apache.org/docs/advanced/extend-jmx">Kafka Admin
Client</a> 对 Kafka 的通用指标进行采集监控。</span>
+ en-US: HertzBeat uses <a
href='https://hertzbeat.apache.org/docs/advanced/extend-jmx'>Kafka Admin
Client</a> to monitor Kafka general metrics. </span>
+ zh-TW: HertzBeat 使用 <a
href="https://hertzbeat.apache.org/docs/advanced/extend-jmx">Kafka Admin
Client</a> 對 Kafka 的通用指標進行采集監控。</span>
+helpLink:
+ zh-CN: https://hertzbeat.apache.org/zh-cn/docs/help/kafka_client
+ en-US: https://hertzbeat.apache.org/docs/help/kafka_client
+# Input params define for monitoring(render web ui by the definition)
+params:
+ # field-param field key
+ - field: host
+ # name-param field display i18n name
+ name:
+ zh-CN: 目标Host
+ en-US: Target Host
+ # type-param field type(most mapping the html input type)
+ type: host
+ # required-true or false
+ required: true
+ - field: port
+ name:
+ zh-CN: 端口
+ en-US: Port
+ type: number
+ # when type is number, range is required
+ range: '[0,65535]'
+ required: true
+ defaultValue: 9092
+
+# collect metrics config list
+metrics:
+ # metrics - topic_list
+ - name: topic_list
+ i18n:
+ zh-CN: 主题列表
+ en-US: Topic List
+ # metrics scheduling priority(0->127)->(high->low), metrics with the same
priority will be scheduled in parallel
+ # priority 0's metrics is availability metrics, it will be scheduled
first, only availability metrics collect success will the scheduling continue
+ priority: 0
+ # collect metrics content
+ fields:
+ # field-metric name, type-metric type(0-number,1-string), unit-metric
unit('%','ms','MB'), label-whether it is a metrics label field
+ - field: TopicName
+ type: 1
+ i18n:
+ zh-CN: 主题名称
+ en-US: Topic Name
+ # the protocol used for monitoring, eg: sql, ssh, http, telnet, wmi, snmp,
sdk
+ protocol: kclient
+ # the config content when protocol is kclient
+ kclient:
+ host: ^_^host^_^
+ port: ^_^port^_^
+ command: topic-list
+ - name: topic_detail
+ i18n:
+ zh-CN: 主题详细信息
+ en-US: Topic Detail Info
+ # metrics scheduling priority(0->127)->(high->low), metrics with the same
priority will be scheduled in parallel
+ # priority 0's metrics is availability metrics, it will be scheduled
first, only availability metrics collect success will the scheduling continue
+ priority: 0
+ # collect metrics content
+ fields:
+ # field-metric name, type-metric type(0-number,1-string), unit-metric
unit('%','ms','MB'), label-whether it is a metrics label field
+ - field: TopicName
+ type: 1
+ i18n:
+ zh-CN: 主题名称
+ en-US: Topic Name
+ - field: PartitionNum
+ type: 1
+ i18n:
+ zh-CN: 分区数量
+ en-US: Partition Num
+ - field: PartitionLeader
+ type: 1
+ i18n:
+ zh-CN: 分区领导者
+ en-US: Partition Leader
+ - field: BrokerHost
+ type: 1
+ i18n:
+ zh-CN: Broker主机
+ en-US: Broker Host
+ - field: BrokerPort
+ type: 1
+ i18n:
+ zh-CN: Broker端口
+ en-US: Broker Port
+ - field: ReplicationFactorSize
+ type: 1
+ i18n:
+ zh-CN: 复制因子大小
+ en-US: Replication Factor Size
+ - field: ReplicationFactor
+ type: 1
+ i18n:
+ zh-CN: 复制因子
+ en-US: Replication Factor
+ # the protocol used for monitoring, eg: sql, ssh, http, telnet, wmi, snmp,
sdk
+ protocol: kclient
+ # the config content when protocol is kclient
+ kclient:
+ host: ^_^host^_^
+ port: ^_^port^_^
+ command: topic-describe
+ - name: topic_offset
+ i18n:
+ zh-CN: 主题偏移量
+ en-US: Topic Offset
+ # metrics scheduling priority(0->127)->(high->low), metrics with the same
priority will be scheduled in parallel
+ # priority 0's metrics is availability metrics, it will be scheduled
first, only availability metrics collect success will the scheduling continue
+ priority: 0
+ # collect metrics content
+ fields:
+ # field-metric name, type-metric type(0-number,1-string), unit-metric
unit('%','ms','MB'), label-whether it is a metrics label field
+ - field: TopicName
+ type: 1
+ i18n:
+ zh-CN: 主题名称
+ en-US: Topic Name
+ - field: PartitionNum
+ type: 1
+ i18n:
+ zh-CN: 分区数量
+ en-US: Partition Num
+ - field: earliest
+ type: 0
+ i18n:
+ zh-CN: 最早偏移量
+ en-US: Earliest Offset
+ - field: latest
+ type: 0
+ i18n:
+ zh-CN: 最新偏移量
+ en-US: Latest Offset
+ # the protocol used for monitoring, eg: sql, ssh, http, telnet, wmi, snmp,
sdk
+ protocol: kclient
+ # the config content when protocol is kclient
+ kclient:
+ host: ^_^host^_^
+ port: ^_^port^_^
+ command: topic-offset
+
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]