This is an automated email from the ASF dual-hosted git repository.
showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 78090bb4cdd KAFKA-14752: Kafka examples improvements - producer
changes (#13515)
78090bb4cdd is described below
commit 78090bb4cdd2494f0b720d34e17ee0cc645fc399
Author: Federico Valeri <[email protected]>
AuthorDate: Mon May 8 04:15:52 2023 +0200
KAFKA-14752: Kafka examples improvements - producer changes (#13515)
KAFKA-14752: Kafka examples improvements - producer changes
Reviewers: Luke Chen <[email protected]>, Christo Lolov
<[email protected]>
---
.../examples/ExactlyOnceMessageProcessor.java | 4 +-
.../kafka/examples/KafkaConsumerProducerDemo.java | 3 +-
.../java/kafka/examples/KafkaExactlyOnceDemo.java | 3 +-
.../src/main/java/kafka/examples/Producer.java | 216 ++++++++++++---------
4 files changed, 131 insertions(+), 95 deletions(-)
diff --git
a/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java
b/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java
index 6de6ab99d26..3fa1e4ba041 100644
--- a/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java
+++ b/examples/src/main/java/kafka/examples/ExactlyOnceMessageProcessor.java
@@ -66,7 +66,9 @@ public class ExactlyOnceMessageProcessor extends Thread {
// It is recommended to have a relatively short txn timeout in order
to clear pending offsets faster.
final int transactionTimeoutMs = 10000;
// A unique transactional.id must be provided in order to properly use
EOS.
- producer = new Producer(outputTopic, true, transactionalId, true, -1,
transactionTimeoutMs, null).get();
+ producer = new Producer(
+ "processor-producer", KafkaProperties.KAFKA_SERVER_URL + ":" +
KafkaProperties.KAFKA_SERVER_PORT, outputTopic, true, transactionalId, true,
-1, transactionTimeoutMs, null)
+ .createKafkaProducer();
// Consumer must be in read_committed mode, which means it won't be
able to read uncommitted data.
// Consumer could optionally configure groupInstanceId to avoid
unnecessary rebalances.
this.groupInstanceId = "Txn-consumer-" + instanceIdx;
diff --git
a/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java
b/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java
index 4087367f899..ff44efe492e 100644
--- a/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java
+++ b/examples/src/main/java/kafka/examples/KafkaConsumerProducerDemo.java
@@ -26,7 +26,8 @@ public class KafkaConsumerProducerDemo {
public static void main(String[] args) throws InterruptedException {
boolean isAsync = args.length == 0 ||
!args[0].trim().equalsIgnoreCase("sync");
CountDownLatch latch = new CountDownLatch(2);
- Producer producerThread = new Producer(KafkaProperties.TOPIC, isAsync,
null, false, 10000, -1, latch);
+ Producer producerThread = new Producer(
+ "producer", KafkaProperties.KAFKA_SERVER_URL + ":" +
KafkaProperties.KAFKA_SERVER_PORT, KafkaProperties.TOPIC, isAsync, null, false,
10000, -1, latch);
producerThread.start();
Consumer consumerThread = new Consumer(
diff --git a/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java
b/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java
index 03ee8e25498..af0cfce3dd3 100644
--- a/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java
+++ b/examples/src/main/java/kafka/examples/KafkaExactlyOnceDemo.java
@@ -91,7 +91,8 @@ public class KafkaExactlyOnceDemo {
CountDownLatch prePopulateLatch = new CountDownLatch(1);
/* Stage 2: pre-populate records */
- Producer producerThread = new Producer(INPUT_TOPIC, false, null, true,
numRecords, -1, prePopulateLatch);
+ Producer producerThread = new Producer(
+ "producer", KafkaProperties.KAFKA_SERVER_URL + ":" +
KafkaProperties.KAFKA_SERVER_PORT, INPUT_TOPIC, false, null, true, numRecords,
-1, prePopulateLatch);
producerThread.start();
if (!prePopulateLatch.await(5, TimeUnit.MINUTES)) {
diff --git a/examples/src/main/java/kafka/examples/Producer.java
b/examples/src/main/java/kafka/examples/Producer.java
index e85fa16060e..36a2583954c 100644
--- a/examples/src/main/java/kafka/examples/Producer.java
+++ b/examples/src/main/java/kafka/examples/Producer.java
@@ -21,133 +21,165 @@ import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.errors.FencedInstanceIdException;
+import org.apache.kafka.common.errors.OutOfOrderSequenceException;
+import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
+import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
/**
- * Demo producer that demonstrate two modes of KafkaProducer.
- * If the user uses the Async mode: The messages will be printed to stdout
upon successful completion
- * If the user uses the sync mode (isAsync = false): Each send loop will block
until completion.
+ * A simple producer thread supporting two send modes:
+ * - Async mode (default): records are sent without waiting for the response.
+ * - Sync mode: each send operation blocks waiting for the response.
*/
public class Producer extends Thread {
- private final KafkaProducer<Integer, String> producer;
+ private final String bootstrapServers;
private final String topic;
- private final Boolean isAsync;
- private int numRecords;
+ private final boolean isAsync;
+ private final String transactionalId;
+ private final boolean enableIdempotency;
+ private final int numRecords;
+ private final int transactionTimeoutMs;
private final CountDownLatch latch;
+ private volatile boolean closed;
- public Producer(final String topic,
- final Boolean isAsync,
- final String transactionalId,
- final boolean enableIdempotency,
- final int numRecords,
- final int transactionTimeoutMs,
- final CountDownLatch latch) {
- Properties props = new Properties();
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
- props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
- props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
IntegerSerializer.class.getName());
- props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
- if (transactionTimeoutMs > 0) {
- props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,
transactionTimeoutMs);
- }
- if (transactionalId != null) {
- props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
- }
- props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotency);
- producer = new KafkaProducer<>(props);
-
+ public Producer(String threadName,
+ String bootstrapServers,
+ String topic,
+ boolean isAsync,
+ String transactionalId,
+ boolean enableIdempotency,
+ int numRecords,
+ int transactionTimeoutMs,
+ CountDownLatch latch) {
+ super(threadName);
+ this.bootstrapServers = bootstrapServers;
this.topic = topic;
this.isAsync = isAsync;
+ this.transactionalId = transactionalId;
+ this.enableIdempotency = enableIdempotency;
this.numRecords = numRecords;
+ this.transactionTimeoutMs = transactionTimeoutMs;
this.latch = latch;
}
- KafkaProducer<Integer, String> get() {
- return producer;
- }
-
@Override
public void run() {
- int messageKey = 0;
- int recordsSent = 0;
- try {
- while (recordsSent < numRecords) {
- final long currentTimeMs = System.currentTimeMillis();
- produceOnce(messageKey, recordsSent, currentTimeMs);
- messageKey += 2;
- recordsSent += 1;
+ int key = 0;
+ int sentRecords = 0;
+ // the producer instance is thread safe
+ try (KafkaProducer<Integer, String> producer = createKafkaProducer()) {
+ while (!closed && sentRecords < numRecords) {
+ if (isAsync) {
+ asyncSend(producer, key, "test" + key);
+ } else {
+ syncSend(producer, key, "test" + key);
+ }
+ key++;
+ sentRecords++;
}
- } catch (Exception e) {
- System.out.println("Producer encountered exception:" + e);
- } finally {
- System.out.println("Producer sent " + numRecords + " records
successfully");
- this.producer.close();
- latch.countDown();
+ } catch (Throwable e) {
+ Utils.printOut("Unhandled exception");
+ e.printStackTrace();
}
+ Utils.printOut("Sent %d records", sentRecords);
+ shutdown();
}
- private void produceOnce(final int messageKey, final int recordsSent,
final long currentTimeMs) throws ExecutionException, InterruptedException {
- String messageStr = "Message_" + messageKey;
-
- if (isAsync) { // Send asynchronously
- sendAsync(messageKey, messageStr, currentTimeMs);
- return;
+ public void shutdown() {
+ if (!closed) {
+ closed = true;
+ latch.countDown();
}
- Future<RecordMetadata> future = send(messageKey, messageStr);
- future.get();
- System.out.println("Sent message: (" + messageKey + ", " + messageStr
+ ")");
}
- private void sendAsync(final int messageKey, final String messageStr,
final long currentTimeMs) {
- this.producer.send(new ProducerRecord<>(topic,
- messageKey,
- messageStr),
- new DemoCallBack(currentTimeMs, messageKey, messageStr));
+ public KafkaProducer<Integer, String> createKafkaProducer() {
+ Properties props = new Properties();
+ // bootstrap server config is required for producer to connect to
brokers
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ // client id is not required, but it's good to track the source of
requests beyond just ip/port
+ // by allowing a logical application name to be included in
server-side request logging
+ props.put(ProducerConfig.CLIENT_ID_CONFIG, "client-" +
UUID.randomUUID());
+ // key and value are just byte arrays, so we need to set appropriate
serializers
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
IntegerSerializer.class);
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
+ if (transactionTimeoutMs > 0) {
+ // max time before the transaction coordinator proactively aborts
the ongoing transaction
+ props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,
transactionTimeoutMs);
+ }
+ if (transactionalId != null) {
+ // the transactional id must be static and unique
+ // it is used to identify the same producer instance across
process restarts
+ props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
+ }
+ // enable duplicates protection at the partition level
+ props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotency);
+ return new KafkaProducer<>(props);
}
- private Future<RecordMetadata> send(final int messageKey, final String
messageStr) {
- return producer.send(new ProducerRecord<>(topic,
- messageKey,
- messageStr));
+ private void asyncSend(KafkaProducer<Integer, String> producer, int key,
String value) {
+ // send the record asynchronously, setting a callback to be notified
of the result
+ // note that, even if you set a small batch.size with linger.ms=0, the
send operation
+ // will still be blocked when buffer.memory is full or metadata are
not available
+ producer.send(new ProducerRecord<>(topic, key, value), new
ProducerCallback(key, value));
}
-}
-class DemoCallBack implements Callback {
+ private RecordMetadata syncSend(KafkaProducer<Integer, String> producer,
int key, String value)
+ throws ExecutionException, InterruptedException {
+ try {
+ // send the record and then call get, which blocks waiting for the
ack from the broker
+ RecordMetadata metadata = producer.send(new
ProducerRecord<>(topic, key, value)).get();
+ Utils.maybePrintRecord(numRecords, key, value, metadata);
+ return metadata;
+ } catch (AuthorizationException | UnsupportedVersionException |
ProducerFencedException
+ | FencedInstanceIdException | OutOfOrderSequenceException |
SerializationException e) {
+ Utils.printErr(e.getMessage());
+ // we can't recover from these exceptions
+ shutdown();
+ } catch (KafkaException e) {
+ Utils.printErr(e.getMessage());
+ }
+ return null;
+ }
- private final long startTime;
- private final int key;
- private final String message;
+ class ProducerCallback implements Callback {
+ private final int key;
+ private final String value;
- public DemoCallBack(long startTime, int key, String message) {
- this.startTime = startTime;
- this.key = key;
- this.message = message;
- }
+ public ProducerCallback(int key, String value) {
+ this.key = key;
+ this.value = value;
+ }
- /**
- * A callback method the user can implement to provide asynchronous
handling of request completion. This method will
- * be called when the record sent to the server has been acknowledged.
When exception is not null in the callback,
- * metadata will contain the special -1 value for all fields except for
topicPartition, which will be valid.
- *
- * @param metadata The metadata for the record that was sent (i.e. the
partition and offset). An empty metadata
- * with -1 value for all fields except for topicPartition
will be returned if an error occurred.
- * @param exception The exception thrown during processing of this record.
Null if no error occurred.
- */
- public void onCompletion(RecordMetadata metadata, Exception exception) {
- long elapsedTime = System.currentTimeMillis() - startTime;
- if (metadata != null) {
- System.out.println(
- "message(" + key + ", " + message + ") sent to partition(" +
metadata.partition() +
- "), " +
- "offset(" + metadata.offset() + ") in " + elapsedTime + "
ms");
- } else {
- exception.printStackTrace();
+ /**
+ * A callback method the user can implement to provide asynchronous
handling of request completion. This method will
+ * be called when the record sent to the server has been acknowledged.
When exception is not null in the callback,
+ * metadata will contain the special -1 value for all fields except
for topicPartition, which will be valid.
+ *
+ * @param metadata The metadata for the record that was sent (i.e. the
partition and offset). An empty metadata
+ * with -1 value for all fields except for
topicPartition will be returned if an error occurred.
+ * @param exception The exception thrown during processing of this
record. Null if no error occurred.
+ */
+ public void onCompletion(RecordMetadata metadata, Exception exception)
{
+ if (exception != null) {
+ Utils.printErr(exception.getMessage());
+ if (!(exception instanceof RetriableException)) {
+ // we can't recover from these exceptions
+ shutdown();
+ }
+ } else {
+ Utils.maybePrintRecord(numRecords, key, value, metadata);
+ }
}
}
}