[GitHub] [kafka] fvaleri commented on a diff in pull request #13515: KAFKA-14752: Kafka examples improvements - producer changes

2023-05-04 Thread via GitHub


fvaleri commented on code in PR #13515:
URL: https://github.com/apache/kafka/pull/13515#discussion_r1185727238


##
examples/src/main/java/kafka/examples/Producer.java:
##
@@ -21,133 +21,164 @@
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.errors.FencedInstanceIdException;
+import org.apache.kafka.common.errors.OutOfOrderSequenceException;
+import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 
 import java.util.Properties;
+import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
 
 /**
- * Demo producer that demonstrate two modes of KafkaProducer.
- * If the user uses the Async mode: The messages will be printed to stdout 
upon successful completion
- * If the user uses the sync mode (isAsync = false): Each send loop will block 
until completion.
+ * A simple producer thread supporting two send modes:
+ * - Async mode (default): records are sent without waiting for the response.
+ * - Sync mode: each send operation blocks waiting for the response.
  */
 public class Producer extends Thread {
-private final KafkaProducer producer;
+private final String bootstrapServers;
 private final String topic;
-private final Boolean isAsync;
-private int numRecords;
+private final boolean isAsync;
+private final String transactionalId;
+private final boolean enableIdempotency;
+private final int numRecords;
+private final int transactionTimeoutMs;
 private final CountDownLatch latch;
+private volatile boolean closed;
 
-public Producer(final String topic,
-final Boolean isAsync,
-final String transactionalId,
-final boolean enableIdempotency,
-final int numRecords,
-final int transactionTimeoutMs,
-final CountDownLatch latch) {
-Properties props = new Properties();
-props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
-props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
-props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
IntegerSerializer.class.getName());
-props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
-if (transactionTimeoutMs > 0) {
-props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 
transactionTimeoutMs);
-}
-if (transactionalId != null) {
-props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
-}
-props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotency);
-producer = new KafkaProducer<>(props);
-
+public Producer(String threadName,
+String bootstrapServers,
+String topic,
+boolean isAsync,
+String transactionalId,
+boolean enableIdempotency,
+int numRecords,
+int transactionTimeoutMs,
+CountDownLatch latch) {
+super(threadName);
+this.bootstrapServers = bootstrapServers;
 this.topic = topic;
 this.isAsync = isAsync;
+this.transactionalId = transactionalId;
+this.enableIdempotency = enableIdempotency;
 this.numRecords = numRecords;
+this.transactionTimeoutMs = transactionTimeoutMs;
 this.latch = latch;
 }
 
-KafkaProducer get() {
-return producer;
-}
-
 @Override
 public void run() {
-int messageKey = 0;
-int recordsSent = 0;
-try {
-while (recordsSent < numRecords) {
-final long currentTimeMs = System.currentTimeMillis();
-produceOnce(messageKey, recordsSent, currentTimeMs);
-messageKey += 2;
-recordsSent += 1;
+int key = 0;
+int sentRecords = 0;
+// the producer instance is thread safe
+try (KafkaProducer producer = createKafkaProducer()) {
+while (!closed && sentRecords < numRecords) {
+if (isAsync) {
+asyncSend(producer, key, "test" + key);
+} else {
+syncSend(producer, key, "test" + key);
+ 

[GitHub] [kafka] fvaleri commented on a diff in pull request #13515: KAFKA-14752: Kafka examples improvements - producer changes

2023-05-04 Thread via GitHub


fvaleri commented on code in PR #13515:
URL: https://github.com/apache/kafka/pull/13515#discussion_r1185727238


##
examples/src/main/java/kafka/examples/Producer.java:
##
@@ -21,133 +21,164 @@
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.errors.FencedInstanceIdException;
+import org.apache.kafka.common.errors.OutOfOrderSequenceException;
+import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 
 import java.util.Properties;
+import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
 
 /**
- * Demo producer that demonstrate two modes of KafkaProducer.
- * If the user uses the Async mode: The messages will be printed to stdout 
upon successful completion
- * If the user uses the sync mode (isAsync = false): Each send loop will block 
until completion.
+ * A simple producer thread supporting two send modes:
+ * - Async mode (default): records are sent without waiting for the response.
+ * - Sync mode: each send operation blocks waiting for the response.
  */
 public class Producer extends Thread {
-private final KafkaProducer producer;
+private final String bootstrapServers;
 private final String topic;
-private final Boolean isAsync;
-private int numRecords;
+private final boolean isAsync;
+private final String transactionalId;
+private final boolean enableIdempotency;
+private final int numRecords;
+private final int transactionTimeoutMs;
 private final CountDownLatch latch;
+private volatile boolean closed;
 
-public Producer(final String topic,
-final Boolean isAsync,
-final String transactionalId,
-final boolean enableIdempotency,
-final int numRecords,
-final int transactionTimeoutMs,
-final CountDownLatch latch) {
-Properties props = new Properties();
-props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
-props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
-props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
IntegerSerializer.class.getName());
-props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
-if (transactionTimeoutMs > 0) {
-props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 
transactionTimeoutMs);
-}
-if (transactionalId != null) {
-props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
-}
-props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotency);
-producer = new KafkaProducer<>(props);
-
+public Producer(String threadName,
+String bootstrapServers,
+String topic,
+boolean isAsync,
+String transactionalId,
+boolean enableIdempotency,
+int numRecords,
+int transactionTimeoutMs,
+CountDownLatch latch) {
+super(threadName);
+this.bootstrapServers = bootstrapServers;
 this.topic = topic;
 this.isAsync = isAsync;
+this.transactionalId = transactionalId;
+this.enableIdempotency = enableIdempotency;
 this.numRecords = numRecords;
+this.transactionTimeoutMs = transactionTimeoutMs;
 this.latch = latch;
 }
 
-KafkaProducer get() {
-return producer;
-}
-
 @Override
 public void run() {
-int messageKey = 0;
-int recordsSent = 0;
-try {
-while (recordsSent < numRecords) {
-final long currentTimeMs = System.currentTimeMillis();
-produceOnce(messageKey, recordsSent, currentTimeMs);
-messageKey += 2;
-recordsSent += 1;
+int key = 0;
+int sentRecords = 0;
+// the producer instance is thread safe
+try (KafkaProducer producer = createKafkaProducer()) {
+while (!closed && sentRecords < numRecords) {
+if (isAsync) {
+asyncSend(producer, key, "test" + key);
+} else {
+syncSend(producer, key, "test" + key);
+ 

[GitHub] [kafka] fvaleri commented on a diff in pull request #13515: KAFKA-14752: Kafka examples improvements - producer changes

2023-05-04 Thread via GitHub


fvaleri commented on code in PR #13515:
URL: https://github.com/apache/kafka/pull/13515#discussion_r1185727238


##
examples/src/main/java/kafka/examples/Producer.java:
##
@@ -21,133 +21,164 @@
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.errors.FencedInstanceIdException;
+import org.apache.kafka.common.errors.OutOfOrderSequenceException;
+import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 
 import java.util.Properties;
+import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
 
 /**
- * Demo producer that demonstrate two modes of KafkaProducer.
- * If the user uses the Async mode: The messages will be printed to stdout 
upon successful completion
- * If the user uses the sync mode (isAsync = false): Each send loop will block 
until completion.
+ * A simple producer thread supporting two send modes:
+ * - Async mode (default): records are sent without waiting for the response.
+ * - Sync mode: each send operation blocks waiting for the response.
  */
 public class Producer extends Thread {
-private final KafkaProducer producer;
+private final String bootstrapServers;
 private final String topic;
-private final Boolean isAsync;
-private int numRecords;
+private final boolean isAsync;
+private final String transactionalId;
+private final boolean enableIdempotency;
+private final int numRecords;
+private final int transactionTimeoutMs;
 private final CountDownLatch latch;
+private volatile boolean closed;
 
-public Producer(final String topic,
-final Boolean isAsync,
-final String transactionalId,
-final boolean enableIdempotency,
-final int numRecords,
-final int transactionTimeoutMs,
-final CountDownLatch latch) {
-Properties props = new Properties();
-props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
-props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
-props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
IntegerSerializer.class.getName());
-props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
-if (transactionTimeoutMs > 0) {
-props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 
transactionTimeoutMs);
-}
-if (transactionalId != null) {
-props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
-}
-props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotency);
-producer = new KafkaProducer<>(props);
-
+public Producer(String threadName,
+String bootstrapServers,
+String topic,
+boolean isAsync,
+String transactionalId,
+boolean enableIdempotency,
+int numRecords,
+int transactionTimeoutMs,
+CountDownLatch latch) {
+super(threadName);
+this.bootstrapServers = bootstrapServers;
 this.topic = topic;
 this.isAsync = isAsync;
+this.transactionalId = transactionalId;
+this.enableIdempotency = enableIdempotency;
 this.numRecords = numRecords;
+this.transactionTimeoutMs = transactionTimeoutMs;
 this.latch = latch;
 }
 
-KafkaProducer get() {
-return producer;
-}
-
 @Override
 public void run() {
-int messageKey = 0;
-int recordsSent = 0;
-try {
-while (recordsSent < numRecords) {
-final long currentTimeMs = System.currentTimeMillis();
-produceOnce(messageKey, recordsSent, currentTimeMs);
-messageKey += 2;
-recordsSent += 1;
+int key = 0;
+int sentRecords = 0;
+// the producer instance is thread safe
+try (KafkaProducer producer = createKafkaProducer()) {
+while (!closed && sentRecords < numRecords) {
+if (isAsync) {
+asyncSend(producer, key, "test" + key);
+} else {
+syncSend(producer, key, "test" + key);
+ 

[GitHub] [kafka] fvaleri commented on a diff in pull request #13515: KAFKA-14752: Kafka examples improvements - producer changes

2023-05-04 Thread via GitHub


fvaleri commented on code in PR #13515:
URL: https://github.com/apache/kafka/pull/13515#discussion_r1185104750


##
examples/src/main/java/kafka/examples/Producer.java:
##
@@ -21,133 +21,164 @@
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.errors.FencedInstanceIdException;
+import org.apache.kafka.common.errors.OutOfOrderSequenceException;
+import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 
 import java.util.Properties;
+import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
 
 /**
- * Demo producer that demonstrate two modes of KafkaProducer.
- * If the user uses the Async mode: The messages will be printed to stdout 
upon successful completion
- * If the user uses the sync mode (isAsync = false): Each send loop will block 
until completion.
+ * A simple producer thread supporting two send modes:
+ * - Async mode (default): records are sent without waiting for the response.
+ * - Sync mode: each send operation blocks waiting for the response.
  */
 public class Producer extends Thread {
-private final KafkaProducer producer;
+private final String bootstrapServers;
 private final String topic;
-private final Boolean isAsync;
-private int numRecords;
+private final boolean isAsync;
+private final String transactionalId;
+private final boolean enableIdempotency;
+private final int numRecords;
+private final int transactionTimeoutMs;
 private final CountDownLatch latch;
+private volatile boolean closed;
 
-public Producer(final String topic,
-final Boolean isAsync,
-final String transactionalId,
-final boolean enableIdempotency,
-final int numRecords,
-final int transactionTimeoutMs,
-final CountDownLatch latch) {
-Properties props = new Properties();
-props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
-props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
-props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
IntegerSerializer.class.getName());
-props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
-if (transactionTimeoutMs > 0) {
-props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 
transactionTimeoutMs);
-}
-if (transactionalId != null) {
-props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
-}
-props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotency);
-producer = new KafkaProducer<>(props);
-
+public Producer(String threadName,
+String bootstrapServers,
+String topic,
+boolean isAsync,
+String transactionalId,
+boolean enableIdempotency,
+int numRecords,
+int transactionTimeoutMs,
+CountDownLatch latch) {
+super(threadName);
+this.bootstrapServers = bootstrapServers;
 this.topic = topic;
 this.isAsync = isAsync;
+this.transactionalId = transactionalId;
+this.enableIdempotency = enableIdempotency;
 this.numRecords = numRecords;
+this.transactionTimeoutMs = transactionTimeoutMs;
 this.latch = latch;
 }
 
-KafkaProducer get() {
-return producer;
-}
-
 @Override
 public void run() {
-int messageKey = 0;
-int recordsSent = 0;
-try {
-while (recordsSent < numRecords) {
-final long currentTimeMs = System.currentTimeMillis();
-produceOnce(messageKey, recordsSent, currentTimeMs);
-messageKey += 2;
-recordsSent += 1;
+int key = 0;
+int sentRecords = 0;
+// the producer instance is thread safe
+try (KafkaProducer producer = createKafkaProducer()) {
+while (!closed && sentRecords < numRecords) {
+if (isAsync) {
+asyncSend(producer, key, "test" + key);
+} else {
+syncSend(producer, key, "test" + key);
+ 

[GitHub] [kafka] fvaleri commented on a diff in pull request #13515: KAFKA-14752: Kafka examples improvements - producer changes

2023-05-04 Thread via GitHub


fvaleri commented on code in PR #13515:
URL: https://github.com/apache/kafka/pull/13515#discussion_r1183960378


##
examples/src/main/java/kafka/examples/Producer.java:
##
@@ -21,133 +21,159 @@
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.errors.FencedInstanceIdException;
+import org.apache.kafka.common.errors.OutOfOrderSequenceException;
+import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 
 import java.util.Properties;
+import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
 
 /**
- * Demo producer that demonstrate two modes of KafkaProducer.
- * If the user uses the Async mode: The messages will be printed to stdout 
upon successful completion
- * If the user uses the sync mode (isAsync = false): Each send loop will block 
until completion.
+ * A simple producer thread supporting two send modes:
+ * - Async mode (default): records are sent without waiting for the response.
+ * - Sync mode: each send operation blocks waiting for the response.
  */
 public class Producer extends Thread {
-private final KafkaProducer producer;
+private final String bootstrapServers;
 private final String topic;
-private final Boolean isAsync;
-private int numRecords;
+private final boolean isAsync;
+private final String transactionalId;
+private final boolean enableIdempotency;
+private final int numRecords;
+private final int transactionTimeoutMs;
 private final CountDownLatch latch;
+private volatile boolean closed;
 
-public Producer(final String topic,
-final Boolean isAsync,
-final String transactionalId,
-final boolean enableIdempotency,
-final int numRecords,
-final int transactionTimeoutMs,
-final CountDownLatch latch) {
-Properties props = new Properties();
-props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
-props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
-props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
IntegerSerializer.class.getName());
-props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
-if (transactionTimeoutMs > 0) {
-props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 
transactionTimeoutMs);
-}
-if (transactionalId != null) {
-props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
-}
-props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotency);
-producer = new KafkaProducer<>(props);
-
+public Producer(String threadName,
+String bootstrapServers,
+String topic,
+boolean isAsync,
+String transactionalId,
+boolean enableIdempotency,
+int numRecords,
+int transactionTimeoutMs,
+CountDownLatch latch) {
+super(threadName);
+this.bootstrapServers = bootstrapServers;
 this.topic = topic;
 this.isAsync = isAsync;
+this.transactionalId = transactionalId;
+this.enableIdempotency = enableIdempotency;
 this.numRecords = numRecords;
+this.transactionTimeoutMs = transactionTimeoutMs;
 this.latch = latch;
 }
 
-KafkaProducer get() {
-return producer;
-}
-
 @Override
 public void run() {
-int messageKey = 0;
-int recordsSent = 0;
-try {
-while (recordsSent < numRecords) {
-final long currentTimeMs = System.currentTimeMillis();
-produceOnce(messageKey, recordsSent, currentTimeMs);
-messageKey += 2;
-recordsSent += 1;
+int key = 0;
+int sentRecords = 0;
+// the producer instance is thread safe
+try (KafkaProducer producer = createKafkaProducer()) {
+while (!closed && sentRecords < numRecords) {
+if (isAsync) {
+asyncSend(producer, key, "test");
+} else {
+syncSend(producer, key, "test");
+}
+key++;
+sentRecords++;
  

[GitHub] [kafka] fvaleri commented on a diff in pull request #13515: KAFKA-14752: Kafka examples improvements - producer changes

2023-05-03 Thread via GitHub


fvaleri commented on code in PR #13515:
URL: https://github.com/apache/kafka/pull/13515#discussion_r1183960378


##
examples/src/main/java/kafka/examples/Producer.java:
##
@@ -21,133 +21,159 @@
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.errors.FencedInstanceIdException;
+import org.apache.kafka.common.errors.OutOfOrderSequenceException;
+import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 
 import java.util.Properties;
+import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
 
 /**
- * Demo producer that demonstrate two modes of KafkaProducer.
- * If the user uses the Async mode: The messages will be printed to stdout 
upon successful completion
- * If the user uses the sync mode (isAsync = false): Each send loop will block 
until completion.
+ * A simple producer thread supporting two send modes:
+ * - Async mode (default): records are sent without waiting for the response.
+ * - Sync mode: each send operation blocks waiting for the response.
  */
 public class Producer extends Thread {
-private final KafkaProducer producer;
+private final String bootstrapServers;
 private final String topic;
-private final Boolean isAsync;
-private int numRecords;
+private final boolean isAsync;
+private final String transactionalId;
+private final boolean enableIdempotency;
+private final int numRecords;
+private final int transactionTimeoutMs;
 private final CountDownLatch latch;
+private volatile boolean closed;
 
-public Producer(final String topic,
-final Boolean isAsync,
-final String transactionalId,
-final boolean enableIdempotency,
-final int numRecords,
-final int transactionTimeoutMs,
-final CountDownLatch latch) {
-Properties props = new Properties();
-props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
-props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
-props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
IntegerSerializer.class.getName());
-props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
-if (transactionTimeoutMs > 0) {
-props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 
transactionTimeoutMs);
-}
-if (transactionalId != null) {
-props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
-}
-props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotency);
-producer = new KafkaProducer<>(props);
-
+public Producer(String threadName,
+String bootstrapServers,
+String topic,
+boolean isAsync,
+String transactionalId,
+boolean enableIdempotency,
+int numRecords,
+int transactionTimeoutMs,
+CountDownLatch latch) {
+super(threadName);
+this.bootstrapServers = bootstrapServers;
 this.topic = topic;
 this.isAsync = isAsync;
+this.transactionalId = transactionalId;
+this.enableIdempotency = enableIdempotency;
 this.numRecords = numRecords;
+this.transactionTimeoutMs = transactionTimeoutMs;
 this.latch = latch;
 }
 
-KafkaProducer get() {
-return producer;
-}
-
 @Override
 public void run() {
-int messageKey = 0;
-int recordsSent = 0;
-try {
-while (recordsSent < numRecords) {
-final long currentTimeMs = System.currentTimeMillis();
-produceOnce(messageKey, recordsSent, currentTimeMs);
-messageKey += 2;
-recordsSent += 1;
+int key = 0;
+int sentRecords = 0;
+// the producer instance is thread safe
+try (KafkaProducer producer = createKafkaProducer()) {
+while (!closed && sentRecords < numRecords) {
+if (isAsync) {
+asyncSend(producer, key, "test");
+} else {
+syncSend(producer, key, "test");
+}
+key++;
+sentRecords++;
  

[GitHub] [kafka] fvaleri commented on a diff in pull request #13515: KAFKA-14752: Kafka examples improvements - producer changes

2023-05-03 Thread via GitHub


fvaleri commented on code in PR #13515:
URL: https://github.com/apache/kafka/pull/13515#discussion_r1183960307


##
examples/src/main/java/kafka/examples/Producer.java:
##
@@ -21,133 +21,159 @@
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.errors.FencedInstanceIdException;
+import org.apache.kafka.common.errors.OutOfOrderSequenceException;
+import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 
 import java.util.Properties;
+import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
 
 /**
- * Demo producer that demonstrate two modes of KafkaProducer.
- * If the user uses the Async mode: The messages will be printed to stdout 
upon successful completion
- * If the user uses the sync mode (isAsync = false): Each send loop will block 
until completion.
+ * A simple producer thread supporting two send modes:
+ * - Async mode (default): records are sent without waiting for the response.
+ * - Sync mode: each send operation blocks waiting for the response.
  */
 public class Producer extends Thread {
-private final KafkaProducer producer;
+private final String bootstrapServers;
 private final String topic;
-private final Boolean isAsync;
-private int numRecords;
+private final boolean isAsync;
+private final String transactionalId;
+private final boolean enableIdempotency;
+private final int numRecords;
+private final int transactionTimeoutMs;
 private final CountDownLatch latch;
+private volatile boolean closed;
 
-public Producer(final String topic,
-final Boolean isAsync,
-final String transactionalId,
-final boolean enableIdempotency,
-final int numRecords,
-final int transactionTimeoutMs,
-final CountDownLatch latch) {
-Properties props = new Properties();
-props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
-props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
-props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
IntegerSerializer.class.getName());
-props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
-if (transactionTimeoutMs > 0) {
-props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 
transactionTimeoutMs);
-}
-if (transactionalId != null) {
-props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
-}
-props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotency);
-producer = new KafkaProducer<>(props);
-
+public Producer(String threadName,
+String bootstrapServers,
+String topic,
+boolean isAsync,
+String transactionalId,
+boolean enableIdempotency,
+int numRecords,
+int transactionTimeoutMs,
+CountDownLatch latch) {
+super(threadName);
+this.bootstrapServers = bootstrapServers;
 this.topic = topic;
 this.isAsync = isAsync;
+this.transactionalId = transactionalId;
+this.enableIdempotency = enableIdempotency;
 this.numRecords = numRecords;
+this.transactionTimeoutMs = transactionTimeoutMs;
 this.latch = latch;
 }
 
-KafkaProducer get() {
-return producer;
-}
-
 @Override
 public void run() {
-int messageKey = 0;
-int recordsSent = 0;
-try {
-while (recordsSent < numRecords) {
-final long currentTimeMs = System.currentTimeMillis();
-produceOnce(messageKey, recordsSent, currentTimeMs);
-messageKey += 2;
-recordsSent += 1;
+int key = 0;
+int sentRecords = 0;
+// the producer instance is thread safe
+try (KafkaProducer producer = createKafkaProducer()) {
+while (!closed && sentRecords < numRecords) {
+if (isAsync) {
+asyncSend(producer, key, "test");
+} else {
+syncSend(producer, key, "test");
+}
+key++;
+sentRecords++;
  

[GitHub] [kafka] fvaleri commented on a diff in pull request #13515: KAFKA-14752: Kafka examples improvements - producer changes

2023-05-03 Thread via GitHub


fvaleri commented on code in PR #13515:
URL: https://github.com/apache/kafka/pull/13515#discussion_r1183959056


##
examples/src/main/java/kafka/examples/Producer.java:
##
@@ -21,133 +21,159 @@
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.errors.FencedInstanceIdException;
+import org.apache.kafka.common.errors.OutOfOrderSequenceException;
+import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 
 import java.util.Properties;
+import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
 
 /**
- * Demo producer that demonstrate two modes of KafkaProducer.
- * If the user uses the Async mode: The messages will be printed to stdout 
upon successful completion
- * If the user uses the sync mode (isAsync = false): Each send loop will block 
until completion.
+ * A simple producer thread supporting two send modes:
+ * - Async mode (default): records are sent without waiting for the response.
+ * - Sync mode: each send operation blocks waiting for the response.
  */
 public class Producer extends Thread {
-private final KafkaProducer producer;
+private final String bootstrapServers;
 private final String topic;
-private final Boolean isAsync;
-private int numRecords;
+private final boolean isAsync;
+private final String transactionalId;
+private final boolean enableIdempotency;
+private final int numRecords;
+private final int transactionTimeoutMs;
 private final CountDownLatch latch;
+private volatile boolean closed;
 
-public Producer(final String topic,
-final Boolean isAsync,
-final String transactionalId,
-final boolean enableIdempotency,
-final int numRecords,
-final int transactionTimeoutMs,
-final CountDownLatch latch) {
-Properties props = new Properties();
-props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
-props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
-props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
IntegerSerializer.class.getName());
-props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
-if (transactionTimeoutMs > 0) {
-props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 
transactionTimeoutMs);
-}
-if (transactionalId != null) {
-props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
-}
-props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotency);
-producer = new KafkaProducer<>(props);
-
+public Producer(String threadName,
+String bootstrapServers,
+String topic,
+boolean isAsync,
+String transactionalId,
+boolean enableIdempotency,
+int numRecords,
+int transactionTimeoutMs,
+CountDownLatch latch) {
+super(threadName);
+this.bootstrapServers = bootstrapServers;
 this.topic = topic;
 this.isAsync = isAsync;
+this.transactionalId = transactionalId;
+this.enableIdempotency = enableIdempotency;
 this.numRecords = numRecords;
+this.transactionTimeoutMs = transactionTimeoutMs;
 this.latch = latch;
 }
 
-KafkaProducer get() {
-return producer;
-}
-
 @Override
 public void run() {
-int messageKey = 0;
-int recordsSent = 0;
-try {
-while (recordsSent < numRecords) {
-final long currentTimeMs = System.currentTimeMillis();
-produceOnce(messageKey, recordsSent, currentTimeMs);
-messageKey += 2;
-recordsSent += 1;
+int key = 0;
+int sentRecords = 0;
+// the producer instance is thread safe
+try (KafkaProducer producer = createKafkaProducer()) {
+while (!closed && sentRecords < numRecords) {
+if (isAsync) {
+asyncSend(producer, key, "test");
+} else {
+syncSend(producer, key, "test");

Review Comment:
   Sure.



-- 
This is an automated message from the Apache Git