fvaleri commented on code in PR #13515:
URL: https://github.com/apache/kafka/pull/13515#discussion_r1183960378


##########
examples/src/main/java/kafka/examples/Producer.java:
##########
@@ -21,133 +21,159 @@
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.errors.FencedInstanceIdException;
+import org.apache.kafka.common.errors.OutOfOrderSequenceException;
+import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 
 import java.util.Properties;
+import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
 
 /**
- * Demo producer that demonstrate two modes of KafkaProducer.
- * If the user uses the Async mode: The messages will be printed to stdout 
upon successful completion
- * If the user uses the sync mode (isAsync = false): Each send loop will block 
until completion.
+ * A simple producer thread supporting two send modes:
+ * - Async mode (default): records are sent without waiting for the response.
+ * - Sync mode: each send operation blocks waiting for the response.
  */
 public class Producer extends Thread {
-    private final KafkaProducer<Integer, String> producer;
+    private final String bootstrapServers;
     private final String topic;
-    private final Boolean isAsync;
-    private int numRecords;
+    private final boolean isAsync;
+    private final String transactionalId;
+    private final boolean enableIdempotency;
+    private final int numRecords;
+    private final int transactionTimeoutMs;
     private final CountDownLatch latch;
+    private volatile boolean closed;
 
-    public Producer(final String topic,
-                    final Boolean isAsync,
-                    final String transactionalId,
-                    final boolean enableIdempotency,
-                    final int numRecords,
-                    final int transactionTimeoutMs,
-                    final CountDownLatch latch) {
-        Properties props = new Properties();
-        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
-        props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
-        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
IntegerSerializer.class.getName());
-        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
-        if (transactionTimeoutMs > 0) {
-            props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 
transactionTimeoutMs);
-        }
-        if (transactionalId != null) {
-            props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
-        }
-        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotency);
-        producer = new KafkaProducer<>(props);
-
+    public Producer(String threadName,
+                    String bootstrapServers,
+                    String topic,
+                    boolean isAsync,
+                    String transactionalId,
+                    boolean enableIdempotency,
+                    int numRecords,
+                    int transactionTimeoutMs,
+                    CountDownLatch latch) {
+        super(threadName);
+        this.bootstrapServers = bootstrapServers;
         this.topic = topic;
         this.isAsync = isAsync;
+        this.transactionalId = transactionalId;
+        this.enableIdempotency = enableIdempotency;
         this.numRecords = numRecords;
+        this.transactionTimeoutMs = transactionTimeoutMs;
         this.latch = latch;
     }
 
-    KafkaProducer<Integer, String> get() {
-        return producer;
-    }
-
     @Override
     public void run() {
-        int messageKey = 0;
-        int recordsSent = 0;
-        try {
-            while (recordsSent < numRecords) {
-                final long currentTimeMs = System.currentTimeMillis();
-                produceOnce(messageKey, recordsSent, currentTimeMs);
-                messageKey += 2;
-                recordsSent += 1;
+        int key = 0;
+        int sentRecords = 0;
+        // the producer instance is thread safe
+        try (KafkaProducer<Integer, String> producer = createKafkaProducer()) {
+            while (!closed && sentRecords < numRecords) {
+                if (isAsync) {
+                    asyncSend(producer, key, "test");
+                } else {
+                    syncSend(producer, key, "test");
+                }
+                key++;
+                sentRecords++;
             }
-        } catch (Exception e) {
-            System.out.println("Producer encountered exception:" + e);
-        } finally {
-            System.out.println("Producer sent " + numRecords + " records 
successfully");
-            this.producer.close();
-            latch.countDown();
+        } catch (Throwable e) {
+            Utils.printOut("Unhandled exception");
+            e.printStackTrace();
         }
+        Utils.printOut("Sent %d records", sentRecords);
+        shutdown();
     }
 
-    private void produceOnce(final int messageKey, final int recordsSent, 
final long currentTimeMs) throws ExecutionException, InterruptedException {
-        String messageStr = "Message_" + messageKey;
-
-        if (isAsync) { // Send asynchronously
-            sendAsync(messageKey, messageStr, currentTimeMs);
-            return;
+    public void shutdown() {
+        if (!closed) {
+            closed = true;
+            latch.countDown();
         }
-        Future<RecordMetadata> future = send(messageKey, messageStr);
-        future.get();
-        System.out.println("Sent message: (" + messageKey + ", " + messageStr 
+ ")");
     }
 
-    private void sendAsync(final int messageKey, final String messageStr, 
final long currentTimeMs) {
-        this.producer.send(new ProducerRecord<>(topic,
-                        messageKey,
-                        messageStr),
-                new DemoCallBack(currentTimeMs, messageKey, messageStr));
+    public KafkaProducer<Integer, String> createKafkaProducer() {
+        Properties props = new Properties();
+        // bootstrap server config is required for producer to connect to 
brokers
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        // client id is not required, but it's good to track the source of 
requests beyond just ip/port
+        // by allowing a logical application name to be included in 
server-side request logging
+        props.put(ProducerConfig.CLIENT_ID_CONFIG, "client-" + 
UUID.randomUUID());
+        // key and value are just byte arrays, so we need to set appropriate 
serializers
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
IntegerSerializer.class);
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class);
+        if (transactionTimeoutMs > 0) {
+            // max time before the transaction coordinator proactively aborts 
the ongoing transaction
+            props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 
transactionTimeoutMs);
+        }
+        if (transactionalId != null) {
+            // the transactional id must be static and unique
+            // it is used to identify the same producer instance across 
process restarts
+            props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
+        }
+        // enable duplicates protection at the partition level
+        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotency);
+        return new KafkaProducer<>(props);
     }
 
-    private Future<RecordMetadata> send(final int messageKey, final String 
messageStr) {
-        return producer.send(new ProducerRecord<>(topic,
-                messageKey,
-                messageStr));
+    private void asyncSend(KafkaProducer<Integer, String> producer, int key, 
String value) {
+        // send the record asynchronously, setting a callback to be notified 
of the result
+        // note tha send blocks when buffer.memory is full or metadata are not 
available
+        producer.send(new ProducerRecord<>(topic, key, value), new 
ProducerCallback(key, value));
     }
-}
 
-class DemoCallBack implements Callback {
+    private RecordMetadata syncSend(KafkaProducer<Integer, String> producer, 
int key, String value)
+        throws ExecutionException, InterruptedException {
+        try {
+            // send the record and then call get, which blocks waiting for the 
ack from the broker
+            RecordMetadata metadata = producer.send(new 
ProducerRecord<>(topic, key, value)).get();
+            Utils.maybePrintRecord(numRecords, key, value, metadata);
+            return metadata;
+        } catch (AuthorizationException | UnsupportedVersionException | 
ProducerFencedException
+                 | FencedInstanceIdException | OutOfOrderSequenceException | 
SerializationException e) {
+            Utils.printErr(e.getMessage());
+            // we can't recover from these exceptions
+            shutdown();
+        } catch (KafkaException e) {
+            Utils.printErr(e.getMessage());
+        }
+        return null;
+    }
 
-    private final long startTime;
-    private final int key;
-    private final String message;
+    class ProducerCallback implements Callback {
+        private final int key;
+        private final String value;
 
-    public DemoCallBack(long startTime, int key, String message) {
-        this.startTime = startTime;
-        this.key = key;
-        this.message = message;
-    }
+        public ProducerCallback(int key, String value) {
+            this.key = key;
+            this.value = value;
+        }
 
-    /**
-     * A callback method the user can implement to provide asynchronous 
handling of request completion. This method will
-     * be called when the record sent to the server has been acknowledged. 
When exception is not null in the callback,
-     * metadata will contain the special -1 value for all fields except for 
topicPartition, which will be valid.
-     *
-     * @param metadata  The metadata for the record that was sent (i.e. the 
partition and offset). An empty metadata
-     *                  with -1 value for all fields except for topicPartition 
will be returned if an error occurred.
-     * @param exception The exception thrown during processing of this record. 
Null if no error occurred.
-     */
-    public void onCompletion(RecordMetadata metadata, Exception exception) {
-        long elapsedTime = System.currentTimeMillis() - startTime;
-        if (metadata != null) {
-            System.out.println(
-                "message(" + key + ", " + message + ") sent to partition(" + 
metadata.partition() +
-                    "), " +
-                    "offset(" + metadata.offset() + ") in " + elapsedTime + " 
ms");
-        } else {
-            exception.printStackTrace();
+        public void onCompletion(RecordMetadata metadata, Exception e) {
+            if (e != null) {
+                Utils.printErr(e.getMessage());
+                if (e instanceof AuthorizationException
+                    || e instanceof UnsupportedVersionException
+                    || e instanceof ProducerFencedException
+                    || e instanceof FencedInstanceIdException
+                    || e instanceof OutOfOrderSequenceException
+                    || e instanceof SerializationException) {

Review Comment:
   Is it safe to assume non retriable if not an instance of RetriableException? 
In that case we could simply do the following:
   
   ```
   if (!(e instanceof RetriableException)) {
     // we can't recover from these exceptions
     shutdown();
   }
   ```
   
   We also need to fix the comment about OffsetMetadataTooLarge~~Exception~~ in 
Callback javadoc.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to