This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch kafka-console
in repository https://gitbox.apache.org/repos/asf/camel.git

commit c9e2aba93abd0c21ac19d3726495a85aabb4538c
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Tue Jul 9 12:41:38 2024 +0200

    CAMEL-20956: camel-kafka - Add dev console and jbang command
---
 .../camel/component/kafka/KafkaDevConsole.java     |   5 -
 .../camel/cli/connector/LocalCliConnector.java     |   7 +
 .../dsl/jbang/core/commands/CamelJBangMain.java    |   1 +
 .../dsl/jbang/core/commands/process/ListKafka.java | 223 +++++++++++++++++++++
 4 files changed, 231 insertions(+), 5 deletions(-)

diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaDevConsole.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaDevConsole.java
index 86631f915ac..0bbbac18b97 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaDevConsole.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaDevConsole.java
@@ -40,9 +40,6 @@ public class KafkaDevConsole extends AbstractDevConsole {
             if (route.getConsumer() instanceof KafkaConsumer kc) {
                 sb.append(String.format("\n    Route Id: %s", 
route.getRouteId()));
                 sb.append(String.format("\n    From: %s", 
route.getEndpoint().getEndpointUri()));
-                sb.append(
-                        String.format("\n    State: %s", 
getCamelContext().getRouteController().getRouteStatus(route.getId())));
-                sb.append(String.format("\n    Uptime: %s", 
route.getUptime()));
                 for (KafkaFetchRecords t : kc.tasks()) {
                     sb.append(String.format("\n        Worked Thread: %s", 
t.getThreadId()));
                     sb.append(String.format("\n        Worker State: %s", 
t.getState()));
@@ -81,8 +78,6 @@ public class KafkaDevConsole extends AbstractDevConsole {
                 JsonObject jo = new JsonObject();
                 jo.put("routeId", route.getRouteId());
                 jo.put("uri", route.getEndpoint().getEndpointUri());
-                jo.put("state", 
getCamelContext().getRouteController().getRouteStatus(route.getId()));
-                jo.put("uptime", route.getUptime());
 
                 JsonArray arr = new JsonArray();
                 jo.put("workers", arr);
diff --git 
a/dsl/camel-cli-connector/src/main/java/org/apache/camel/cli/connector/LocalCliConnector.java
 
b/dsl/camel-cli-connector/src/main/java/org/apache/camel/cli/connector/LocalCliConnector.java
index d230f7fd06e..df0d35ad9d1 100644
--- 
a/dsl/camel-cli-connector/src/main/java/org/apache/camel/cli/connector/LocalCliConnector.java
+++ 
b/dsl/camel-cli-connector/src/main/java/org/apache/camel/cli/connector/LocalCliConnector.java
@@ -1016,6 +1016,13 @@ public class LocalCliConnector extends ServiceSupport 
implements CliConnector, C
                         root.put("rests", json);
                     }
                 }
+                DevConsole dc20 = dcr.resolveById("kafka");
+                if (dc20 != null) {
+                    JsonObject json = (JsonObject) 
dc20.call(DevConsole.MediaType.JSON);
+                    if (json != null && !json.isEmpty()) {
+                        root.put("kafka", json);
+                    }
+                }
             }
             // various details
             JsonObject mem = collectMemory();
diff --git 
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/CamelJBangMain.java
 
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/CamelJBangMain.java
index d39848eee04..8b74f5f5e93 100644
--- 
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/CamelJBangMain.java
+++ 
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/CamelJBangMain.java
@@ -102,6 +102,7 @@ public class CamelJBangMain implements Callable<Integer> {
                         .addSubcommand("service", new CommandLine(new 
ListService(main)))
                         .addSubcommand("rest", new CommandLine(new 
ListRest(main)))
                         .addSubcommand("platform-http", new CommandLine(new 
ListPlatformHttp(main)))
+                        .addSubcommand("kafka", new CommandLine(new 
ListKafka(main)))
                         .addSubcommand("source", new CommandLine(new 
CamelSourceAction(main)))
                         .addSubcommand("route-dump", new CommandLine(new 
CamelRouteDumpAction(main)))
                         .addSubcommand("startup-recorder", new CommandLine(new 
CamelStartupRecorderAction(main)))
diff --git 
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ListKafka.java
 
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ListKafka.java
new file mode 100644
index 00000000000..c2fdd581a49
--- /dev/null
+++ 
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ListKafka.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.dsl.jbang.core.commands.process;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
+
+import com.github.freva.asciitable.AsciiTable;
+import com.github.freva.asciitable.Column;
+import com.github.freva.asciitable.HorizontalAlign;
+import com.github.freva.asciitable.OverflowBehaviour;
+import org.apache.camel.dsl.jbang.core.commands.CamelJBangMain;
+import org.apache.camel.dsl.jbang.core.common.PidNameAgeCompletionCandidates;
+import org.apache.camel.dsl.jbang.core.common.ProcessHelper;
+import org.apache.camel.util.TimeUtils;
+import org.apache.camel.util.json.JsonArray;
+import org.apache.camel.util.json.JsonObject;
+import picocli.CommandLine;
+import picocli.CommandLine.Command;
+
+@Command(name = "kafka",
+         description = "List Kafka consumers of Camel integrations", 
sortOptions = false)
+public class ListKafka extends ProcessWatchCommand {
+
+    @CommandLine.Parameters(description = "Name or pid of running Camel 
integration", arity = "0..1")
+    String name = "*";
+
+    @CommandLine.Option(names = { "--sort" }, completionCandidates = 
PidNameAgeCompletionCandidates.class,
+                        description = "Sort by pid, name or age", defaultValue 
= "pid")
+    String sort;
+
+    @CommandLine.Option(names = { "--metadata" },
+                        description = "Show group metadata")
+    boolean metadata;
+
+    @CommandLine.Option(names = { "--short-uri" },
+                        description = "List endpoint URI without query 
parameters (short)")
+    boolean shortUri;
+
+    @CommandLine.Option(names = { "--wide-uri" },
+                        description = "List endpoint URI in full details")
+    boolean wideUri;
+
+    public ListKafka(CamelJBangMain main) {
+        super(main);
+    }
+
+    @Override
+    public Integer doProcessWatchCall() throws Exception {
+        List<Row> rows = new ArrayList<>();
+
+        List<Long> pids = findPids(name);
+        ProcessHandle.allProcesses()
+                .filter(ph -> pids.contains(ph.pid()))
+                .forEach(ph -> {
+                    JsonObject root = loadStatus(ph.pid());
+                    // there must be a status file for the running Camel 
integration
+                    if (root != null) {
+                        Row copy = new Row();
+                        JsonObject context = (JsonObject) root.get("context");
+                        if (context == null) {
+                            return;
+                        }
+                        copy.name = context.getString("name");
+                        if ("CamelJBang".equals(copy.name)) {
+                            copy.name = ProcessHelper.extractName(root, ph);
+                        }
+                        copy.pid = Long.toString(ph.pid());
+                        copy.uptime = extractSince(ph);
+                        copy.age = TimeUtils.printSince(copy.uptime);
+
+                        JsonObject jo = (JsonObject) root.get("kafka");
+                        if (jo != null) {
+                            JsonArray arr = (JsonArray) 
jo.get("kafkaConsumers");
+                            if (arr != null) {
+                                for (int i = 0; i < arr.size(); i++) {
+                                    Row row = copy.copy();
+                                    jo = (JsonObject) arr.get(i);
+                                    row.routeId = jo.getString("routeId");
+                                    row.uri = jo.getString("uri");
+                                    row.state = jo.getString("state");
+
+                                    JsonArray wa = (JsonArray) 
jo.get("workers");
+                                    if (wa != null) {
+                                        for (int j = 0; j < wa.size(); j++) {
+                                            JsonObject wo = (JsonObject) 
wa.get(j);
+                                            row.threadId = 
wo.getString("threadId");
+                                            row.state = wo.getString("state");
+                                            row.lastError = 
wo.getString("lastError");
+                                            row.groupId = 
wo.getString("groupId");
+                                            row.groupInstanceId = 
wo.getString("groupInstanceId");
+                                            row.memberId = 
wo.getString("memberId");
+                                            row.generationId = 
wo.getIntegerOrDefault("generationId", 0);
+                                            row.lastTopic = 
wo.getString("lastTopic");
+                                            row.lastPartition = 
wo.getIntegerOrDefault("lastPartition", 0);
+                                            row.lastOffset = 
wo.getLongOrDefault("lastOffset", 0);
+                                            rows.add(row);
+                                            row = row.copy();
+                                        }
+                                    } else {
+                                        rows.add(row);
+                                    }
+                                }
+                            }
+                        }
+                    }
+                });
+
+        // sort rows
+        rows.sort(this::sortRow);
+
+        if (!rows.isEmpty()) {
+            printer().println(AsciiTable.getTable(AsciiTable.NO_BORDERS, rows, 
Arrays.asList(
+                    new 
Column().header("PID").headerAlign(HorizontalAlign.CENTER).with(r -> r.pid),
+                    new 
Column().header("NAME").dataAlign(HorizontalAlign.LEFT).maxWidth(30, 
OverflowBehaviour.ELLIPSIS_RIGHT)
+                            .with(r -> r.name),
+                    new 
Column().header("ROUTE").dataAlign(HorizontalAlign.LEFT).with(this::getRouteId),
+                    new 
Column().header("METADATA").visible(metadata).dataAlign(HorizontalAlign.LEFT).with(this::getMetadata),
+                    new 
Column().header("STATE").dataAlign(HorizontalAlign.LEFT).with(this::getState),
+                    new 
Column().header("TOPIC").dataAlign(HorizontalAlign.RIGHT).with(r -> 
r.lastTopic),
+                    new 
Column().header("PARTITION").dataAlign(HorizontalAlign.RIGHT).with(r -> "" + 
r.lastPartition),
+                    new 
Column().header("OFFSET").dataAlign(HorizontalAlign.RIGHT).with(r -> "" + 
r.lastOffset),
+                    new 
Column().header("ENDPOINT").visible(!wideUri).dataAlign(HorizontalAlign.LEFT)
+                            .maxWidth(90, OverflowBehaviour.ELLIPSIS_RIGHT)
+                            .with(this::getUri),
+                    new 
Column().header("ENDPOINT").visible(wideUri).dataAlign(HorizontalAlign.LEFT)
+                            .maxWidth(140, OverflowBehaviour.NEWLINE)
+                            .with(this::getUri))));
+        }
+
+        return 0;
+    }
+
+    protected int sortRow(Row o1, Row o2) {
+        String s = sort;
+        int negate = 1;
+        if (s.startsWith("-")) {
+            s = s.substring(1);
+            negate = -1;
+        }
+        switch (s) {
+            case "pid":
+                return Long.compare(Long.parseLong(o1.pid), 
Long.parseLong(o2.pid)) * negate;
+            case "name":
+                return o1.name.compareToIgnoreCase(o2.name) * negate;
+            case "age":
+                return Long.compare(o1.uptime, o2.uptime) * negate;
+            default:
+                return 0;
+        }
+    }
+
+    private String getRouteId(Row r) {
+        if (r.routeId != null) {
+            return r.routeId;
+        }
+        return "";
+    }
+
+    private String getUri(Row r) {
+        String u = r.uri;
+        if (shortUri) {
+            int pos = u.indexOf('?');
+            if (pos > 0) {
+                u = u.substring(0, pos);
+            }
+        }
+        return u;
+    }
+
+    private String getMetadata(Row r) {
+        return r.groupId + "/" + r.groupInstanceId + "/" + r.memberId + "/" + 
r.generationId;
+    }
+
+    private String getState(Row r) {
+        return r.state.toLowerCase(Locale.ROOT);
+    }
+
+    private static class Row implements Cloneable {
+        String pid;
+        String name;
+        String age;
+        long uptime;
+        String routeId;
+        String uri;
+        // worker
+        String threadId;
+        String state;
+        String lastError;
+        String groupId;
+        String groupInstanceId;
+        String memberId;
+        int generationId;
+        String lastTopic;
+        int lastPartition;
+        long lastOffset;
+
+        Row copy() {
+            try {
+                return (Row) clone();
+            } catch (CloneNotSupportedException e) {
+                return null;
+            }
+        }
+    }
+
+}

Reply via email to