This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch kafka-console in repository https://gitbox.apache.org/repos/asf/camel.git
commit c9e2aba93abd0c21ac19d3726495a85aabb4538c Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Tue Jul 9 12:41:38 2024 +0200 CAMEL-20956: camel-kafka - Add dev console and jbang command --- .../camel/component/kafka/KafkaDevConsole.java | 5 - .../camel/cli/connector/LocalCliConnector.java | 7 + .../dsl/jbang/core/commands/CamelJBangMain.java | 1 + .../dsl/jbang/core/commands/process/ListKafka.java | 223 +++++++++++++++++++++ 4 files changed, 231 insertions(+), 5 deletions(-) diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaDevConsole.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaDevConsole.java index 86631f915ac..0bbbac18b97 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaDevConsole.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaDevConsole.java @@ -40,9 +40,6 @@ public class KafkaDevConsole extends AbstractDevConsole { if (route.getConsumer() instanceof KafkaConsumer kc) { sb.append(String.format("\n Route Id: %s", route.getRouteId())); sb.append(String.format("\n From: %s", route.getEndpoint().getEndpointUri())); - sb.append( - String.format("\n State: %s", getCamelContext().getRouteController().getRouteStatus(route.getId()))); - sb.append(String.format("\n Uptime: %s", route.getUptime())); for (KafkaFetchRecords t : kc.tasks()) { sb.append(String.format("\n Worked Thread: %s", t.getThreadId())); sb.append(String.format("\n Worker State: %s", t.getState())); @@ -81,8 +78,6 @@ public class KafkaDevConsole extends AbstractDevConsole { JsonObject jo = new JsonObject(); jo.put("routeId", route.getRouteId()); jo.put("uri", route.getEndpoint().getEndpointUri()); - jo.put("state", getCamelContext().getRouteController().getRouteStatus(route.getId())); - jo.put("uptime", route.getUptime()); JsonArray arr = new JsonArray(); jo.put("workers", arr); diff --git a/dsl/camel-cli-connector/src/main/java/org/apache/camel/cli/connector/LocalCliConnector.java b/dsl/camel-cli-connector/src/main/java/org/apache/camel/cli/connector/LocalCliConnector.java index d230f7fd06e..df0d35ad9d1 100644 --- a/dsl/camel-cli-connector/src/main/java/org/apache/camel/cli/connector/LocalCliConnector.java +++ b/dsl/camel-cli-connector/src/main/java/org/apache/camel/cli/connector/LocalCliConnector.java @@ -1016,6 +1016,13 @@ public class LocalCliConnector extends ServiceSupport implements CliConnector, C root.put("rests", json); } } + DevConsole dc20 = dcr.resolveById("kafka"); + if (dc20 != null) { + JsonObject json = (JsonObject) dc20.call(DevConsole.MediaType.JSON); + if (json != null && !json.isEmpty()) { + root.put("kafka", json); + } + } } // various details JsonObject mem = collectMemory(); diff --git a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/CamelJBangMain.java b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/CamelJBangMain.java index d39848eee04..8b74f5f5e93 100644 --- a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/CamelJBangMain.java +++ b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/CamelJBangMain.java @@ -102,6 +102,7 @@ public class CamelJBangMain implements Callable<Integer> { .addSubcommand("service", new CommandLine(new ListService(main))) .addSubcommand("rest", new CommandLine(new ListRest(main))) .addSubcommand("platform-http", new CommandLine(new ListPlatformHttp(main))) + .addSubcommand("kafka", new CommandLine(new ListKafka(main))) .addSubcommand("source", new CommandLine(new CamelSourceAction(main))) .addSubcommand("route-dump", new CommandLine(new CamelRouteDumpAction(main))) .addSubcommand("startup-recorder", new CommandLine(new CamelStartupRecorderAction(main))) diff --git a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ListKafka.java b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ListKafka.java new file mode 100644 index 00000000000..c2fdd581a49 --- /dev/null +++ b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ListKafka.java @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.dsl.jbang.core.commands.process; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Locale; + +import com.github.freva.asciitable.AsciiTable; +import com.github.freva.asciitable.Column; +import com.github.freva.asciitable.HorizontalAlign; +import com.github.freva.asciitable.OverflowBehaviour; +import org.apache.camel.dsl.jbang.core.commands.CamelJBangMain; +import org.apache.camel.dsl.jbang.core.common.PidNameAgeCompletionCandidates; +import org.apache.camel.dsl.jbang.core.common.ProcessHelper; +import org.apache.camel.util.TimeUtils; +import org.apache.camel.util.json.JsonArray; +import org.apache.camel.util.json.JsonObject; +import picocli.CommandLine; +import picocli.CommandLine.Command; + +@Command(name = "kafka", + description = "List Kafka consumers of Camel integrations", sortOptions = false) +public class ListKafka extends ProcessWatchCommand { + + @CommandLine.Parameters(description = "Name or pid of running Camel integration", arity = "0..1") + String name = "*"; + + @CommandLine.Option(names = { "--sort" }, completionCandidates = PidNameAgeCompletionCandidates.class, + description = "Sort by pid, name or age", defaultValue = "pid") + String sort; + + @CommandLine.Option(names = { "--metadata" }, + description = "Show group metadata") + boolean metadata; + + @CommandLine.Option(names = { "--short-uri" }, + description = "List endpoint URI without query parameters (short)") + boolean shortUri; + + @CommandLine.Option(names = { "--wide-uri" }, + description = "List endpoint URI in full details") + boolean wideUri; + + public ListKafka(CamelJBangMain main) { + super(main); + } + + @Override + public Integer doProcessWatchCall() throws Exception { + List<Row> rows = new ArrayList<>(); + + List<Long> pids = findPids(name); + ProcessHandle.allProcesses() + .filter(ph -> pids.contains(ph.pid())) + .forEach(ph -> { + JsonObject root = loadStatus(ph.pid()); + // there must be a status file for the running Camel integration + if (root != null) { + Row copy = new Row(); + JsonObject context = (JsonObject) root.get("context"); + if (context == null) { + return; + } + copy.name = context.getString("name"); + if ("CamelJBang".equals(copy.name)) { + copy.name = ProcessHelper.extractName(root, ph); + } + copy.pid = Long.toString(ph.pid()); + copy.uptime = extractSince(ph); + copy.age = TimeUtils.printSince(copy.uptime); + + JsonObject jo = (JsonObject) root.get("kafka"); + if (jo != null) { + JsonArray arr = (JsonArray) jo.get("kafkaConsumers"); + if (arr != null) { + for (int i = 0; i < arr.size(); i++) { + Row row = copy.copy(); + jo = (JsonObject) arr.get(i); + row.routeId = jo.getString("routeId"); + row.uri = jo.getString("uri"); + row.state = jo.getString("state"); + + JsonArray wa = (JsonArray) jo.get("workers"); + if (wa != null) { + for (int j = 0; j < wa.size(); j++) { + JsonObject wo = (JsonObject) wa.get(j); + row.threadId = wo.getString("threadId"); + row.state = wo.getString("state"); + row.lastError = wo.getString("lastError"); + row.groupId = wo.getString("groupId"); + row.groupInstanceId = wo.getString("groupInstanceId"); + row.memberId = wo.getString("memberId"); + row.generationId = wo.getIntegerOrDefault("generationId", 0); + row.lastTopic = wo.getString("lastTopic"); + row.lastPartition = wo.getIntegerOrDefault("lastPartition", 0); + row.lastOffset = wo.getLongOrDefault("lastOffset", 0); + rows.add(row); + row = row.copy(); + } + } else { + rows.add(row); + } + } + } + } + } + }); + + // sort rows + rows.sort(this::sortRow); + + if (!rows.isEmpty()) { + printer().println(AsciiTable.getTable(AsciiTable.NO_BORDERS, rows, Arrays.asList( + new Column().header("PID").headerAlign(HorizontalAlign.CENTER).with(r -> r.pid), + new Column().header("NAME").dataAlign(HorizontalAlign.LEFT).maxWidth(30, OverflowBehaviour.ELLIPSIS_RIGHT) + .with(r -> r.name), + new Column().header("ROUTE").dataAlign(HorizontalAlign.LEFT).with(this::getRouteId), + new Column().header("METADATA").visible(metadata).dataAlign(HorizontalAlign.LEFT).with(this::getMetadata), + new Column().header("STATE").dataAlign(HorizontalAlign.LEFT).with(this::getState), + new Column().header("TOPIC").dataAlign(HorizontalAlign.RIGHT).with(r -> r.lastTopic), + new Column().header("PARTITION").dataAlign(HorizontalAlign.RIGHT).with(r -> "" + r.lastPartition), + new Column().header("OFFSET").dataAlign(HorizontalAlign.RIGHT).with(r -> "" + r.lastOffset), + new Column().header("ENDPOINT").visible(!wideUri).dataAlign(HorizontalAlign.LEFT) + .maxWidth(90, OverflowBehaviour.ELLIPSIS_RIGHT) + .with(this::getUri), + new Column().header("ENDPOINT").visible(wideUri).dataAlign(HorizontalAlign.LEFT) + .maxWidth(140, OverflowBehaviour.NEWLINE) + .with(this::getUri)))); + } + + return 0; + } + + protected int sortRow(Row o1, Row o2) { + String s = sort; + int negate = 1; + if (s.startsWith("-")) { + s = s.substring(1); + negate = -1; + } + switch (s) { + case "pid": + return Long.compare(Long.parseLong(o1.pid), Long.parseLong(o2.pid)) * negate; + case "name": + return o1.name.compareToIgnoreCase(o2.name) * negate; + case "age": + return Long.compare(o1.uptime, o2.uptime) * negate; + default: + return 0; + } + } + + private String getRouteId(Row r) { + if (r.routeId != null) { + return r.routeId; + } + return ""; + } + + private String getUri(Row r) { + String u = r.uri; + if (shortUri) { + int pos = u.indexOf('?'); + if (pos > 0) { + u = u.substring(0, pos); + } + } + return u; + } + + private String getMetadata(Row r) { + return r.groupId + "/" + r.groupInstanceId + "/" + r.memberId + "/" + r.generationId; + } + + private String getState(Row r) { + return r.state.toLowerCase(Locale.ROOT); + } + + private static class Row implements Cloneable { + String pid; + String name; + String age; + long uptime; + String routeId; + String uri; + // worker + String threadId; + String state; + String lastError; + String groupId; + String groupInstanceId; + String memberId; + int generationId; + String lastTopic; + int lastPartition; + long lastOffset; + + Row copy() { + try { + return (Row) clone(); + } catch (CloneNotSupportedException e) { + return null; + } + } + } + +}