This is an automated email from the ASF dual-hosted git repository.

gongchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hertzbeat.git


The following commit(s) were added to refs/heads/master by this push:
     new ec195c1db [feature] support kafka admin monitor (#2733)
ec195c1db is described below

commit ec195c1dbff8b94bf4f89f6e65a8a45d07667c4b
Author: Jast <[email protected]>
AuthorDate: Sun Sep 22 23:16:04 2024 +0800

    [feature] support kafka admin monitor (#2733)
    
    Co-authored-by: shown <[email protected]>
    Co-authored-by: tomsun28 <[email protected]>
---
 .../collector/dispatch/DispatchConstants.java      |   5 +
 collector/collector-kafka/pom.xml                  |  50 +++++
 .../collector/collect/kafka/KafkaCollectImpl.java  | 210 +++++++++++++++++++++
 .../collector/collect/kafka/KafkaConnect.java      |  64 +++++++
 .../collector/collect/kafka/SupportedCommand.java  |  63 +++++++
 .../collector/collect/kafka/KafkaCollectTest.java  |  93 +++++++++
 collector/collector/pom.xml                        |   9 +
 ...che.hertzbeat.collector.collect.AbstractCollect |   1 +
 collector/pom.xml                                  |   6 +
 .../hertzbeat/common/entity/job/Metrics.java       |   7 +
 .../common/entity/job/protocol/KafkaProtocol.java  |  53 ++++++
 home/docs/help/kafka_client.md                     |  47 +++++
 .../current/help/kafka_client.md                   |  47 +++++
 .../src/main/resources/define/app-kafka_client.yml | 168 +++++++++++++++++
 14 files changed, 823 insertions(+)

diff --git 
a/collector/collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/DispatchConstants.java
 
b/collector/collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/DispatchConstants.java
index 8be5eab9b..c94016f0e 100644
--- 
a/collector/collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/DispatchConstants.java
+++ 
b/collector/collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/DispatchConstants.java
@@ -199,4 +199,9 @@ public interface DispatchConstants {
     String PARSE_PROM_QL = "PromQL";
     String PARSE_PROM_QL_VECTOR = "vector";
     String PARSE_PROM_QL_MATRIX = "matrix";
+
+    /**
+     * protocol kafka
+     */
+    String PROTOCOL_KAFKA = "kclient";
 }
diff --git a/collector/collector-kafka/pom.xml 
b/collector/collector-kafka/pom.xml
new file mode 100644
index 000000000..9acd5c98d
--- /dev/null
+++ b/collector/collector-kafka/pom.xml
@@ -0,0 +1,50 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.hertzbeat</groupId>
+        <artifactId>hertzbeat-collector</artifactId>
+        <version>2.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>hertzbeat-collector-kafka</artifactId>
+    <name>${project.artifactId}</name>
+
+    <properties>
+        <maven.compiler.source>17</maven.compiler.source>
+        <maven.compiler.target>17</maven.compiler.target>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    </properties>
+
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.hertzbeat</groupId>
+            <artifactId>hertzbeat-collector-common</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <!-- kafka -->
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git 
a/collector/collector-kafka/src/main/java/org/apache/hertzbeat/collector/collect/kafka/KafkaCollectImpl.java
 
b/collector/collector-kafka/src/main/java/org/apache/hertzbeat/collector/collect/kafka/KafkaCollectImpl.java
new file mode 100644
index 000000000..ac48029bd
--- /dev/null
+++ 
b/collector/collector-kafka/src/main/java/org/apache/hertzbeat/collector/collect/kafka/KafkaCollectImpl.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hertzbeat.collector.collect.kafka;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.hertzbeat.collector.collect.AbstractCollect;
+import org.apache.hertzbeat.collector.dispatch.DispatchConstants;
+import org.apache.hertzbeat.common.entity.job.Metrics;
+import org.apache.hertzbeat.common.entity.job.protocol.KafkaProtocol;
+import org.apache.hertzbeat.common.entity.message.CollectRep;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.DescribeTopicsResult;
+import org.apache.kafka.clients.admin.ListTopicsOptions;
+import org.apache.kafka.clients.admin.ListTopicsResult;
+import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.springframework.util.Assert;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+@Slf4j
+public class KafkaCollectImpl extends AbstractCollect {
+
+    @Override
+    public void preCheck(Metrics metrics) throws IllegalArgumentException {
+        KafkaProtocol kafkaProtocol = metrics.getKclient();
+        // Ensure that metrics and kafkaProtocol are not null
+        Assert.isTrue(metrics != null && kafkaProtocol != null, "Kafka collect 
must have kafkaProtocol params");
+        // Ensure that host and port are not empty
+        Assert.hasText(kafkaProtocol.getHost(), "Kafka Protocol host is 
required.");
+        Assert.hasText(kafkaProtocol.getPort(), "Kafka Protocol port is 
required.");
+    }
+
+    @Override
+    public void collect(CollectRep.MetricsData.Builder builder, long 
monitorId, String app, Metrics metrics) {
+        try {
+            KafkaProtocol kafkaProtocol = metrics.getKclient();
+            String command = kafkaProtocol.getCommand();
+            boolean isKafkaCommand = SupportedCommand.isKafkaCommand(command);
+            if (!isKafkaCommand) {
+                log.error("Unsupported command: {}", command);
+                return;
+            }
+
+            // Create AdminClient with the provided host and port
+            AdminClient adminClient = 
KafkaConnect.getAdminClient(kafkaProtocol.getHost() + ":" + 
kafkaProtocol.getPort());
+
+            // Execute the appropriate collection method based on the command
+            switch (SupportedCommand.fromCommand(command)) {
+                case TOPIC_DESCRIBE:
+                    collectTopicDescribe(builder, adminClient);
+                    break;
+                case TOPIC_LIST:
+                    collectTopicList(builder, adminClient);
+                    break;
+                case TOPIC_OFFSET:
+                    collectTopicOffset(builder, adminClient);
+                    break;
+                default:
+                    log.error("Unsupported command: {}", command);
+                    break;
+            }
+        } catch (InterruptedException | ExecutionException e) {
+            log.error("Kafka collect error", e);
+        }
+    }
+
+    /**
+     * Collect the earliest and latest offsets for each topic
+     *
+     * @param builder     The MetricsData builder
+     * @param adminClient The AdminClient
+     * @throws InterruptedException If the thread is interrupted
+     * @throws ExecutionException   If an error occurs during execution
+     */
+    private void collectTopicOffset(CollectRep.MetricsData.Builder builder, 
AdminClient adminClient) throws InterruptedException, ExecutionException {
+        ListTopicsResult listTopicsResult = adminClient.listTopics(new 
ListTopicsOptions().listInternal(true));
+        Set<String> names = listTopicsResult.names().get();
+        names.forEach(name -> {
+            try {
+                Map<String, TopicDescription> map = 
adminClient.describeTopics(Collections.singleton(name)).all().get(3L, 
TimeUnit.SECONDS);
+                map.forEach((key, value) -> value.partitions().forEach(info -> 
extractedOffset(builder, adminClient, name, value, info)));
+            } catch (TimeoutException | InterruptedException | 
ExecutionException e) {
+                log.warn("Topic {} get offset fail", name);
+            }
+        });
+    }
+
+    private void extractedOffset(CollectRep.MetricsData.Builder builder, 
AdminClient adminClient, String name, TopicDescription value, 
TopicPartitionInfo info) {
+        try {
+            TopicPartition topicPartition = new TopicPartition(value.name(), 
info.partition());
+            long earliestOffset = getEarliestOffset(adminClient, 
topicPartition);
+            long latestOffset = getLatestOffset(adminClient, topicPartition);
+            CollectRep.ValueRow.Builder valueRowBuilder = 
CollectRep.ValueRow.newBuilder();
+            valueRowBuilder.addColumns(value.name());
+            valueRowBuilder.addColumns(String.valueOf(info.partition()));
+            valueRowBuilder.addColumns(String.valueOf(earliestOffset));
+            valueRowBuilder.addColumns(String.valueOf(latestOffset));
+            builder.addValues(valueRowBuilder.build());
+        } catch (TimeoutException | InterruptedException | ExecutionException 
e) {
+            log.warn("Topic {} get offset fail", name);
+        }
+    }
+
+    /**
+     * Get the earliest offset for a given topic partition
+     *
+     * @param adminClient    The AdminClient
+     * @param topicPartition The TopicPartition
+     * @return The earliest offset
+     */
+    private long getEarliestOffset(AdminClient adminClient, TopicPartition 
topicPartition)
+            throws InterruptedException, ExecutionException, TimeoutException {
+        return adminClient
+                .listOffsets(Collections.singletonMap(topicPartition, 
OffsetSpec.earliest()))
+                .all()
+                .get(3L, TimeUnit.SECONDS)
+                .get(topicPartition)
+                .offset();
+    }
+
+    /**
+     * Get the latest offset for a given topic partition
+     *
+     * @param adminClient    The AdminClient
+     * @param topicPartition The TopicPartition
+     * @return The latest offset
+     */
+    private long getLatestOffset(AdminClient adminClient, TopicPartition 
topicPartition)
+            throws InterruptedException, ExecutionException, TimeoutException {
+        return adminClient
+                .listOffsets(Collections.singletonMap(topicPartition, 
OffsetSpec.latest()))
+                .all()
+                .get(3L, TimeUnit.SECONDS)
+                .get(topicPartition)
+                .offset();
+    }
+
+    /**
+     * Collect the list of topics
+     *
+     * @param builder     The MetricsData builder
+     * @param adminClient The AdminClient
+     */
+    private static void collectTopicList(CollectRep.MetricsData.Builder 
builder, AdminClient adminClient) throws InterruptedException, 
ExecutionException {
+        ListTopicsOptions options = new ListTopicsOptions().listInternal(true);
+        Set<String> names = adminClient.listTopics(options).names().get();
+        names.forEach(name -> {
+            CollectRep.ValueRow valueRow = 
CollectRep.ValueRow.newBuilder().addColumns(name).build();
+            builder.addValues(valueRow);
+        });
+    }
+
+    /**
+     * Collect the description of each topic
+     *
+     * @param builder     The MetricsData builder
+     * @param adminClient The AdminClient
+     */
+    private static void collectTopicDescribe(CollectRep.MetricsData.Builder 
builder, AdminClient adminClient) throws InterruptedException, 
ExecutionException {
+        ListTopicsOptions options = new ListTopicsOptions();
+        options.listInternal(true);
+        ListTopicsResult listTopicsResult = adminClient.listTopics(options);
+        Set<String> names = listTopicsResult.names().get();
+        DescribeTopicsResult describeTopicsResult = 
adminClient.describeTopics(names);
+        Map<String, TopicDescription> map = describeTopicsResult.all().get();
+        map.forEach((key, value) -> {
+            List<TopicPartitionInfo> listp = value.partitions();
+            listp.forEach(info -> {
+                CollectRep.ValueRow.Builder valueRowBuilder = 
CollectRep.ValueRow.newBuilder();
+                valueRowBuilder.addColumns(value.name());
+                
valueRowBuilder.addColumns(String.valueOf(value.partitions().size()));
+                valueRowBuilder.addColumns(String.valueOf(info.partition()));
+                valueRowBuilder.addColumns(info.leader().host());
+                
valueRowBuilder.addColumns(String.valueOf(info.leader().port()));
+                
valueRowBuilder.addColumns(String.valueOf(info.replicas().size()));
+                valueRowBuilder.addColumns(String.valueOf(info.replicas()));
+                builder.addValues(valueRowBuilder.build());
+            });
+        });
+    }
+
+    @Override
+    public String supportProtocol() {
+        return DispatchConstants.PROTOCOL_KAFKA;
+    }
+}
\ No newline at end of file
diff --git 
a/collector/collector-kafka/src/main/java/org/apache/hertzbeat/collector/collect/kafka/KafkaConnect.java
 
b/collector/collector-kafka/src/main/java/org/apache/hertzbeat/collector/collect/kafka/KafkaConnect.java
new file mode 100644
index 000000000..2d0bbb11b
--- /dev/null
+++ 
b/collector/collector-kafka/src/main/java/org/apache/hertzbeat/collector/collect/kafka/KafkaConnect.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hertzbeat.collector.collect.kafka;
+
+import org.apache.hertzbeat.collector.collect.common.cache.AbstractConnection;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.KafkaAdminClient;
+
+import java.util.Properties;
+
+/**
+ * Kafka connection wrapping a Kafka {@link AdminClient}.
+ * Instances own the client they create; {@link #getAdminClient(String)} additionally offers
+ * shared clients that are cached per broker list and are never closed by this class.
+ */
+public class KafkaConnect extends AbstractConnection<AdminClient> {
+
+    /**
+     * Shared clients keyed by broker list, so monitors of different clusters never receive each other's client
+     */
+    private static final java.util.Map<String, AdminClient> ADMIN_CLIENTS = new java.util.concurrent.ConcurrentHashMap<>();
+
+    /**
+     * The client owned by this connection instance
+     */
+    private final AdminClient adminClient;
+
+    /**
+     * Create a connection with its own admin client
+     *
+     * @param brokerList bootstrap servers, e.g. host:port
+     */
+    public KafkaConnect(String brokerList) {
+        Properties properties = new Properties();
+        properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
+        properties.put(AdminClientConfig.RETRIES_CONFIG, 3);
+        properties.put(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, 500);
+        this.adminClient = KafkaAdminClient.create(properties);
+    }
+
+    @Override
+    public AdminClient getConnection() {
+        return adminClient;
+    }
+
+    /**
+     * Close the client owned by this instance; shared clients from {@link #getAdminClient(String)} are unaffected
+     */
+    @Override
+    public void closeConnection() throws Exception {
+        if (adminClient != null) {
+            adminClient.close();
+        }
+    }
+
+    /**
+     * Get the shared admin client for the given broker list, creating it on first use
+     *
+     * @param brokerList bootstrap servers, e.g. host:port
+     * @return the admin client bound to that broker list
+     */
+    public static AdminClient getAdminClient(String brokerList) {
+        // computeIfAbsent is atomic on ConcurrentHashMap, so at most one client is created per broker list
+        return ADMIN_CLIENTS.computeIfAbsent(brokerList, servers -> {
+            Properties properties = new Properties();
+            properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
+            return KafkaAdminClient.create(properties);
+        });
+    }
+
+}
diff --git 
a/collector/collector-kafka/src/main/java/org/apache/hertzbeat/collector/collect/kafka/SupportedCommand.java
 
b/collector/collector-kafka/src/main/java/org/apache/hertzbeat/collector/collect/kafka/SupportedCommand.java
new file mode 100644
index 000000000..911a3529d
--- /dev/null
+++ 
b/collector/collector-kafka/src/main/java/org/apache/hertzbeat/collector/collect/kafka/SupportedCommand.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hertzbeat.collector.collect.kafka;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Commands the Kafka collector understands, each identified by its command string
+ */
+public enum SupportedCommand {
+
+    TOPIC_DESCRIBE("topic-describe"),
+    TOPIC_LIST("topic-list"),
+    TOPIC_OFFSET("topic-offset");
+
+    /**
+     * Command strings of all constants, for constant-time membership checks.
+     * A HashSet is kept on purpose: it tolerates a null lookup, so a missing command is simply "not supported".
+     */
+    private static final Set<String> SUPPORTED_COMMAND = new HashSet<>();
+
+    static {
+        // Register the command string of every enum constant once, at class load
+        for (SupportedCommand supportedCommand : SupportedCommand.values()) {
+            SUPPORTED_COMMAND.add(supportedCommand.getCommand());
+        }
+    }
+
+    private final String key;
+
+    SupportedCommand(String command) {
+        this.key = command;
+    }
+
+    /**
+     * @return the command string of this constant
+     */
+    public String getCommand() {
+        return key;
+    }
+
+    /**
+     * Check whether the given string is a supported command
+     *
+     * @param str the command string, may be null
+     * @return true if a constant with exactly this command string exists
+     */
+    public static boolean isKafkaCommand(String str) {
+        return SUPPORTED_COMMAND.contains(str);
+    }
+
+    /**
+     * Resolve the constant for a command string
+     *
+     * @param command the command string
+     * @return the matching constant
+     * @throws IllegalArgumentException if no constant has this command string
+     */
+    public static SupportedCommand fromCommand(String command) {
+        for (SupportedCommand supportedCommand : SupportedCommand.values()) {
+            if (supportedCommand.getCommand().equals(command)) {
+                return supportedCommand;
+            }
+        }
+        throw new IllegalArgumentException("No enum constant for command: " + command);
+    }
+}
diff --git 
a/collector/collector-kafka/src/test/java/org/apache/hertzbeat/collector/collect/kafka/KafkaCollectTest.java
 
b/collector/collector-kafka/src/test/java/org/apache/hertzbeat/collector/collect/kafka/KafkaCollectTest.java
new file mode 100644
index 000000000..04c7676b1
--- /dev/null
+++ 
b/collector/collector-kafka/src/test/java/org/apache/hertzbeat/collector/collect/kafka/KafkaCollectTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hertzbeat.collector.collect.kafka;
+
+import org.apache.hertzbeat.collector.dispatch.DispatchConstants;
+import org.apache.hertzbeat.common.entity.job.Metrics;
+import org.apache.hertzbeat.common.entity.job.protocol.KafkaProtocol;
+import org.apache.hertzbeat.common.entity.message.CollectRep;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/**
+ * Unit tests for {@link KafkaCollectImpl}
+ */
+public class KafkaCollectTest {
+    private KafkaCollectImpl collect;
+
+    @BeforeEach
+    public void setUp() throws Exception {
+        collect = new KafkaCollectImpl();
+    }
+
+    /**
+     * Wrap a kafka protocol into an otherwise empty metrics definition
+     */
+    private static Metrics metricsOf(KafkaProtocol protocol) {
+        return Metrics.builder().kclient(protocol).build();
+    }
+
+    @Test
+    void preCheck() {
+        // metrics is null
+        assertThrows(NullPointerException.class, () -> collect.preCheck(null));
+
+        // kafka is null
+        assertThrows(IllegalArgumentException.class, () -> collect.preCheck(Metrics.builder().build()));
+
+        // kafka srv host is null
+        assertThrows(IllegalArgumentException.class, () -> collect.preCheck(metricsOf(new KafkaProtocol())));
+
+        // kafka port is null
+        KafkaProtocol hostOnly = KafkaProtocol.builder().host("127.0.0.1").build();
+        assertThrows(IllegalArgumentException.class, () -> collect.preCheck(metricsOf(hostOnly)));
+
+        // no exception throw
+        KafkaProtocol hostAndPort = KafkaProtocol.builder().host("127.0.0.1").port("9092").build();
+        assertDoesNotThrow(() -> collect.preCheck(metricsOf(hostAndPort)));
+    }
+
+    @Test
+    void collect() {
+        // metrics is null
+        CollectRep.MetricsData.Builder nullMetricsBuilder = CollectRep.MetricsData.newBuilder();
+        assertThrows(NullPointerException.class, () -> collect.collect(nullMetricsBuilder, 1L, "app", null));
+
+        // no command configured: collecting must return without throwing
+        CollectRep.MetricsData.Builder builder = CollectRep.MetricsData.newBuilder();
+        Metrics metrics = metricsOf(KafkaProtocol.builder().host("127.0.0.1").port("9092").build());
+        assertDoesNotThrow(() -> collect.collect(builder, 1L, "app", metrics));
+    }
+
+    @Test
+    void supportProtocol() {
+        assertEquals(DispatchConstants.PROTOCOL_KAFKA, collect.supportProtocol());
+    }
+}
diff --git a/collector/collector/pom.xml b/collector/collector/pom.xml
index e4e2801b1..172d3af83 100644
--- a/collector/collector/pom.xml
+++ b/collector/collector/pom.xml
@@ -42,6 +42,13 @@
             <version>${hertzbeat.version}</version>
         </dependency>
 
+        <!-- collector-kafka -->
+        <dependency>
+            <groupId>org.apache.hertzbeat</groupId>
+            <artifactId>hertzbeat-collector-kafka</artifactId>
+            <version>${hertzbeat.version}</version>
+        </dependency>
+
         <!-- collector-mongodb -->
         <dependency>
             <groupId>org.apache.hertzbeat</groupId>
@@ -63,6 +70,8 @@
             <version>${hertzbeat.version}</version>
         </dependency>
 
+
+
         <!-- spring -->
         <dependency>
             <groupId>org.springframework.boot</groupId>
diff --git 
a/collector/collector/src/main/resources/META-INF/services/org.apache.hertzbeat.collector.collect.AbstractCollect
 
b/collector/collector/src/main/resources/META-INF/services/org.apache.hertzbeat.collector.collect.AbstractCollect
index 69b4076fe..1ece33cf3 100644
--- 
a/collector/collector/src/main/resources/META-INF/services/org.apache.hertzbeat.collector.collect.AbstractCollect
+++ 
b/collector/collector/src/main/resources/META-INF/services/org.apache.hertzbeat.collector.collect.AbstractCollect
@@ -26,3 +26,4 @@ 
org.apache.hertzbeat.collector.collect.nebulagraph.NgqlCollectImpl
 org.apache.hertzbeat.collector.collect.imap.ImapCollectImpl
 org.apache.hertzbeat.collector.collect.script.ScriptCollectImpl
 org.apache.hertzbeat.collector.collect.mqtt.MqttCollectImpl
+org.apache.hertzbeat.collector.collect.kafka.KafkaCollectImpl
diff --git a/collector/pom.xml b/collector/pom.xml
index d1dccb9d6..039b37b67 100644
--- a/collector/pom.xml
+++ b/collector/pom.xml
@@ -40,6 +40,7 @@
         <module>collector-mongodb</module>
         <module>collector-nebulagraph</module>
         <module>collector-rocketmq</module>
+        <module>collector-kafka</module>
     </modules>
 
     <dependencyManagement>
@@ -59,6 +60,11 @@
                 <artifactId>hertzbeat-collector-mongodb</artifactId>
                 <version>${hertzbeat.version}</version>
             </dependency>
+            <dependency>
+                <groupId>org.apache.hertzbeat</groupId>
+                <artifactId>hertzbeat-collector-kafka</artifactId>
+                <version>${hertzbeat.version}</version>
+            </dependency>
             <dependency>
                 <groupId>org.apache.hertzbeat</groupId>
                 <artifactId>hertzbeat-collector-nebulagraph</artifactId>
diff --git 
a/common/src/main/java/org/apache/hertzbeat/common/entity/job/Metrics.java 
b/common/src/main/java/org/apache/hertzbeat/common/entity/job/Metrics.java
index 37b96db33..086c366a4 100644
--- a/common/src/main/java/org/apache/hertzbeat/common/entity/job/Metrics.java
+++ b/common/src/main/java/org/apache/hertzbeat/common/entity/job/Metrics.java
@@ -23,6 +23,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+
 import lombok.AllArgsConstructor;
 import lombok.Builder;
 import lombok.Data;
@@ -36,6 +37,7 @@ import 
org.apache.hertzbeat.common.entity.job.protocol.IcmpProtocol;
 import org.apache.hertzbeat.common.entity.job.protocol.ImapProtocol;
 import org.apache.hertzbeat.common.entity.job.protocol.JdbcProtocol;
 import org.apache.hertzbeat.common.entity.job.protocol.JmxProtocol;
+import org.apache.hertzbeat.common.entity.job.protocol.KafkaProtocol;
 import org.apache.hertzbeat.common.entity.job.protocol.MemcachedProtocol;
 import org.apache.hertzbeat.common.entity.job.protocol.MongodbProtocol;
 import org.apache.hertzbeat.common.entity.job.protocol.MqttProtocol;
@@ -231,6 +233,11 @@ public class Metrics {
      */
     private MqttProtocol mqtt;
 
+    /**
+     * Monitoring configuration information using the public kafka protocol
+     */
+    private KafkaProtocol kclient;
+
     /**
      * collector use - Temporarily store subTask metrics response data
      */
diff --git 
a/common/src/main/java/org/apache/hertzbeat/common/entity/job/protocol/KafkaProtocol.java
 
b/common/src/main/java/org/apache/hertzbeat/common/entity/job/protocol/KafkaProtocol.java
new file mode 100644
index 000000000..9603e1ffd
--- /dev/null
+++ 
b/common/src/main/java/org/apache/hertzbeat/common/entity/job/protocol/KafkaProtocol.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hertzbeat.common.entity.job.protocol;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * Kafka protocol parameters carried by a metrics definition:
+ * where to reach the broker and which collector command to run.
+ */
+@Data
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+public class KafkaProtocol {
+
+    /**
+     * IP address or domain name of the peer host
+     */
+    private String host;
+
+    /**
+     * Port number, kept as a string
+     */
+    private String port;
+
+    /**
+     * Timeout period (unit is not specified here - TODO confirm against the monitor definition)
+     */
+    private String timeout;
+
+    /**
+     * Collector command to execute, e.g. topic-list, topic-describe or topic-offset
+     */
+    private String command;
+}
diff --git a/home/docs/help/kafka_client.md b/home/docs/help/kafka_client.md
new file mode 100644
index 000000000..baedd1c4a
--- /dev/null
+++ b/home/docs/help/kafka_client.md
@@ -0,0 +1,47 @@
+---
+id: kafka_client  
+title: Monitoring: Kafka Monitoring (Client-based)    
+sidebar_label: Kafka Monitoring (Client-based)  
+keywords: [open-source monitoring system, open-source message middleware 
monitoring, Kafka monitoring]
+---
+
+> Collect and monitor general metrics for Kafka.
+
+### Configuration Parameters
+
+| Parameter Name   | Help Description                                          
    |
+|------------------|---------------------------------------------------------------|
+| Monitoring Host  | The monitored peer's IPv4, IPv6, or domain name. Note: ⚠️ 
Do not include protocol headers (e.g., https://, http://). |
+| Monitoring Port  | The monitored service port.                               
     |
+| Task Name        | The identifier for this monitoring task, which must be 
unique. |
+| Collection Interval | The interval for periodic data collection, in seconds. 
The minimum allowable interval is 30 seconds. |
+| Description/Remarks | Additional information to describe and identify this 
monitoring task. Users can add remarks here. |
+
+### Collected Metrics
+
+#### Metric Set: topic_list
+
+| Metric Name  | Unit | Help Description |
+|--------------|------|------------------|
+| TopicName    | None | Topic Name       |
+
+#### Metric Set: topic_detail
+
+| Metric Name          | Unit | Help Description |
+|----------------------|------|------------------|
+| TopicName            | None | Topic Name       |
+| PartitionNum         | None | Number of Partitions |
+| PartitionLeader      | None | Partition Leader |
+| BrokerHost           | None | Broker Host      |
+| BrokerPort           | None | Broker Port      |
+| ReplicationFactorSize| None | Replication Factor Size |
+| ReplicationFactor    | None | Replication Factor |
+
+#### Metric Set: topic_offset
+
+| Metric Name   | Unit | Help Description |
+|---------------|------|------------------|
+| TopicName     | None | Topic Name       |
+| PartitionNum  | None | Partition Number (ID of the partition within the topic) |
+| earliest      | None | Earliest Offset  |
+| latest        | None | Latest Offset    |
diff --git 
a/home/i18n/zh-cn/docusaurus-plugin-content-docs/current/help/kafka_client.md 
b/home/i18n/zh-cn/docusaurus-plugin-content-docs/current/help/kafka_client.md
new file mode 100644
index 000000000..1ae63e03b
--- /dev/null
+++ 
b/home/i18n/zh-cn/docusaurus-plugin-content-docs/current/help/kafka_client.md
@@ -0,0 +1,47 @@
+---
+id: kafka_client  
+title: 监控:Kafka监控(基于客户端)    
+sidebar_label: Kafka监控(基于客户端)
+keywords: [开源监控系统, 开源消息中间件监控, Kafka监控]
+---
+
+> 对Kafka的通用指标进行采集监控
+
+### 配置参数
+
+| 参数名称   | 参数帮助描述                                               |
+|--------|------------------------------------------------------|
+| 监控Host | 被监控的对端IPV4,IPV6或域名。注意⚠️不带协议头(eg: https://, http://)。 |
+| 监控Port | 被监控的服务端口。                                            |
+| 任务名称   | 标识此监控的名称,名称需要保证唯一性。                                  |
+| 采集间隔   | 监控周期性采集数据间隔时间,单位秒,可设置的最小间隔为30秒                       |
+| 描述备注   | 更多标识和描述此监控的备注信息,用户可以在这里备注信息                          |
+
+### 采集指标
+
+#### 指标集合:topic_list
+
+|    指标名称     | 指标单位 | 指标帮助描述  |
+|-------------|------|---------|
+| TopicName     | 无    | 主题名称 |
+
+#### 指标集合:topic_detail
+
+|   指标名称    | 指标单位 | 指标帮助描述 |
+|-----------|------|--------|
+| TopicName | 无    | 主题名称     |
+| PartitionNum      | 无   | 分区数量  |
+| PartitionLeader       | 无   | 分区领导者     |
+| BrokerHost      | 无   | Broker主机    |
+| BrokerPort      | 无   | Broker端口    |
+| ReplicationFactorSize      | 无   | 复制因子大小    |
+| ReplicationFactor      | 无   | 复制因子    |
+
+#### 指标集合:topic_offset
+
+| 指标名称  | 指标单位 | 指标帮助描述  |
+|-------|---|---------|
+| TopicName | 无  | 主题名称 |
+| PartitionNum | 无 | 分区数量 |
+| earliest | 无  | 最早偏移量 |
+| latest | 无  | 最新偏移量 |
diff --git a/manager/src/main/resources/define/app-kafka_client.yml 
b/manager/src/main/resources/define/app-kafka_client.yml
new file mode 100644
index 000000000..5d063aec9
--- /dev/null
+++ b/manager/src/main/resources/define/app-kafka_client.yml
@@ -0,0 +1,168 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# The monitoring type category: service-application service monitoring, 
db-database monitoring, mid-middleware monitoring, custom-custom monitoring, os-operating system monitoring
+category: mid
+# The monitoring type eg: linux windows tomcat mysql aws...
+app: kafka_client
+# The monitoring i18n name
+name:
+  zh-CN: Kafka消息系统(客户端)
+  en-US: Kafka Message(Client)
+# The description and help of this monitoring type
+help:
+  zh-CN: HertzBeat 使用 <a 
href="https://hertzbeat.apache.org/docs/advanced/extend-jmx">Kafka Admin 
Client</a> 对 Kafka 的通用指标进行采集监控。
+  en-US: HertzBeat uses <a 
href='https://hertzbeat.apache.org/docs/advanced/extend-jmx'>Kafka Admin 
Client</a> to monitor Kafka general metrics.
+  zh-TW: HertzBeat 使用 <a 
href="https://hertzbeat.apache.org/docs/advanced/extend-jmx">Kafka Admin 
Client</a> 對 Kafka 的通用指標進行采集監控。
+helpLink:
+  zh-CN: https://hertzbeat.apache.org/zh-cn/docs/help/kafka_client
+  en-US: https://hertzbeat.apache.org/docs/help/kafka_client
+# Input params define for monitoring(render web ui by the definition)
+params:
+  # field-param field key
+  - field: host
+    # name-param field display i18n name
+    name:
+      zh-CN: 目标Host
+      en-US: Target Host
+    # type-param field type(most mapping the html input type)
+    type: host
+    # required-true or false
+    required: true
+  - field: port
+    name:
+      zh-CN: 端口
+      en-US: Port
+    type: number
+    # when type is number, range is required
+    range: '[0,65535]'
+    required: true
+    defaultValue: 9092
+
+# collect metrics config list
+metrics:
+  # metrics - topic_list
+  - name: topic_list
+    i18n:
+      zh-CN: 主题列表
+      en-US: Topic List
+    # metrics scheduling priority(0->127)->(high->low), metrics with the same 
priority will be scheduled in parallel
+    # priority 0 metrics are availability metrics; they are scheduled 
first, and scheduling continues only if they are collected successfully
+    priority: 0
+    # collect metrics content
+    fields:
+      # field-metric name, type-metric type(0-number,1-string), unit-metric 
unit('%','ms','MB'), label-whether it is a metrics label field
+      - field: TopicName
+        type: 1
+        i18n:
+          zh-CN: 主题名称
+          en-US: Topic Name
+    # the protocol used for monitoring, eg: sql, ssh, http, telnet, wmi, snmp, 
sdk
+    protocol: kclient
+    # the config content when protocol is kclient
+    kclient:
+      host: ^_^host^_^
+      port: ^_^port^_^
+      command: topic-list
+  - name: topic_detail
+    i18n:
+      zh-CN: 主题详细信息
+      en-US: Topic Detail Info
+    # metrics scheduling priority(0->127)->(high->low), metrics with the same 
priority will be scheduled in parallel
+    # priority 0 metrics are availability metrics; they are scheduled 
first, and scheduling continues only if they are collected successfully
+    priority: 0
+    # collect metrics content
+    fields:
+      # field-metric name, type-metric type(0-number,1-string), unit-metric 
unit('%','ms','MB'), label-whether it is a metrics label field
+      - field: TopicName
+        type: 1
+        i18n:
+          zh-CN: 主题名称
+          en-US: Topic Name
+      - field: PartitionNum
+        type: 1
+        i18n:
+          zh-CN: 分区数量
+          en-US: Partition Num
+      - field: PartitionLeader
+        type: 1
+        i18n:
+          zh-CN: 分区领导者
+          en-US: Partition Leader
+      - field: BrokerHost
+        type: 1
+        i18n:
+          zh-CN: Broker主机
+          en-US: Broker Host
+      - field: BrokerPort
+        type: 1
+        i18n:
+          zh-CN: Broker端口
+          en-US: Broker Port
+      - field: ReplicationFactorSize
+        type: 1
+        i18n:
+          zh-CN: 复制因子大小
+          en-US: Replication Factor Size
+      - field: ReplicationFactor
+        type: 1
+        i18n:
+          zh-CN: 复制因子
+          en-US: Replication Factor
+    # the protocol used for monitoring, eg: sql, ssh, http, telnet, wmi, snmp, 
sdk
+    protocol: kclient
+    # the config content when protocol is kclient
+    kclient:
+      host: ^_^host^_^
+      port: ^_^port^_^
+      command: topic-describe
+  - name: topic_offset
+    i18n:
+      zh-CN: 主题偏移量
+      en-US: Topic Offset
+    # metrics scheduling priority(0->127)->(high->low), metrics with the same 
priority will be scheduled in parallel
+    # priority 0 metrics are availability metrics; they are scheduled 
first, and scheduling continues only if they are collected successfully
+    priority: 0
+    # collect metrics content
+    fields:
+      # field-metric name, type-metric type(0-number,1-string), unit-metric 
unit('%','ms','MB'), label-whether it is a metrics label field
+      - field: TopicName
+        type: 1
+        i18n:
+          zh-CN: 主题名称
+          en-US: Topic Name
+      - field: PartitionNum
+        type: 1
+        i18n:
+          zh-CN: 分区数量
+          en-US: Partition Num
+      - field: earliest
+        type: 0
+        i18n:
+          zh-CN: 最早偏移量
+          en-US: Earliest Offset
+      - field: latest
+        type: 0
+        i18n:
+          zh-CN: 最新偏移量
+          en-US: Latest Offset
+    # the protocol used for monitoring, eg: sql, ssh, http, telnet, wmi, snmp, 
sdk
+    protocol: kclient
+    # the config content when protocol is kclient
+    kclient:
+      host: ^_^host^_^
+      port: ^_^port^_^
+      command: topic-offset
+


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to