This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch kafka-console
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 9efff91135eaa0ca30f7dd5b873d1a2bb2e583a9
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Tue Jul 9 15:20:21 2024 +0200

    CAMEL-20956: camel-kafka - Add dev console and jbang command
---
 .../camel/component/kafka/KafkaDevConsole.java     | 61 ++++++++++++++++
 .../camel/component/kafka/KafkaFetchRecords.java   | 57 +++++++++++++--
 .../camel/cli/connector/LocalCliConnector.java     | 14 ++++
 .../dsl/jbang/core/commands/process/ListKafka.java | 83 +++++++++++++++++++++-
 .../core/commands/process/ProcessWatchCommand.java |  6 +-
 5 files changed, 214 insertions(+), 7 deletions(-)

diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaDevConsole.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaDevConsole.java
index 0bbbac18b97..a0fb775759a 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaDevConsole.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaDevConsole.java
@@ -19,22 +19,39 @@ package org.apache.camel.component.kafka;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.Route;
 import org.apache.camel.spi.annotations.DevConsole;
 import org.apache.camel.support.console.AbstractDevConsole;
+import org.apache.camel.util.StopWatch;
 import org.apache.camel.util.json.JsonArray;
 import org.apache.camel.util.json.JsonObject;
+import 
org.apache.kafka.shaded.io.opentelemetry.proto.collector.logs.v1.LogsServiceGrpc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @DevConsole(name = "kafka", displayName = "Kafka", description = "Apache 
Kafka")
 public class KafkaDevConsole extends AbstractDevConsole {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(KafkaDevConsole.class);
+
+    private static final long COMMITTED_TIMEOUT = 10000;
+
+    /**
+     * Whether to include committed offset (sync operation to Kafka broker)
+     */
+    public static final String COMMITTED = "committed";
+
     public KafkaDevConsole() {
         super("camel", "kafka", "Kafka", "Apache Kafka");
     }
 
     @Override
     protected String doCallText(Map<String, Object> options) {
+        final boolean committed = 
"true".equals(options.getOrDefault(COMMITTED, "false"));
+
         StringBuilder sb = new StringBuilder();
         for (Route route : getCamelContext().getRoutes()) {
             if (route.getConsumer() instanceof KafkaConsumer kc) {
@@ -58,6 +75,16 @@ public class KafkaDevConsole extends AbstractDevConsole {
                         sb.append(String.format("\n        Last Partition: 
%d", t.getLastRecord().partition()));
                         sb.append(String.format("\n        Last Offset: %d", 
t.getLastRecord().offset()));
                     }
+                    if (committed) {
+                        List<KafkaFetchRecords.KafkaTopicPosition> l = 
fetchCommitOffsets(kc, t);
+                        if (l != null) {
+                            for (KafkaFetchRecords.KafkaTopicPosition r : l) {
+                                sb.append(String.format("\n        Commit 
Topic: %s", r.topic()));
+                                sb.append(String.format("\n        Commit 
Partition: %s", r.partition()));
+                                sb.append(String.format("\n        Commit 
Offset: %s", r.offset()));
+                            }
+                        }
+                    }
                 }
                 sb.append("\n");
             }
@@ -66,8 +93,26 @@ public class KafkaDevConsole extends AbstractDevConsole {
         return sb.toString();
     }
 
+    private static List<KafkaFetchRecords.KafkaTopicPosition> 
fetchCommitOffsets(KafkaConsumer kc, KafkaFetchRecords task) {
+        StopWatch watch = new StopWatch();
+
+        CountDownLatch latch = task.fetchCommitRecords();
+        long timeout = 
Math.min(kc.getEndpoint().getConfiguration().getPollTimeoutMs(), 
COMMITTED_TIMEOUT);
+        try {
+            latch.await(timeout, TimeUnit.MILLISECONDS);
+            var answer = task.getCommitRecords();
+            LOG.info("Fetching commit offsets took: {} ms", watch.taken());
+            return answer;
+        } catch (Exception e) {
+            // ignore
+        }
+        return null;
+    }
+
     @Override
     protected Map<String, Object> doCallJson(Map<String, Object> options) {
+        final boolean committed = 
"true".equals(options.getOrDefault(COMMITTED, "false"));
+
         JsonObject root = new JsonObject();
 
         List<JsonObject> list = new ArrayList<>();
@@ -102,6 +147,22 @@ public class KafkaDevConsole extends AbstractDevConsole {
                         wo.put("lastPartition", t.getLastRecord().partition());
                         wo.put("lastOffset", t.getLastRecord().offset());
                     }
+                    if (committed) {
+                        List<KafkaFetchRecords.KafkaTopicPosition> l = 
fetchCommitOffsets(kc, t);
+                        if (l != null) {
+                            JsonArray ca = new JsonArray();
+                            for (KafkaFetchRecords.KafkaTopicPosition r : l) {
+                                JsonObject cr = new JsonObject();
+                                cr.put("topic", r.topic());
+                                cr.put("partition", r.partition());
+                                cr.put("offset", r.offset());
+                                ca.add(cr);
+                            }
+                            if (!ca.isEmpty()) {
+                                wo.put("committed", ca);
+                            }
+                        }
+                    }
                 }
                 list.add(jo);
             }
diff --git 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index 9d64ed69dbc..05b6ede1fe2 100644
--- 
a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++ 
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -17,8 +17,15 @@
 package org.apache.camel.component.kafka;
 
 import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.regex.Pattern;
 
@@ -45,7 +52,6 @@ import org.apache.camel.util.IOHelper;
 import org.apache.camel.util.ReflectionHelper;
 import org.apache.camel.util.TimeUtils;
 import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -101,11 +107,14 @@ public class KafkaFetchRecords implements Runnable {
     record GroupMetadata(String groupId, String groupInstanceId, String 
memberId, int generationId) {
     }
 
-    record LastRecord(String topic, int partition, long offset) {
+    record KafkaTopicPosition(String topic, int partition, long offset) {
     }
 
     private volatile GroupMetadata groupMetadata;
-    private volatile LastRecord lastRecord;
+    private volatile KafkaTopicPosition lastRecord;
+    private final List<KafkaTopicPosition> commitRecords = new ArrayList<>();
+    private final AtomicBoolean commitRecordsRequested = new AtomicBoolean();
+    private final AtomicReference<CountDownLatch> latch = new 
AtomicReference<>();
 
     KafkaFetchRecords(KafkaConsumer kafkaConsumer,
                       BridgeExceptionHandlerToErrorHandler bridge, String 
topicName, Pattern topicPattern, String id,
@@ -361,6 +370,28 @@ public class KafkaFetchRecords implements Runnable {
             final KafkaRecordProcessorFacade recordProcessorFacade = 
createRecordProcessor();
 
             while (isKafkaConsumerRunnableAndNotStopped() && isConnected() && 
pollExceptionStrategy.canContinue()) {
+
+                if (commitRecordsRequested.compareAndSet(true, false)) {
+                    try {
+                        // we want to get details about last committed offsets 
(which MUST be done by this consumer thread)
+                        Map<TopicPartition, OffsetAndMetadata> commits = 
consumer.committed(consumer.assignment());
+                        commitRecords.clear();
+                        for (var e : commits.entrySet()) {
+                            KafkaTopicPosition p
+                                    = new 
KafkaTopicPosition(e.getKey().topic(), e.getKey().partition(), 
e.getValue().offset());
+                            commitRecords.add(p);
+                        }
+                        CountDownLatch count = latch.get();
+                        if (count != null) {
+                            count.countDown();
+                        }
+                    } catch (Exception e) {
+                        // ignore cannot get last commit details
+                        LOG.debug("Cannot get last offset committed from Kafka 
brokers due to: {}. This exception is ignored.",
+                                e.getMessage(), e);
+                    }
+                }
+
                 ConsumerRecords<Object, Object> allRecords = 
consumer.poll(pollDuration);
                 if (consumerListener != null) {
                     if (!consumerListener.afterConsume(consumer)) {
@@ -370,7 +401,7 @@ public class KafkaFetchRecords implements Runnable {
 
                 ProcessingResult result = 
recordProcessorFacade.processPolledRecords(allRecords);
                 if (result != null && result.getTopic() != null) {
-                    lastRecord = new LastRecord(result.getTopic(), 
result.getPartition(), result.getOffset());
+                    lastRecord = new KafkaTopicPosition(result.getTopic(), 
result.getPartition(), result.getOffset());
                 }
                 updateTaskState();
 
@@ -663,7 +694,7 @@ public class KafkaFetchRecords implements Runnable {
         return groupMetadata;
     }
 
-    public LastRecord getLastRecord() {
+    public KafkaTopicPosition getLastRecord() {
         return lastRecord;
     }
 
@@ -674,4 +705,20 @@ public class KafkaFetchRecords implements Runnable {
     String getState() {
         return state.name();
     }
+
+    CountDownLatch fetchCommitRecords() {
+        commitRecords.clear();
+        commitRecordsRequested.set(true);
+        CountDownLatch answer = new CountDownLatch(1);
+        latch.set(answer);
+        return answer;
+    }
+
+    List<KafkaTopicPosition> getCommitRecords() {
+        if (commitRecords != null) {
+            return Collections.unmodifiableList(commitRecords);
+        } else {
+            return null;
+        }
+    }
 }
diff --git 
a/dsl/camel-cli-connector/src/main/java/org/apache/camel/cli/connector/LocalCliConnector.java
 
b/dsl/camel-cli-connector/src/main/java/org/apache/camel/cli/connector/LocalCliConnector.java
index df0d35ad9d1..d8ab4238f89 100644
--- 
a/dsl/camel-cli-connector/src/main/java/org/apache/camel/cli/connector/LocalCliConnector.java
+++ 
b/dsl/camel-cli-connector/src/main/java/org/apache/camel/cli/connector/LocalCliConnector.java
@@ -265,6 +265,8 @@ public class LocalCliConnector extends ServiceSupport 
implements CliConnector, C
                 doActionTransformTask(root);
             } else if ("bean".equals(action)) {
                 doActionBeanTask(root);
+            } else if ("kafka".equals(action)) {
+                doActionKafkaTask();
             }
         } catch (Exception e) {
             // ignore
@@ -695,6 +697,18 @@ public class LocalCliConnector extends ServiceSupport 
implements CliConnector, C
         }
     }
 
+    private void doActionKafkaTask() throws IOException {
+        DevConsole dc = 
camelContext.getCamelContextExtension().getContextPlugin(DevConsoleRegistry.class)
+                .resolveById("kafka");
+        if (dc != null) {
+            JsonObject json = (JsonObject) dc.call(DevConsole.MediaType.JSON, 
Map.of("committed", "true"));
+            LOG.trace("Updating output file: {}", outputFile);
+            IOHelper.writeText(json.toJson(), outputFile);
+        } else {
+            IOHelper.writeText("{}", outputFile);
+        }
+    }
+
     private void doActionBeanTask(JsonObject root) throws IOException {
         String filter = root.getStringOrDefault("filter", "");
         String properties = root.getStringOrDefault("properties", "true");
diff --git 
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ListKafka.java
 
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ListKafka.java
index c2fdd581a49..bcf67afca94 100644
--- 
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ListKafka.java
+++ 
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ListKafka.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.dsl.jbang.core.commands.process;
 
+import java.io.File;
+import java.io.FileInputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -28,9 +30,13 @@ import com.github.freva.asciitable.OverflowBehaviour;
 import org.apache.camel.dsl.jbang.core.commands.CamelJBangMain;
 import org.apache.camel.dsl.jbang.core.common.PidNameAgeCompletionCandidates;
 import org.apache.camel.dsl.jbang.core.common.ProcessHelper;
+import org.apache.camel.util.FileUtil;
+import org.apache.camel.util.IOHelper;
+import org.apache.camel.util.StopWatch;
 import org.apache.camel.util.TimeUtils;
 import org.apache.camel.util.json.JsonArray;
 import org.apache.camel.util.json.JsonObject;
+import org.apache.camel.util.json.Jsoner;
 import picocli.CommandLine;
 import picocli.CommandLine.Command;
 
@@ -49,6 +55,10 @@ public class ListKafka extends ProcessWatchCommand {
                         description = "Show group metadata")
     boolean metadata;
 
+    @CommandLine.Option(names = { "--committed" },
+                        description = "Show committed offset (slower due to 
sync call to Kafka brokers)")
+    boolean committed;
+
     @CommandLine.Option(names = { "--short-uri" },
                         description = "List endpoint URI without query 
parameters (short)")
     boolean shortUri;
@@ -61,6 +71,11 @@ public class ListKafka extends ProcessWatchCommand {
         super(main);
     }
 
+    @Override
+    protected void autoClearScreen() {
+        // do not auto clear as can be slow when getting committed details
+    }
+
     @Override
     public Integer doProcessWatchCall() throws Exception {
         List<Row> rows = new ArrayList<>();
@@ -87,7 +102,23 @@ public class ListKafka extends ProcessWatchCommand {
 
                         JsonObject jo = (JsonObject) root.get("kafka");
                         if (jo != null) {
-                            JsonArray arr = (JsonArray) 
jo.get("kafkaConsumers");
+                            if (committed) {
+                                // we ask for committed so need to do an 
action on-demand to get data
+                                // ensure output file is deleted before 
executing action
+                                File outputFile = 
getOutputFile(Long.toString(ph.pid()));
+                                FileUtil.deleteFile(outputFile);
+
+                                JsonObject root2 = new JsonObject();
+                                root2.put("action", "kafka");
+                                File file = 
getActionFile(Long.toString(ph.pid()));
+                                try {
+                                    IOHelper.writeText(root2.toJson(), file);
+                                } catch (Exception e) {
+                                    // ignore
+                                }
+                                jo = waitForOutputFile(outputFile);
+                            }
+                            JsonArray arr = jo != null ? (JsonArray) 
jo.get("kafkaConsumers") : null;
                             if (arr != null) {
                                 for (int i = 0; i < arr.size(); i++) {
                                     Row row = copy.copy();
@@ -110,6 +141,26 @@ public class ListKafka extends ProcessWatchCommand {
                                             row.lastTopic = 
wo.getString("lastTopic");
                                             row.lastPartition = 
wo.getIntegerOrDefault("lastPartition", 0);
                                             row.lastOffset = 
wo.getLongOrDefault("lastOffset", 0);
+                                            if (committed) {
+                                                JsonArray ca = (JsonArray) 
wo.get("committed");
+                                                if (ca != null) {
+                                                    JsonObject found = null;
+                                                    for (int k = 0; k < 
ca.size(); k++) {
+                                                        JsonObject co = 
(JsonObject) ca.get(k);
+                                                        if (row.lastTopic == 
null
+                                                                || 
(row.lastTopic.equals(co.getString("topic"))
+                                                                        && 
row.lastPartition == co.getInteger("partition"))) {
+                                                            found = co;
+                                                            break;
+                                                        }
+                                                    }
+                                                    if (found != null) {
+                                                        row.lastTopic = 
found.getString("topic");
+                                                        row.lastPartition = 
found.getIntegerOrDefault("partition", 0);
+                                                        row.committedOffset = 
found.getLongOrDefault("offset", 0);
+                                                    }
+                                                }
+                                            }
                                             rows.add(row);
                                             row = row.copy();
                                         }
@@ -125,6 +176,11 @@ public class ListKafka extends ProcessWatchCommand {
         // sort rows
         rows.sort(this::sortRow);
 
+        // clear before writing new data
+        if (watch) {
+            clearScreen();
+        }
+
         if (!rows.isEmpty()) {
             printer().println(AsciiTable.getTable(AsciiTable.NO_BORDERS, rows, 
Arrays.asList(
                     new 
Column().header("PID").headerAlign(HorizontalAlign.CENTER).with(r -> r.pid),
@@ -136,6 +192,8 @@ public class ListKafka extends ProcessWatchCommand {
                     new 
Column().header("TOPIC").dataAlign(HorizontalAlign.RIGHT).with(r -> 
r.lastTopic),
                     new 
Column().header("PARTITION").dataAlign(HorizontalAlign.RIGHT).with(r -> "" + 
r.lastPartition),
                     new 
Column().header("OFFSET").dataAlign(HorizontalAlign.RIGHT).with(r -> "" + 
r.lastOffset),
+                    new 
Column().header("COMMITTED").visible(committed).dataAlign(HorizontalAlign.RIGHT)
+                            .with(r -> "" + r.committedOffset),
                     new 
Column().header("ENDPOINT").visible(!wideUri).dataAlign(HorizontalAlign.LEFT)
                             .maxWidth(90, OverflowBehaviour.ELLIPSIS_RIGHT)
                             .with(this::getUri),
@@ -210,6 +268,7 @@ public class ListKafka extends ProcessWatchCommand {
         String lastTopic;
         int lastPartition;
         long lastOffset;
+        long committedOffset;
 
         Row copy() {
             try {
@@ -220,4 +279,26 @@ public class ListKafka extends ProcessWatchCommand {
         }
     }
 
+    private JsonObject waitForOutputFile(File outputFile) {
+        JsonObject answer = null;
+
+        StopWatch watch = new StopWatch();
+        while (watch.taken() < 10000 && answer == null) {
+            try {
+                // give time for response to be ready
+                Thread.sleep(100);
+
+                if (outputFile.exists()) {
+                    FileInputStream fis = new FileInputStream(outputFile);
+                    String text = IOHelper.loadText(fis);
+                    IOHelper.close(fis);
+                    answer = (JsonObject) Jsoner.deserialize(text);
+                }
+            } catch (Exception e) {
+                // ignore
+            }
+        }
+        return answer;
+    }
+
 }
diff --git 
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ProcessWatchCommand.java
 
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ProcessWatchCommand.java
index c1a3d377403..5ac45dabdfd 100644
--- 
a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ProcessWatchCommand.java
+++ 
b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ProcessWatchCommand.java
@@ -39,7 +39,7 @@ abstract class ProcessWatchCommand extends ProcessBaseCommand 
{
         int exit;
         if (watch) {
             do {
-                clearScreen();
+                autoClearScreen();
                 exit = doProcessWatchCall();
                 if (exit == 0) {
                     // use 2-sec delay in watch mode
@@ -52,6 +52,10 @@ abstract class ProcessWatchCommand extends 
ProcessBaseCommand {
         return exit;
     }
 
+    protected void autoClearScreen() {
+        clearScreen();
+    }
+
     protected void clearScreen() {
         AnsiConsole.out().print(Ansi.ansi().eraseScreen().cursor(1, 1));
     }

Reply via email to the mailing list.