This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch kafka-console in repository https://gitbox.apache.org/repos/asf/camel.git
commit 9efff91135eaa0ca30f7dd5b873d1a2bb2e583a9 Author: Claus Ibsen <claus.ib...@gmail.com> AuthorDate: Tue Jul 9 15:20:21 2024 +0200 CAMEL-20956: camel-kafka - Add dev console and jbang command --- .../camel/component/kafka/KafkaDevConsole.java | 61 ++++++++++++++++ .../camel/component/kafka/KafkaFetchRecords.java | 57 +++++++++++++-- .../camel/cli/connector/LocalCliConnector.java | 14 ++++ .../dsl/jbang/core/commands/process/ListKafka.java | 83 +++++++++++++++++++++- .../core/commands/process/ProcessWatchCommand.java | 6 +- 5 files changed, 214 insertions(+), 7 deletions(-) diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaDevConsole.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaDevConsole.java index 0bbbac18b97..a0fb775759a 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaDevConsole.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaDevConsole.java @@ -19,22 +19,39 @@ package org.apache.camel.component.kafka; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.apache.camel.Route; import org.apache.camel.spi.annotations.DevConsole; import org.apache.camel.support.console.AbstractDevConsole; +import org.apache.camel.util.StopWatch; import org.apache.camel.util.json.JsonArray; import org.apache.camel.util.json.JsonObject; +import org.apache.kafka.shaded.io.opentelemetry.proto.collector.logs.v1.LogsServiceGrpc; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @DevConsole(name = "kafka", displayName = "Kafka", description = "Apache Kafka") public class KafkaDevConsole extends AbstractDevConsole { + private static final Logger LOG = LoggerFactory.getLogger(KafkaDevConsole.class); + + private static final long COMMITTED_TIMEOUT = 10000; + + /** + * Whether to include committed offset (sync operation to Kafka broker) + */ + public static final String COMMITTED = "committed"; + public KafkaDevConsole() { super("camel", "kafka", "Kafka", "Apache Kafka"); } @Override protected String doCallText(Map<String, Object> options) { + final boolean committed = "true".equals(options.getOrDefault(COMMITTED, "false")); + StringBuilder sb = new StringBuilder(); for (Route route : getCamelContext().getRoutes()) { if (route.getConsumer() instanceof KafkaConsumer kc) { @@ -58,6 +75,16 @@ public class KafkaDevConsole extends AbstractDevConsole { sb.append(String.format("\n Last Partition: %d", t.getLastRecord().partition())); sb.append(String.format("\n Last Offset: %d", t.getLastRecord().offset())); } + if (committed) { + List<KafkaFetchRecords.KafkaTopicPosition> l = fetchCommitOffsets(kc, t); + if (l != null) { + for (KafkaFetchRecords.KafkaTopicPosition r : l) { + sb.append(String.format("\n Commit Topic: %s", r.topic())); + sb.append(String.format("\n Commit Partition: %s", r.partition())); + sb.append(String.format("\n Commit Offset: %s", r.offset())); + } + } + } } sb.append("\n"); } @@ -66,8 +93,26 @@ public class KafkaDevConsole extends AbstractDevConsole { return sb.toString(); } + private static List<KafkaFetchRecords.KafkaTopicPosition> fetchCommitOffsets(KafkaConsumer kc, KafkaFetchRecords task) { + StopWatch watch = new StopWatch(); + + CountDownLatch latch = task.fetchCommitRecords(); + long timeout = Math.min(kc.getEndpoint().getConfiguration().getPollTimeoutMs(), COMMITTED_TIMEOUT); + try { + latch.await(timeout, TimeUnit.MILLISECONDS); + var answer = task.getCommitRecords(); + LOG.info("Fetching commit offsets took: {} ms", watch.taken()); + return answer; + } catch (Exception e) { + // ignore + } + return null; + } + @Override protected Map<String, Object> doCallJson(Map<String, Object> options) { + final boolean committed = "true".equals(options.getOrDefault(COMMITTED, "false")); + JsonObject root = new JsonObject(); List<JsonObject> list = new ArrayList<>(); @@ -102,6 +147,22 @@ public class KafkaDevConsole extends AbstractDevConsole { wo.put("lastPartition", t.getLastRecord().partition()); wo.put("lastOffset", t.getLastRecord().offset()); } + if (committed) { + List<KafkaFetchRecords.KafkaTopicPosition> l = fetchCommitOffsets(kc, t); + if (l != null) { + JsonArray ca = new JsonArray(); + for (KafkaFetchRecords.KafkaTopicPosition r : l) { + JsonObject cr = new JsonObject(); + cr.put("topic", r.topic()); + cr.put("partition", r.partition()); + cr.put("offset", r.offset()); + ca.add(cr); + } + if (!ca.isEmpty()) { + wo.put("committed", ca); + } + } + } } list.add(jo); } diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java index 9d64ed69dbc..05b6ede1fe2 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java @@ -17,8 +17,15 @@ package org.apache.camel.component.kafka; import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; import java.util.Properties; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantLock; import java.util.regex.Pattern; @@ -45,7 +52,6 @@ import org.apache.camel.util.IOHelper; import org.apache.camel.util.ReflectionHelper; import org.apache.camel.util.TimeUtils; import org.apache.kafka.clients.CommonClientConfigs; -import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -101,11 +107,14 @@ public class KafkaFetchRecords implements Runnable { record GroupMetadata(String groupId, String groupInstanceId, String memberId, int generationId) { } - record LastRecord(String topic, int partition, long offset) { + record KafkaTopicPosition(String topic, int partition, long offset) { } private volatile GroupMetadata groupMetadata; - private volatile LastRecord lastRecord; + private volatile KafkaTopicPosition lastRecord; + private final List<KafkaTopicPosition> commitRecords = new ArrayList<>(); + private final AtomicBoolean commitRecordsRequested = new AtomicBoolean(); + private final AtomicReference<CountDownLatch> latch = new AtomicReference<>(); KafkaFetchRecords(KafkaConsumer kafkaConsumer, BridgeExceptionHandlerToErrorHandler bridge, String topicName, Pattern topicPattern, String id, @@ -361,6 +370,28 @@ public class KafkaFetchRecords implements Runnable { final KafkaRecordProcessorFacade recordProcessorFacade = createRecordProcessor(); while (isKafkaConsumerRunnableAndNotStopped() && isConnected() && pollExceptionStrategy.canContinue()) { + + if (commitRecordsRequested.compareAndSet(true, false)) { + try { + // we want to get details about last committed offsets (which MUST be done by this consumer thread) + Map<TopicPartition, OffsetAndMetadata> commits = consumer.committed(consumer.assignment()); + commitRecords.clear(); + for (var e : commits.entrySet()) { + KafkaTopicPosition p + = new KafkaTopicPosition(e.getKey().topic(), e.getKey().partition(), e.getValue().offset()); + commitRecords.add(p); + } + CountDownLatch count = latch.get(); + if (count != null) { + count.countDown(); + } + } catch (Exception e) { + // ignore cannot get last commit details + LOG.debug("Cannot get last offset committed from Kafka brokers due to: {}. This exception is ignored.", + e.getMessage(), e); + } + } + ConsumerRecords<Object, Object> allRecords = consumer.poll(pollDuration); if (consumerListener != null) { if (!consumerListener.afterConsume(consumer)) { @@ -370,7 +401,7 @@ public class KafkaFetchRecords implements Runnable { ProcessingResult result = recordProcessorFacade.processPolledRecords(allRecords); if (result != null && result.getTopic() != null) { - lastRecord = new LastRecord(result.getTopic(), result.getPartition(), result.getOffset()); + lastRecord = new KafkaTopicPosition(result.getTopic(), result.getPartition(), result.getOffset()); } updateTaskState(); @@ -663,7 +694,7 @@ public class KafkaFetchRecords implements Runnable { return groupMetadata; } - public LastRecord getLastRecord() { + public KafkaTopicPosition getLastRecord() { return lastRecord; } @@ -674,4 +705,20 @@ public class KafkaFetchRecords implements Runnable { String getState() { return state.name(); } + + CountDownLatch fetchCommitRecords() { + commitRecords.clear(); + commitRecordsRequested.set(true); + CountDownLatch answer = new CountDownLatch(1); + latch.set(answer); + return answer; + } + + List<KafkaTopicPosition> getCommitRecords() { + if (commitRecords != null) { + return Collections.unmodifiableList(commitRecords); + } else { + return null; + } + } } diff --git a/dsl/camel-cli-connector/src/main/java/org/apache/camel/cli/connector/LocalCliConnector.java b/dsl/camel-cli-connector/src/main/java/org/apache/camel/cli/connector/LocalCliConnector.java index df0d35ad9d1..d8ab4238f89 100644 --- a/dsl/camel-cli-connector/src/main/java/org/apache/camel/cli/connector/LocalCliConnector.java +++ b/dsl/camel-cli-connector/src/main/java/org/apache/camel/cli/connector/LocalCliConnector.java @@ -265,6 +265,8 @@ public class LocalCliConnector extends ServiceSupport implements CliConnector, C doActionTransformTask(root); } else if ("bean".equals(action)) { doActionBeanTask(root); + } else if ("kafka".equals(action)) { + doActionKafkaTask(); } } catch (Exception e) { // ignore @@ -695,6 +697,18 @@ public class LocalCliConnector extends ServiceSupport implements CliConnector, C } } + private void doActionKafkaTask() throws IOException { + DevConsole dc = camelContext.getCamelContextExtension().getContextPlugin(DevConsoleRegistry.class) + .resolveById("kafka"); + if (dc != null) { + JsonObject json = (JsonObject) dc.call(DevConsole.MediaType.JSON, Map.of("committed", "true")); + LOG.trace("Updating output file: {}", outputFile); + IOHelper.writeText(json.toJson(), outputFile); + } else { + IOHelper.writeText("{}", outputFile); + } + } + private void doActionBeanTask(JsonObject root) throws IOException { String filter = root.getStringOrDefault("filter", ""); String properties = root.getStringOrDefault("properties", "true"); diff --git a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ListKafka.java b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ListKafka.java index c2fdd581a49..bcf67afca94 100644 --- a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ListKafka.java +++ b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ListKafka.java @@ -16,6 +16,8 @@ */ package org.apache.camel.dsl.jbang.core.commands.process; +import java.io.File; +import java.io.FileInputStream; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -28,9 +30,13 @@ import com.github.freva.asciitable.OverflowBehaviour; import org.apache.camel.dsl.jbang.core.commands.CamelJBangMain; import org.apache.camel.dsl.jbang.core.common.PidNameAgeCompletionCandidates; import org.apache.camel.dsl.jbang.core.common.ProcessHelper; +import org.apache.camel.util.FileUtil; +import org.apache.camel.util.IOHelper; +import org.apache.camel.util.StopWatch; import org.apache.camel.util.TimeUtils; import org.apache.camel.util.json.JsonArray; import org.apache.camel.util.json.JsonObject; +import org.apache.camel.util.json.Jsoner; import picocli.CommandLine; import picocli.CommandLine.Command; @@ -49,6 +55,10 @@ public class ListKafka extends ProcessWatchCommand { description = "Show group metadata") boolean metadata; + @CommandLine.Option(names = { "--committed" }, + description = "Show committed offset (slower due to sync call to Kafka brokers)") + boolean committed; + @CommandLine.Option(names = { "--short-uri" }, description = "List endpoint URI without query parameters (short)") boolean shortUri; @@ -61,6 +71,11 @@ public class ListKafka extends ProcessWatchCommand { super(main); } + @Override + protected void autoClearScreen() { + // do not auto clear as can be slow when getting committed details + } + @Override public Integer doProcessWatchCall() throws Exception { List<Row> rows = new ArrayList<>(); @@ -87,7 +102,23 @@ public class ListKafka extends ProcessWatchCommand { JsonObject jo = (JsonObject) root.get("kafka"); if (jo != null) { - JsonArray arr = (JsonArray) jo.get("kafkaConsumers"); + if (committed) { + // we ask for committed so need to do an action on-demand to get data + // ensure output file is deleted before executing action + File outputFile = getOutputFile(Long.toString(ph.pid())); + FileUtil.deleteFile(outputFile); + + JsonObject root2 = new JsonObject(); + root2.put("action", "kafka"); + File file = getActionFile(Long.toString(ph.pid())); + try { + IOHelper.writeText(root2.toJson(), file); + } catch (Exception e) { + // ignore + } + jo = waitForOutputFile(outputFile); + } + JsonArray arr = jo != null ? (JsonArray) jo.get("kafkaConsumers") : null; if (arr != null) { for (int i = 0; i < arr.size(); i++) { Row row = copy.copy(); @@ -110,6 +141,26 @@ public class ListKafka extends ProcessWatchCommand { row.lastTopic = wo.getString("lastTopic"); row.lastPartition = wo.getIntegerOrDefault("lastPartition", 0); row.lastOffset = wo.getLongOrDefault("lastOffset", 0); + if (committed) { + JsonArray ca = (JsonArray) wo.get("committed"); + if (ca != null) { + JsonObject found = null; + for (int k = 0; k < ca.size(); k++) { + JsonObject co = (JsonObject) ca.get(k); + if (row.lastTopic == null + || (row.lastTopic.equals(co.getString("topic")) + && row.lastPartition == co.getInteger("partition"))) { + found = co; + break; + } + } + if (found != null) { + row.lastTopic = found.getString("topic"); + row.lastPartition = found.getIntegerOrDefault("partition", 0); + row.committedOffset = found.getLongOrDefault("offset", 0); + } + } + } rows.add(row); row = row.copy(); } @@ -125,6 +176,11 @@ public class ListKafka extends ProcessWatchCommand { // sort rows rows.sort(this::sortRow); + // clear before writing new data + if (watch) { + clearScreen(); + } + if (!rows.isEmpty()) { printer().println(AsciiTable.getTable(AsciiTable.NO_BORDERS, rows, Arrays.asList( new Column().header("PID").headerAlign(HorizontalAlign.CENTER).with(r -> r.pid), @@ -136,6 +192,8 @@ public class ListKafka extends ProcessWatchCommand { new Column().header("TOPIC").dataAlign(HorizontalAlign.RIGHT).with(r -> r.lastTopic), new Column().header("PARTITION").dataAlign(HorizontalAlign.RIGHT).with(r -> "" + r.lastPartition), new Column().header("OFFSET").dataAlign(HorizontalAlign.RIGHT).with(r -> "" + r.lastOffset), + new Column().header("COMMITTED").visible(committed).dataAlign(HorizontalAlign.RIGHT) + .with(r -> "" + r.committedOffset), new Column().header("ENDPOINT").visible(!wideUri).dataAlign(HorizontalAlign.LEFT) .maxWidth(90, OverflowBehaviour.ELLIPSIS_RIGHT) .with(this::getUri), @@ -210,6 +268,7 @@ public class ListKafka extends ProcessWatchCommand { String lastTopic; int lastPartition; long lastOffset; + long committedOffset; Row copy() { try { @@ -220,4 +279,26 @@ public class ListKafka extends ProcessWatchCommand { } } + private JsonObject waitForOutputFile(File outputFile) { + JsonObject answer = null; + + StopWatch watch = new StopWatch(); + while (watch.taken() < 10000 && answer == null) { + try { + // give time for response to be ready + Thread.sleep(100); + + if (outputFile.exists()) { + FileInputStream fis = new FileInputStream(outputFile); + String text = IOHelper.loadText(fis); + IOHelper.close(fis); + answer = (JsonObject) Jsoner.deserialize(text); + } + } catch (Exception e) { + // ignore + } + } + return answer; + } + } diff --git a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ProcessWatchCommand.java b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ProcessWatchCommand.java index c1a3d377403..5ac45dabdfd 100644 --- a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ProcessWatchCommand.java +++ b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ProcessWatchCommand.java @@ -39,7 +39,7 @@ abstract class ProcessWatchCommand extends ProcessBaseCommand { int exit; if (watch) { do { - clearScreen(); + autoClearScreen(); exit = doProcessWatchCall(); if (exit == 0) { // use 2-sec delay in watch mode @@ -52,6 +52,10 @@ abstract class ProcessWatchCommand extends ProcessBaseCommand { return exit; } + protected void autoClearScreen() { + clearScreen(); + } + protected void clearScreen() { AnsiConsole.out().print(Ansi.ansi().eraseScreen().cursor(1, 1)); }