This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 78090bb4cdd KAFKA-14752: Kafka examples improvements - producer changes (#13515)
78090bb4cdd is described below

commit 78090bb4cdd2494f0b720d34e17ee0cc645fc399
Author: Federico Valeri <[email protected]>
AuthorDate: Mon May 8 04:15:52 2023 +0200

    KAFKA-14752: Kafka examples improvements - producer changes (#13515)
    
    KAFKA-14752: Kafka examples improvements - producer changes
    
    Reviewers: Luke Chen <[email protected]>, Christo Lolov <[email protected]>
---
 .../examples/ExactlyOnceMessageProcessor.java      |   4 +-
 .../kafka/examples/KafkaConsumerProducerDemo.java  |   3 +-
 .../java/kafka/examples/KafkaExactlyOnceDemo.java  |   3 +-
 .../src/main/java/kafka/examples/Producer.java     | 216 ++++++++++++---------
 4 files changed, 131 insertions(+), 95 deletions(-)

diff --git a/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java b/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java
index 6de6ab99d26..3fa1e4ba041 100644
--- a/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java
+++ b/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java
@@ -66,7 +66,9 @@ public class ExactlyOnceMessageProcessor extends Thread {
         // It is recommended to have a relatively short txn timeout in order 
to clear pending offsets faster.
         final int transactionTimeoutMs = 10000;
         // A unique transactional.id must be provided in order to properly use 
EOS.
-        producer = new Producer(outputTopic, true, transactionalId, true, -1, 
transactionTimeoutMs, null).get();
+        producer = new Producer(
+            "processor-producer", KafkaProperties.KAFKA_SERVER_URL + ":" + 
KafkaProperties.KAFKA_SERVER_PORT, outputTopic, true, transactionalId, true, 
-1, transactionTimeoutMs, null)
+                .createKafkaProducer();
         // Consumer must be in read_committed mode, which means it won't be 
able to read uncommitted data.
         // Consumer could optionally configure groupInstanceId to avoid 
unnecessary rebalances.
         this.groupInstanceId = "Txn-consumer-" + instanceIdx;
diff --git a/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java b/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java
index 4087367f899..ff44efe492e 100644
--- a/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java
+++ b/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java
@@ -26,7 +26,8 @@ public class KafkaConsumerProducerDemo {
     public static void main(String[] args) throws InterruptedException {
         boolean isAsync = args.length == 0 || 
!args[0].trim().equalsIgnoreCase("sync");
         CountDownLatch latch = new CountDownLatch(2);
-        Producer producerThread = new Producer(KafkaProperties.TOPIC, isAsync, 
null, false, 10000, -1, latch);
+        Producer producerThread = new Producer(
+            "producer", KafkaProperties.KAFKA_SERVER_URL + ":" + 
KafkaProperties.KAFKA_SERVER_PORT, KafkaProperties.TOPIC, isAsync, null, false, 
10000, -1, latch);
         producerThread.start();
 
         Consumer consumerThread = new Consumer(
diff --git a/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java b/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java
index 03ee8e25498..af0cfce3dd3 100644
--- a/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java
+++ b/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java
@@ -91,7 +91,8 @@ public class KafkaExactlyOnceDemo {
         CountDownLatch prePopulateLatch = new CountDownLatch(1);
 
         /* Stage 2: pre-populate records */
-        Producer producerThread = new Producer(INPUT_TOPIC, false, null, true, 
numRecords, -1, prePopulateLatch);
+        Producer producerThread = new Producer(
+            "producer", KafkaProperties.KAFKA_SERVER_URL + ":" + 
KafkaProperties.KAFKA_SERVER_PORT, INPUT_TOPIC, false, null, true, numRecords, 
-1, prePopulateLatch);
         producerThread.start();
 
         if (!prePopulateLatch.await(5, TimeUnit.MINUTES)) {
diff --git a/examples/src/main/java/kafka/examples/Producer.java b/examples/src/main/java/kafka/examples/Producer.java
index e85fa16060e..36a2583954c 100644
--- a/examples/src/main/java/kafka/examples/Producer.java
+++ b/examples/src/main/java/kafka/examples/Producer.java
@@ -21,133 +21,165 @@ import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.errors.FencedInstanceIdException;
+import org.apache.kafka.common.errors.OutOfOrderSequenceException;
+import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 
 import java.util.Properties;
+import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
 
 /**
- * Demo producer that demonstrate two modes of KafkaProducer.
- * If the user uses the Async mode: The messages will be printed to stdout 
upon successful completion
- * If the user uses the sync mode (isAsync = false): Each send loop will block 
until completion.
+ * A simple producer thread supporting two send modes:
+ * - Async mode (default): records are sent without waiting for the response.
+ * - Sync mode: each send operation blocks waiting for the response.
  */
 public class Producer extends Thread {
-    private final KafkaProducer<Integer, String> producer;
+    private final String bootstrapServers;
     private final String topic;
-    private final Boolean isAsync;
-    private int numRecords;
+    private final boolean isAsync;
+    private final String transactionalId;
+    private final boolean enableIdempotency;
+    private final int numRecords;
+    private final int transactionTimeoutMs;
     private final CountDownLatch latch;
+    private volatile boolean closed;
 
-    public Producer(final String topic,
-                    final Boolean isAsync,
-                    final String transactionalId,
-                    final boolean enableIdempotency,
-                    final int numRecords,
-                    final int transactionTimeoutMs,
-                    final CountDownLatch latch) {
-        Properties props = new Properties();
-        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
-        props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
-        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
IntegerSerializer.class.getName());
-        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
-        if (transactionTimeoutMs > 0) {
-            props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 
transactionTimeoutMs);
-        }
-        if (transactionalId != null) {
-            props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
-        }
-        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotency);
-        producer = new KafkaProducer<>(props);
-
+    public Producer(String threadName,
+                    String bootstrapServers,
+                    String topic,
+                    boolean isAsync,
+                    String transactionalId,
+                    boolean enableIdempotency,
+                    int numRecords,
+                    int transactionTimeoutMs,
+                    CountDownLatch latch) {
+        super(threadName);
+        this.bootstrapServers = bootstrapServers;
         this.topic = topic;
         this.isAsync = isAsync;
+        this.transactionalId = transactionalId;
+        this.enableIdempotency = enableIdempotency;
         this.numRecords = numRecords;
+        this.transactionTimeoutMs = transactionTimeoutMs;
         this.latch = latch;
     }
 
-    KafkaProducer<Integer, String> get() {
-        return producer;
-    }
-
     @Override
     public void run() {
-        int messageKey = 0;
-        int recordsSent = 0;
-        try {
-            while (recordsSent < numRecords) {
-                final long currentTimeMs = System.currentTimeMillis();
-                produceOnce(messageKey, recordsSent, currentTimeMs);
-                messageKey += 2;
-                recordsSent += 1;
+        int key = 0;
+        int sentRecords = 0;
+        // the producer instance is thread safe
+        try (KafkaProducer<Integer, String> producer = createKafkaProducer()) {
+            while (!closed && sentRecords < numRecords) {
+                if (isAsync) {
+                    asyncSend(producer, key, "test" + key);
+                } else {
+                    syncSend(producer, key, "test" + key);
+                }
+                key++;
+                sentRecords++;
             }
-        } catch (Exception e) {
-            System.out.println("Producer encountered exception:" + e);
-        } finally {
-            System.out.println("Producer sent " + numRecords + " records 
successfully");
-            this.producer.close();
-            latch.countDown();
+        } catch (Throwable e) {
+            Utils.printOut("Unhandled exception");
+            e.printStackTrace();
         }
+        Utils.printOut("Sent %d records", sentRecords);
+        shutdown();
     }
 
-    private void produceOnce(final int messageKey, final int recordsSent, 
final long currentTimeMs) throws ExecutionException, InterruptedException {
-        String messageStr = "Message_" + messageKey;
-
-        if (isAsync) { // Send asynchronously
-            sendAsync(messageKey, messageStr, currentTimeMs);
-            return;
+    public void shutdown() {
+        if (!closed) {
+            closed = true;
+            latch.countDown();
         }
-        Future<RecordMetadata> future = send(messageKey, messageStr);
-        future.get();
-        System.out.println("Sent message: (" + messageKey + ", " + messageStr 
+ ")");
     }
 
-    private void sendAsync(final int messageKey, final String messageStr, 
final long currentTimeMs) {
-        this.producer.send(new ProducerRecord<>(topic,
-                        messageKey,
-                        messageStr),
-                new DemoCallBack(currentTimeMs, messageKey, messageStr));
+    public KafkaProducer<Integer, String> createKafkaProducer() {
+        Properties props = new Properties();
+        // bootstrap server config is required for producer to connect to 
brokers
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        // client id is not required, but it's good to track the source of 
requests beyond just ip/port
+        // by allowing a logical application name to be included in 
server-side request logging
+        props.put(ProducerConfig.CLIENT_ID_CONFIG, "client-" + 
UUID.randomUUID());
+        // key and value are just byte arrays, so we need to set appropriate 
serializers
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
IntegerSerializer.class);
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class);
+        if (transactionTimeoutMs > 0) {
+            // max time before the transaction coordinator proactively aborts 
the ongoing transaction
+            props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 
transactionTimeoutMs);
+        }
+        if (transactionalId != null) {
+            // the transactional id must be static and unique
+            // it is used to identify the same producer instance across 
process restarts
+            props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
+        }
+        // enable duplicates protection at the partition level
+        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotency);
+        return new KafkaProducer<>(props);
     }
 
-    private Future<RecordMetadata> send(final int messageKey, final String 
messageStr) {
-        return producer.send(new ProducerRecord<>(topic,
-                messageKey,
-                messageStr));
+    private void asyncSend(KafkaProducer<Integer, String> producer, int key, 
String value) {
+        // send the record asynchronously, setting a callback to be notified 
of the result
+        // note that, even if you set a small batch.size with linger.ms=0, the 
send operation
+        // will still be blocked when buffer.memory is full or metadata are 
not available
+        producer.send(new ProducerRecord<>(topic, key, value), new 
ProducerCallback(key, value));
     }
-}
 
-class DemoCallBack implements Callback {
+    private RecordMetadata syncSend(KafkaProducer<Integer, String> producer, 
int key, String value)
+        throws ExecutionException, InterruptedException {
+        try {
+            // send the record and then call get, which blocks waiting for the 
ack from the broker
+            RecordMetadata metadata = producer.send(new 
ProducerRecord<>(topic, key, value)).get();
+            Utils.maybePrintRecord(numRecords, key, value, metadata);
+            return metadata;
+        } catch (AuthorizationException | UnsupportedVersionException | 
ProducerFencedException
+                 | FencedInstanceIdException | OutOfOrderSequenceException | 
SerializationException e) {
+            Utils.printErr(e.getMessage());
+            // we can't recover from these exceptions
+            shutdown();
+        } catch (KafkaException e) {
+            Utils.printErr(e.getMessage());
+        }
+        return null;
+    }
 
-    private final long startTime;
-    private final int key;
-    private final String message;
+    class ProducerCallback implements Callback {
+        private final int key;
+        private final String value;
 
-    public DemoCallBack(long startTime, int key, String message) {
-        this.startTime = startTime;
-        this.key = key;
-        this.message = message;
-    }
+        public ProducerCallback(int key, String value) {
+            this.key = key;
+            this.value = value;
+        }
 
-    /**
-     * A callback method the user can implement to provide asynchronous 
handling of request completion. This method will
-     * be called when the record sent to the server has been acknowledged. 
When exception is not null in the callback,
-     * metadata will contain the special -1 value for all fields except for 
topicPartition, which will be valid.
-     *
-     * @param metadata  The metadata for the record that was sent (i.e. the 
partition and offset). An empty metadata
-     *                  with -1 value for all fields except for topicPartition 
will be returned if an error occurred.
-     * @param exception The exception thrown during processing of this record. 
Null if no error occurred.
-     */
-    public void onCompletion(RecordMetadata metadata, Exception exception) {
-        long elapsedTime = System.currentTimeMillis() - startTime;
-        if (metadata != null) {
-            System.out.println(
-                "message(" + key + ", " + message + ") sent to partition(" + 
metadata.partition() +
-                    "), " +
-                    "offset(" + metadata.offset() + ") in " + elapsedTime + " 
ms");
-        } else {
-            exception.printStackTrace();
+        /**
+         * A callback method the user can implement to provide asynchronous 
handling of request completion. This method will
+         * be called when the record sent to the server has been acknowledged. 
When exception is not null in the callback,
+         * metadata will contain the special -1 value for all fields except 
for topicPartition, which will be valid.
+         *
+         * @param metadata The metadata for the record that was sent (i.e. the 
partition and offset). An empty metadata
+         *                 with -1 value for all fields except for 
topicPartition will be returned if an error occurred.
+         * @param exception The exception thrown during processing of this 
record. Null if no error occurred.
+         */
+        public void onCompletion(RecordMetadata metadata, Exception exception) 
{
+            if (exception != null) {
+                Utils.printErr(exception.getMessage());
+                if (!(exception instanceof RetriableException)) {
+                    // we can't recover from these exceptions
+                    shutdown();
+                }
+            } else {
+                Utils.maybePrintRecord(numRecords, key, value, metadata);
+            }
         }
     }
 }

Reply via email to