showuon commented on code in PR #13515:
URL: https://github.com/apache/kafka/pull/13515#discussion_r1185665409


##########
examples/src/main/java/kafka/examples/Producer.java:
##########
@@ -21,133 +21,164 @@
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.errors.FencedInstanceIdException;
+import org.apache.kafka.common.errors.OutOfOrderSequenceException;
+import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 
 import java.util.Properties;
+import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
 
 /**
- * Demo producer that demonstrate two modes of KafkaProducer.
- * If the user uses the Async mode: The messages will be printed to stdout 
upon successful completion
- * If the user uses the sync mode (isAsync = false): Each send loop will block 
until completion.
+ * A simple producer thread supporting two send modes:
+ * - Async mode (default): records are sent without waiting for the response.
+ * - Sync mode: each send operation blocks waiting for the response.
  */
 public class Producer extends Thread {
-    private final KafkaProducer<Integer, String> producer;
+    private final String bootstrapServers;
     private final String topic;
-    private final Boolean isAsync;
-    private int numRecords;
+    private final boolean isAsync;
+    private final String transactionalId;
+    private final boolean enableIdempotency;
+    private final int numRecords;
+    private final int transactionTimeoutMs;
     private final CountDownLatch latch;
+    private volatile boolean closed;
 
-    public Producer(final String topic,
-                    final Boolean isAsync,
-                    final String transactionalId,
-                    final boolean enableIdempotency,
-                    final int numRecords,
-                    final int transactionTimeoutMs,
-                    final CountDownLatch latch) {
-        Properties props = new Properties();
-        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
-        props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
-        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
IntegerSerializer.class.getName());
-        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
-        if (transactionTimeoutMs > 0) {
-            props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 
transactionTimeoutMs);
-        }
-        if (transactionalId != null) {
-            props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
-        }
-        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotency);
-        producer = new KafkaProducer<>(props);
-
+    public Producer(String threadName,
+                    String bootstrapServers,
+                    String topic,
+                    boolean isAsync,
+                    String transactionalId,
+                    boolean enableIdempotency,
+                    int numRecords,
+                    int transactionTimeoutMs,
+                    CountDownLatch latch) {
+        super(threadName);
+        this.bootstrapServers = bootstrapServers;
         this.topic = topic;
         this.isAsync = isAsync;
+        this.transactionalId = transactionalId;
+        this.enableIdempotency = enableIdempotency;
         this.numRecords = numRecords;
+        this.transactionTimeoutMs = transactionTimeoutMs;
         this.latch = latch;
     }
 
-    KafkaProducer<Integer, String> get() {
-        return producer;
-    }
-
     @Override
     public void run() {
-        int messageKey = 0;
-        int recordsSent = 0;
-        try {
-            while (recordsSent < numRecords) {
-                final long currentTimeMs = System.currentTimeMillis();
-                produceOnce(messageKey, recordsSent, currentTimeMs);
-                messageKey += 2;
-                recordsSent += 1;
+        int key = 0;
+        int sentRecords = 0;
+        // the producer instance is thread safe
+        try (KafkaProducer<Integer, String> producer = createKafkaProducer()) {
+            while (!closed && sentRecords < numRecords) {
+                if (isAsync) {
+                    asyncSend(producer, key, "test" + key);
+                } else {
+                    syncSend(producer, key, "test" + key);
+                }
+                key++;
+                sentRecords++;
             }
-        } catch (Exception e) {
-            System.out.println("Producer encountered exception:" + e);
-        } finally {
-            System.out.println("Producer sent " + numRecords + " records 
successfully");
-            this.producer.close();
-            latch.countDown();
+        } catch (Throwable e) {
+            Utils.printOut("Unhandled exception");
+            e.printStackTrace();
         }
+        Utils.printOut("Sent %d records", sentRecords);
+        shutdown();
     }
 
-    private void produceOnce(final int messageKey, final int recordsSent, 
final long currentTimeMs) throws ExecutionException, InterruptedException {
-        String messageStr = "Message_" + messageKey;
-
-        if (isAsync) { // Send asynchronously
-            sendAsync(messageKey, messageStr, currentTimeMs);
-            return;
+    public void shutdown() {
+        if (!closed) {
+            closed = true;
+            latch.countDown();
         }
-        Future<RecordMetadata> future = send(messageKey, messageStr);
-        future.get();
-        System.out.println("Sent message: (" + messageKey + ", " + messageStr 
+ ")");
     }
 
-    private void sendAsync(final int messageKey, final String messageStr, 
final long currentTimeMs) {
-        this.producer.send(new ProducerRecord<>(topic,
-                        messageKey,
-                        messageStr),
-                new DemoCallBack(currentTimeMs, messageKey, messageStr));
+    public KafkaProducer<Integer, String> createKafkaProducer() {
+        Properties props = new Properties();
+        // bootstrap server config is required for producer to connect to 
brokers
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        // client id is not required, but it's good to track the source of 
requests beyond just ip/port
+        // by allowing a logical application name to be included in 
server-side request logging
+        props.put(ProducerConfig.CLIENT_ID_CONFIG, "client-" + 
UUID.randomUUID());
+        // key and value are just byte arrays, so we need to set appropriate 
serializers
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
IntegerSerializer.class);
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class);
+        if (transactionTimeoutMs > 0) {
+            // max time before the transaction coordinator proactively aborts 
the ongoing transaction
+            props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 
transactionTimeoutMs);
+        }
+        if (transactionalId != null) {
+            // the transactional id must be static and unique
+            // it is used to identify the same producer instance across 
process restarts
+            props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
+        }
+        // enable duplicates protection at the partition level
+        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotency);
+        return new KafkaProducer<>(props);
     }
 
-    private Future<RecordMetadata> send(final int messageKey, final String 
messageStr) {
-        return producer.send(new ProducerRecord<>(topic,
-                messageKey,
-                messageStr));
+    private void asyncSend(KafkaProducer<Integer, String> producer, int key, 
String value) {
+        // send the record asynchronously, setting a callback to be notified 
of the result
+        // note tha send blocks when buffer.memory is full or metadata are not 
available

Review Comment:
   OK, I know what you mean. But `batch.size` and `linger.ms` can also block 
the send, why do we only mention `buffer.memory` and metadata unavailable here? 
Let's say, we set a large number of `batch.size` and `linger.ms`, that means we 
won't send any data until one batch is full or linger.ms expired, is that 
right? 
   
   I'm thinking we don't need to write them all here. We can link them to the 
producer javadoc like this:
   ```
   note that send will be blocked when buffer.memory is full or metadata not 
available or other reasons. See {@link KafkaProducer} for more info.
   ```
   WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to