showuon commented on code in PR #13515:
URL: https://github.com/apache/kafka/pull/13515#discussion_r1185665409
########## examples/src/main/java/kafka/examples/Producer.java: ##########
@@ -21,133 +21,164 @@
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.errors.FencedInstanceIdException;
+import org.apache.kafka.common.errors.OutOfOrderSequenceException;
+import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.StringSerializer;

 import java.util.Properties;
+import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;

 /**
- * Demo producer that demonstrate two modes of KafkaProducer.
- * If the user uses the Async mode: The messages will be printed to stdout upon successful completion
- * If the user uses the sync mode (isAsync = false): Each send loop will block until completion.
+ * A simple producer thread supporting two send modes:
+ * - Async mode (default): records are sent without waiting for the response.
+ * - Sync mode: each send operation blocks waiting for the response.
  */
 public class Producer extends Thread {
-    private final KafkaProducer<Integer, String> producer;
+    private final String bootstrapServers;
     private final String topic;
-    private final Boolean isAsync;
-    private int numRecords;
+    private final boolean isAsync;
+    private final String transactionalId;
+    private final boolean enableIdempotency;
+    private final int numRecords;
+    private final int transactionTimeoutMs;
     private final CountDownLatch latch;
+    private volatile boolean closed;

-    public Producer(final String topic,
-                    final Boolean isAsync,
-                    final String transactionalId,
-                    final boolean enableIdempotency,
-                    final int numRecords,
-                    final int transactionTimeoutMs,
-                    final CountDownLatch latch) {
-        Properties props = new Properties();
-        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
-        props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
-        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
-        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
-        if (transactionTimeoutMs > 0) {
-            props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, transactionTimeoutMs);
-        }
-        if (transactionalId != null) {
-            props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
-        }
-        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotency);
-        producer = new KafkaProducer<>(props);
-
+    public Producer(String threadName,
+                    String bootstrapServers,
+                    String topic,
+                    boolean isAsync,
+                    String transactionalId,
+                    boolean enableIdempotency,
+                    int numRecords,
+                    int transactionTimeoutMs,
+                    CountDownLatch latch) {
+        super(threadName);
+        this.bootstrapServers = bootstrapServers;
         this.topic = topic;
         this.isAsync = isAsync;
+        this.transactionalId = transactionalId;
+        this.enableIdempotency = enableIdempotency;
         this.numRecords = numRecords;
+        this.transactionTimeoutMs = transactionTimeoutMs;
         this.latch = latch;
     }

-    KafkaProducer<Integer, String> get() {
-        return producer;
-    }
-
     @Override
     public void run() {
-        int messageKey = 0;
-        int recordsSent = 0;
-        try {
-            while (recordsSent < numRecords) {
-                final long currentTimeMs = System.currentTimeMillis();
-                produceOnce(messageKey, recordsSent, currentTimeMs);
-                messageKey += 2;
-                recordsSent += 1;
+        int key = 0;
+        int sentRecords = 0;
+        // the producer instance is thread safe
+        try (KafkaProducer<Integer, String> producer = createKafkaProducer()) {
+            while (!closed && sentRecords < numRecords) {
+                if (isAsync) {
+                    asyncSend(producer, key, "test" + key);
+                } else {
+                    syncSend(producer, key, "test" + key);
+                }
+                key++;
+                sentRecords++;
             }
-        } catch (Exception e) {
-            System.out.println("Producer encountered exception:" + e);
-        } finally {
-            System.out.println("Producer sent " + numRecords + " records successfully");
-            this.producer.close();
-            latch.countDown();
+        } catch (Throwable e) {
+            Utils.printOut("Unhandled exception");
+            e.printStackTrace();
         }
+        Utils.printOut("Sent %d records", sentRecords);
+        shutdown();
     }

-    private void produceOnce(final int messageKey, final int recordsSent, final long currentTimeMs) throws ExecutionException, InterruptedException {
-        String messageStr = "Message_" + messageKey;
-
-        if (isAsync) { // Send asynchronously
-            sendAsync(messageKey, messageStr, currentTimeMs);
-            return;
+    public void shutdown() {
+        if (!closed) {
+            closed = true;
+            latch.countDown();
         }
-        Future<RecordMetadata> future = send(messageKey, messageStr);
-        future.get();
-        System.out.println("Sent message: (" + messageKey + ", " + messageStr + ")");
     }

-    private void sendAsync(final int messageKey, final String messageStr, final long currentTimeMs) {
-        this.producer.send(new ProducerRecord<>(topic,
-            messageKey,
-            messageStr),
-            new DemoCallBack(currentTimeMs, messageKey, messageStr));
+    public KafkaProducer<Integer, String> createKafkaProducer() {
+        Properties props = new Properties();
+        // bootstrap server config is required for producer to connect to brokers
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        // client id is not required, but it's good to track the source of requests beyond just ip/port
+        // by allowing a logical application name to be included in server-side request logging
+        props.put(ProducerConfig.CLIENT_ID_CONFIG, "client-" + UUID.randomUUID());
+        // key and value are just byte arrays, so we need to set appropriate serializers
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        if (transactionTimeoutMs > 0) {
+            // max time before the transaction coordinator proactively aborts the ongoing transaction
+            props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, transactionTimeoutMs);
+        }
+        if (transactionalId != null) {
+            // the transactional id must be static and unique
+            // it is used to identify the same producer instance across process restarts
+            props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
+        }
+        // enable duplicates protection at the partition level
+        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotency);
+        return new KafkaProducer<>(props);
     }

-    private Future<RecordMetadata> send(final int messageKey, final String messageStr) {
-        return producer.send(new ProducerRecord<>(topic,
-            messageKey,
-            messageStr));
+    private void asyncSend(KafkaProducer<Integer, String> producer, int key, String value) {
+        // send the record asynchronously, setting a callback to be notified of the result
+        // note tha send blocks when buffer.memory is full or metadata are not available

Review Comment:
OK, I know what you mean. But `batch.size` and `linger.ms` can also block the send, so why do we only mention `buffer.memory` and unavailable metadata here? Let's say we set large values for `batch.size` and `linger.ms`; that means we won't send any data until a batch is full or `linger.ms` expires, is that right? I'm thinking we don't need to write them all here. We can just link to the producer javadoc, like this:
```
note that send will be blocked when buffer.memory is full or metadata is not available, or for other reasons. See {@link KafkaProducer} for more info.
```
WDYT?
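
For readers following this thread, here is a minimal, self-contained sketch (not part of the PR) of how these settings relate to `send()`. The broker address `localhost:9092` and topic `my-topic` are placeholder assumptions; the point is that `max.block.ms` bounds how long `send()` itself may block waiting for metadata or for space when `buffer.memory` is exhausted, while `batch.size` and `linger.ms` control how records are grouped into batches before the background sender ships them.

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class SendBlockingSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // placeholder bootstrap servers, just for illustration
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // batching settings: they govern when the sender thread ships a batch
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 100);
        // total memory for buffering unsent records; when it is full, send() blocks
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 32 * 1024 * 1024L);
        // upper bound on how long send() may block waiting for metadata or buffer space
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10_000);

        try (KafkaProducer<Integer, String> producer = new KafkaProducer<>(props)) {
            try {
                // send() normally returns quickly: it appends the record to the in-memory
                // accumulator, and the callback fires later when the broker response arrives
                producer.send(new ProducerRecord<>("my-topic", 0, "test0"), (metadata, exception) -> {
                    if (exception != null) {
                        exception.printStackTrace();
                    } else {
                        System.out.printf("Record sent to %s-%d@%d%n",
                            metadata.topic(), metadata.partition(), metadata.offset());
                    }
                });
            } catch (TimeoutException e) {
                // thrown by send() itself if metadata or buffer space was not available within max.block.ms
                e.printStackTrace();
            }
            producer.flush();
        }
    }
}
```

Pointing at the `KafkaProducer` javadoc, as suggested above, covers these cases without enumerating each one in the example's inline comment.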