[GitHub] [kafka] fvaleri commented on a diff in pull request #13515: KAFKA-14752: Kafka examples improvements - producer changes
fvaleri commented on code in PR #13515: URL: https://github.com/apache/kafka/pull/13515#discussion_r1185727238 ## examples/src/main/java/kafka/examples/Producer.java: ## @@ -21,133 +21,164 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.AuthorizationException; +import org.apache.kafka.common.errors.FencedInstanceIdException; +import org.apache.kafka.common.errors.OutOfOrderSequenceException; +import org.apache.kafka.common.errors.ProducerFencedException; +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; +import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; /** - * Demo producer that demonstrate two modes of KafkaProducer. - * If the user uses the Async mode: The messages will be printed to stdout upon successful completion - * If the user uses the sync mode (isAsync = false): Each send loop will block until completion. + * A simple producer thread supporting two send modes: + * - Async mode (default): records are sent without waiting for the response. + * - Sync mode: each send operation blocks waiting for the response. */ public class Producer extends Thread { -private final KafkaProducer producer; +private final String bootstrapServers; private final String topic; -private final Boolean isAsync; -private int numRecords; +private final boolean isAsync; +private final String transactionalId; +private final boolean enableIdempotency; +private final int numRecords; +private final int transactionTimeoutMs; private final CountDownLatch latch; +private volatile boolean closed; -public Producer(final String topic, -final Boolean isAsync, -final String transactionalId, -final boolean enableIdempotency, -final int numRecords, -final int transactionTimeoutMs, -final CountDownLatch latch) { -Properties props = new Properties(); -props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT); -props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer"); -props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()); -props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); -if (transactionTimeoutMs > 0) { -props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, transactionTimeoutMs); -} -if (transactionalId != null) { -props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId); -} -props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotency); -producer = new KafkaProducer<>(props); - +public Producer(String threadName, +String bootstrapServers, +String topic, +boolean isAsync, +String transactionalId, +boolean enableIdempotency, +int numRecords, +int transactionTimeoutMs, +CountDownLatch latch) { +super(threadName); +this.bootstrapServers = bootstrapServers; this.topic = topic; this.isAsync = isAsync; +this.transactionalId = transactionalId; +this.enableIdempotency = enableIdempotency; this.numRecords = numRecords; +this.transactionTimeoutMs = transactionTimeoutMs; this.latch = latch; } -KafkaProducer get() { -return producer; -} - @Override public void run() { -int messageKey = 0; -int recordsSent = 0; -try { -while (recordsSent < numRecords) { -final long currentTimeMs = System.currentTimeMillis(); -produceOnce(messageKey, recordsSent, currentTimeMs); -messageKey += 2; -recordsSent += 1; +int key = 0; +int sentRecords = 0; +// the producer instance is thread safe +try (KafkaProducer producer = createKafkaProducer()) { +while (!closed && sentRecords < numRecords) { +if (isAsync) { +asyncSend(producer, key, "test" + key); +} else { +syncSend(producer, key, "test" + key); +
[GitHub] [kafka] fvaleri commented on a diff in pull request #13515: KAFKA-14752: Kafka examples improvements - producer changes
fvaleri commented on code in PR #13515: URL: https://github.com/apache/kafka/pull/13515#discussion_r1185727238 ## examples/src/main/java/kafka/examples/Producer.java: ## @@ -21,133 +21,164 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.AuthorizationException; +import org.apache.kafka.common.errors.FencedInstanceIdException; +import org.apache.kafka.common.errors.OutOfOrderSequenceException; +import org.apache.kafka.common.errors.ProducerFencedException; +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; +import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; /** - * Demo producer that demonstrate two modes of KafkaProducer. - * If the user uses the Async mode: The messages will be printed to stdout upon successful completion - * If the user uses the sync mode (isAsync = false): Each send loop will block until completion. + * A simple producer thread supporting two send modes: + * - Async mode (default): records are sent without waiting for the response. + * - Sync mode: each send operation blocks waiting for the response. */ public class Producer extends Thread { -private final KafkaProducer producer; +private final String bootstrapServers; private final String topic; -private final Boolean isAsync; -private int numRecords; +private final boolean isAsync; +private final String transactionalId; +private final boolean enableIdempotency; +private final int numRecords; +private final int transactionTimeoutMs; private final CountDownLatch latch; +private volatile boolean closed; -public Producer(final String topic, -final Boolean isAsync, -final String transactionalId, -final boolean enableIdempotency, -final int numRecords, -final int transactionTimeoutMs, -final CountDownLatch latch) { -Properties props = new Properties(); -props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT); -props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer"); -props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()); -props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); -if (transactionTimeoutMs > 0) { -props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, transactionTimeoutMs); -} -if (transactionalId != null) { -props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId); -} -props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotency); -producer = new KafkaProducer<>(props); - +public Producer(String threadName, +String bootstrapServers, +String topic, +boolean isAsync, +String transactionalId, +boolean enableIdempotency, +int numRecords, +int transactionTimeoutMs, +CountDownLatch latch) { +super(threadName); +this.bootstrapServers = bootstrapServers; this.topic = topic; this.isAsync = isAsync; +this.transactionalId = transactionalId; +this.enableIdempotency = enableIdempotency; this.numRecords = numRecords; +this.transactionTimeoutMs = transactionTimeoutMs; this.latch = latch; } -KafkaProducer get() { -return producer; -} - @Override public void run() { -int messageKey = 0; -int recordsSent = 0; -try { -while (recordsSent < numRecords) { -final long currentTimeMs = System.currentTimeMillis(); -produceOnce(messageKey, recordsSent, currentTimeMs); -messageKey += 2; -recordsSent += 1; +int key = 0; +int sentRecords = 0; +// the producer instance is thread safe +try (KafkaProducer producer = createKafkaProducer()) { +while (!closed && sentRecords < numRecords) { +if (isAsync) { +asyncSend(producer, key, "test" + key); +} else { +syncSend(producer, key, "test" + key); +
[GitHub] [kafka] fvaleri commented on a diff in pull request #13515: KAFKA-14752: Kafka examples improvements - producer changes
fvaleri commented on code in PR #13515: URL: https://github.com/apache/kafka/pull/13515#discussion_r1185727238 ## examples/src/main/java/kafka/examples/Producer.java: ## @@ -21,133 +21,164 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.AuthorizationException; +import org.apache.kafka.common.errors.FencedInstanceIdException; +import org.apache.kafka.common.errors.OutOfOrderSequenceException; +import org.apache.kafka.common.errors.ProducerFencedException; +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; +import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; /** - * Demo producer that demonstrate two modes of KafkaProducer. - * If the user uses the Async mode: The messages will be printed to stdout upon successful completion - * If the user uses the sync mode (isAsync = false): Each send loop will block until completion. + * A simple producer thread supporting two send modes: + * - Async mode (default): records are sent without waiting for the response. + * - Sync mode: each send operation blocks waiting for the response. */ public class Producer extends Thread { -private final KafkaProducer producer; +private final String bootstrapServers; private final String topic; -private final Boolean isAsync; -private int numRecords; +private final boolean isAsync; +private final String transactionalId; +private final boolean enableIdempotency; +private final int numRecords; +private final int transactionTimeoutMs; private final CountDownLatch latch; +private volatile boolean closed; -public Producer(final String topic, -final Boolean isAsync, -final String transactionalId, -final boolean enableIdempotency, -final int numRecords, -final int transactionTimeoutMs, -final CountDownLatch latch) { -Properties props = new Properties(); -props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT); -props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer"); -props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()); -props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); -if (transactionTimeoutMs > 0) { -props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, transactionTimeoutMs); -} -if (transactionalId != null) { -props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId); -} -props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotency); -producer = new KafkaProducer<>(props); - +public Producer(String threadName, +String bootstrapServers, +String topic, +boolean isAsync, +String transactionalId, +boolean enableIdempotency, +int numRecords, +int transactionTimeoutMs, +CountDownLatch latch) { +super(threadName); +this.bootstrapServers = bootstrapServers; this.topic = topic; this.isAsync = isAsync; +this.transactionalId = transactionalId; +this.enableIdempotency = enableIdempotency; this.numRecords = numRecords; +this.transactionTimeoutMs = transactionTimeoutMs; this.latch = latch; } -KafkaProducer get() { -return producer; -} - @Override public void run() { -int messageKey = 0; -int recordsSent = 0; -try { -while (recordsSent < numRecords) { -final long currentTimeMs = System.currentTimeMillis(); -produceOnce(messageKey, recordsSent, currentTimeMs); -messageKey += 2; -recordsSent += 1; +int key = 0; +int sentRecords = 0; +// the producer instance is thread safe +try (KafkaProducer producer = createKafkaProducer()) { +while (!closed && sentRecords < numRecords) { +if (isAsync) { +asyncSend(producer, key, "test" + key); +} else { +syncSend(producer, key, "test" + key); +
[GitHub] [kafka] fvaleri commented on a diff in pull request #13515: KAFKA-14752: Kafka examples improvements - producer changes
fvaleri commented on code in PR #13515: URL: https://github.com/apache/kafka/pull/13515#discussion_r1185104750 ## examples/src/main/java/kafka/examples/Producer.java: ## @@ -21,133 +21,164 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.AuthorizationException; +import org.apache.kafka.common.errors.FencedInstanceIdException; +import org.apache.kafka.common.errors.OutOfOrderSequenceException; +import org.apache.kafka.common.errors.ProducerFencedException; +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; +import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; /** - * Demo producer that demonstrate two modes of KafkaProducer. - * If the user uses the Async mode: The messages will be printed to stdout upon successful completion - * If the user uses the sync mode (isAsync = false): Each send loop will block until completion. + * A simple producer thread supporting two send modes: + * - Async mode (default): records are sent without waiting for the response. + * - Sync mode: each send operation blocks waiting for the response. */ public class Producer extends Thread { -private final KafkaProducer producer; +private final String bootstrapServers; private final String topic; -private final Boolean isAsync; -private int numRecords; +private final boolean isAsync; +private final String transactionalId; +private final boolean enableIdempotency; +private final int numRecords; +private final int transactionTimeoutMs; private final CountDownLatch latch; +private volatile boolean closed; -public Producer(final String topic, -final Boolean isAsync, -final String transactionalId, -final boolean enableIdempotency, -final int numRecords, -final int transactionTimeoutMs, -final CountDownLatch latch) { -Properties props = new Properties(); -props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT); -props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer"); -props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()); -props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); -if (transactionTimeoutMs > 0) { -props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, transactionTimeoutMs); -} -if (transactionalId != null) { -props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId); -} -props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotency); -producer = new KafkaProducer<>(props); - +public Producer(String threadName, +String bootstrapServers, +String topic, +boolean isAsync, +String transactionalId, +boolean enableIdempotency, +int numRecords, +int transactionTimeoutMs, +CountDownLatch latch) { +super(threadName); +this.bootstrapServers = bootstrapServers; this.topic = topic; this.isAsync = isAsync; +this.transactionalId = transactionalId; +this.enableIdempotency = enableIdempotency; this.numRecords = numRecords; +this.transactionTimeoutMs = transactionTimeoutMs; this.latch = latch; } -KafkaProducer get() { -return producer; -} - @Override public void run() { -int messageKey = 0; -int recordsSent = 0; -try { -while (recordsSent < numRecords) { -final long currentTimeMs = System.currentTimeMillis(); -produceOnce(messageKey, recordsSent, currentTimeMs); -messageKey += 2; -recordsSent += 1; +int key = 0; +int sentRecords = 0; +// the producer instance is thread safe +try (KafkaProducer producer = createKafkaProducer()) { +while (!closed && sentRecords < numRecords) { +if (isAsync) { +asyncSend(producer, key, "test" + key); +} else { +syncSend(producer, key, "test" + key); +
[GitHub] [kafka] fvaleri commented on a diff in pull request #13515: KAFKA-14752: Kafka examples improvements - producer changes
fvaleri commented on code in PR #13515: URL: https://github.com/apache/kafka/pull/13515#discussion_r1183960378 ## examples/src/main/java/kafka/examples/Producer.java: ## @@ -21,133 +21,159 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.AuthorizationException; +import org.apache.kafka.common.errors.FencedInstanceIdException; +import org.apache.kafka.common.errors.OutOfOrderSequenceException; +import org.apache.kafka.common.errors.ProducerFencedException; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; +import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; /** - * Demo producer that demonstrate two modes of KafkaProducer. - * If the user uses the Async mode: The messages will be printed to stdout upon successful completion - * If the user uses the sync mode (isAsync = false): Each send loop will block until completion. + * A simple producer thread supporting two send modes: + * - Async mode (default): records are sent without waiting for the response. + * - Sync mode: each send operation blocks waiting for the response. */ public class Producer extends Thread { -private final KafkaProducer producer; +private final String bootstrapServers; private final String topic; -private final Boolean isAsync; -private int numRecords; +private final boolean isAsync; +private final String transactionalId; +private final boolean enableIdempotency; +private final int numRecords; +private final int transactionTimeoutMs; private final CountDownLatch latch; +private volatile boolean closed; -public Producer(final String topic, -final Boolean isAsync, -final String transactionalId, -final boolean enableIdempotency, -final int numRecords, -final int transactionTimeoutMs, -final CountDownLatch latch) { -Properties props = new Properties(); -props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT); -props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer"); -props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()); -props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); -if (transactionTimeoutMs > 0) { -props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, transactionTimeoutMs); -} -if (transactionalId != null) { -props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId); -} -props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotency); -producer = new KafkaProducer<>(props); - +public Producer(String threadName, +String bootstrapServers, +String topic, +boolean isAsync, +String transactionalId, +boolean enableIdempotency, +int numRecords, +int transactionTimeoutMs, +CountDownLatch latch) { +super(threadName); +this.bootstrapServers = bootstrapServers; this.topic = topic; this.isAsync = isAsync; +this.transactionalId = transactionalId; +this.enableIdempotency = enableIdempotency; this.numRecords = numRecords; +this.transactionTimeoutMs = transactionTimeoutMs; this.latch = latch; } -KafkaProducer get() { -return producer; -} - @Override public void run() { -int messageKey = 0; -int recordsSent = 0; -try { -while (recordsSent < numRecords) { -final long currentTimeMs = System.currentTimeMillis(); -produceOnce(messageKey, recordsSent, currentTimeMs); -messageKey += 2; -recordsSent += 1; +int key = 0; +int sentRecords = 0; +// the producer instance is thread safe +try (KafkaProducer producer = createKafkaProducer()) { +while (!closed && sentRecords < numRecords) { +if (isAsync) { +asyncSend(producer, key, "test"); +} else { +syncSend(producer, key, "test"); +} +key++; +sentRecords++;
[GitHub] [kafka] fvaleri commented on a diff in pull request #13515: KAFKA-14752: Kafka examples improvements - producer changes
fvaleri commented on code in PR #13515: URL: https://github.com/apache/kafka/pull/13515#discussion_r1183960378 ## examples/src/main/java/kafka/examples/Producer.java: ## @@ -21,133 +21,159 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.AuthorizationException; +import org.apache.kafka.common.errors.FencedInstanceIdException; +import org.apache.kafka.common.errors.OutOfOrderSequenceException; +import org.apache.kafka.common.errors.ProducerFencedException; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; +import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; /** - * Demo producer that demonstrate two modes of KafkaProducer. - * If the user uses the Async mode: The messages will be printed to stdout upon successful completion - * If the user uses the sync mode (isAsync = false): Each send loop will block until completion. + * A simple producer thread supporting two send modes: + * - Async mode (default): records are sent without waiting for the response. + * - Sync mode: each send operation blocks waiting for the response. */ public class Producer extends Thread { -private final KafkaProducer producer; +private final String bootstrapServers; private final String topic; -private final Boolean isAsync; -private int numRecords; +private final boolean isAsync; +private final String transactionalId; +private final boolean enableIdempotency; +private final int numRecords; +private final int transactionTimeoutMs; private final CountDownLatch latch; +private volatile boolean closed; -public Producer(final String topic, -final Boolean isAsync, -final String transactionalId, -final boolean enableIdempotency, -final int numRecords, -final int transactionTimeoutMs, -final CountDownLatch latch) { -Properties props = new Properties(); -props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT); -props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer"); -props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()); -props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); -if (transactionTimeoutMs > 0) { -props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, transactionTimeoutMs); -} -if (transactionalId != null) { -props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId); -} -props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotency); -producer = new KafkaProducer<>(props); - +public Producer(String threadName, +String bootstrapServers, +String topic, +boolean isAsync, +String transactionalId, +boolean enableIdempotency, +int numRecords, +int transactionTimeoutMs, +CountDownLatch latch) { +super(threadName); +this.bootstrapServers = bootstrapServers; this.topic = topic; this.isAsync = isAsync; +this.transactionalId = transactionalId; +this.enableIdempotency = enableIdempotency; this.numRecords = numRecords; +this.transactionTimeoutMs = transactionTimeoutMs; this.latch = latch; } -KafkaProducer get() { -return producer; -} - @Override public void run() { -int messageKey = 0; -int recordsSent = 0; -try { -while (recordsSent < numRecords) { -final long currentTimeMs = System.currentTimeMillis(); -produceOnce(messageKey, recordsSent, currentTimeMs); -messageKey += 2; -recordsSent += 1; +int key = 0; +int sentRecords = 0; +// the producer instance is thread safe +try (KafkaProducer producer = createKafkaProducer()) { +while (!closed && sentRecords < numRecords) { +if (isAsync) { +asyncSend(producer, key, "test"); +} else { +syncSend(producer, key, "test"); +} +key++; +sentRecords++;
[GitHub] [kafka] fvaleri commented on a diff in pull request #13515: KAFKA-14752: Kafka examples improvements - producer changes
fvaleri commented on code in PR #13515: URL: https://github.com/apache/kafka/pull/13515#discussion_r1183960307 ## examples/src/main/java/kafka/examples/Producer.java: ## @@ -21,133 +21,159 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.AuthorizationException; +import org.apache.kafka.common.errors.FencedInstanceIdException; +import org.apache.kafka.common.errors.OutOfOrderSequenceException; +import org.apache.kafka.common.errors.ProducerFencedException; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; +import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; /** - * Demo producer that demonstrate two modes of KafkaProducer. - * If the user uses the Async mode: The messages will be printed to stdout upon successful completion - * If the user uses the sync mode (isAsync = false): Each send loop will block until completion. + * A simple producer thread supporting two send modes: + * - Async mode (default): records are sent without waiting for the response. + * - Sync mode: each send operation blocks waiting for the response. */ public class Producer extends Thread { -private final KafkaProducer producer; +private final String bootstrapServers; private final String topic; -private final Boolean isAsync; -private int numRecords; +private final boolean isAsync; +private final String transactionalId; +private final boolean enableIdempotency; +private final int numRecords; +private final int transactionTimeoutMs; private final CountDownLatch latch; +private volatile boolean closed; -public Producer(final String topic, -final Boolean isAsync, -final String transactionalId, -final boolean enableIdempotency, -final int numRecords, -final int transactionTimeoutMs, -final CountDownLatch latch) { -Properties props = new Properties(); -props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT); -props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer"); -props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()); -props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); -if (transactionTimeoutMs > 0) { -props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, transactionTimeoutMs); -} -if (transactionalId != null) { -props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId); -} -props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotency); -producer = new KafkaProducer<>(props); - +public Producer(String threadName, +String bootstrapServers, +String topic, +boolean isAsync, +String transactionalId, +boolean enableIdempotency, +int numRecords, +int transactionTimeoutMs, +CountDownLatch latch) { +super(threadName); +this.bootstrapServers = bootstrapServers; this.topic = topic; this.isAsync = isAsync; +this.transactionalId = transactionalId; +this.enableIdempotency = enableIdempotency; this.numRecords = numRecords; +this.transactionTimeoutMs = transactionTimeoutMs; this.latch = latch; } -KafkaProducer get() { -return producer; -} - @Override public void run() { -int messageKey = 0; -int recordsSent = 0; -try { -while (recordsSent < numRecords) { -final long currentTimeMs = System.currentTimeMillis(); -produceOnce(messageKey, recordsSent, currentTimeMs); -messageKey += 2; -recordsSent += 1; +int key = 0; +int sentRecords = 0; +// the producer instance is thread safe +try (KafkaProducer producer = createKafkaProducer()) { +while (!closed && sentRecords < numRecords) { +if (isAsync) { +asyncSend(producer, key, "test"); +} else { +syncSend(producer, key, "test"); +} +key++; +sentRecords++;
[GitHub] [kafka] fvaleri commented on a diff in pull request #13515: KAFKA-14752: Kafka examples improvements - producer changes
fvaleri commented on code in PR #13515: URL: https://github.com/apache/kafka/pull/13515#discussion_r1183959056 ## examples/src/main/java/kafka/examples/Producer.java: ## @@ -21,133 +21,159 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.AuthorizationException; +import org.apache.kafka.common.errors.FencedInstanceIdException; +import org.apache.kafka.common.errors.OutOfOrderSequenceException; +import org.apache.kafka.common.errors.ProducerFencedException; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; +import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; /** - * Demo producer that demonstrate two modes of KafkaProducer. - * If the user uses the Async mode: The messages will be printed to stdout upon successful completion - * If the user uses the sync mode (isAsync = false): Each send loop will block until completion. + * A simple producer thread supporting two send modes: + * - Async mode (default): records are sent without waiting for the response. + * - Sync mode: each send operation blocks waiting for the response. */ public class Producer extends Thread { -private final KafkaProducer producer; +private final String bootstrapServers; private final String topic; -private final Boolean isAsync; -private int numRecords; +private final boolean isAsync; +private final String transactionalId; +private final boolean enableIdempotency; +private final int numRecords; +private final int transactionTimeoutMs; private final CountDownLatch latch; +private volatile boolean closed; -public Producer(final String topic, -final Boolean isAsync, -final String transactionalId, -final boolean enableIdempotency, -final int numRecords, -final int transactionTimeoutMs, -final CountDownLatch latch) { -Properties props = new Properties(); -props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT); -props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer"); -props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()); -props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); -if (transactionTimeoutMs > 0) { -props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, transactionTimeoutMs); -} -if (transactionalId != null) { -props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId); -} -props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotency); -producer = new KafkaProducer<>(props); - +public Producer(String threadName, +String bootstrapServers, +String topic, +boolean isAsync, +String transactionalId, +boolean enableIdempotency, +int numRecords, +int transactionTimeoutMs, +CountDownLatch latch) { +super(threadName); +this.bootstrapServers = bootstrapServers; this.topic = topic; this.isAsync = isAsync; +this.transactionalId = transactionalId; +this.enableIdempotency = enableIdempotency; this.numRecords = numRecords; +this.transactionTimeoutMs = transactionTimeoutMs; this.latch = latch; } -KafkaProducer get() { -return producer; -} - @Override public void run() { -int messageKey = 0; -int recordsSent = 0; -try { -while (recordsSent < numRecords) { -final long currentTimeMs = System.currentTimeMillis(); -produceOnce(messageKey, recordsSent, currentTimeMs); -messageKey += 2; -recordsSent += 1; +int key = 0; +int sentRecords = 0; +// the producer instance is thread safe +try (KafkaProducer producer = createKafkaProducer()) { +while (!closed && sentRecords < numRecords) { +if (isAsync) { +asyncSend(producer, key, "test"); +} else { +syncSend(producer, key, "test"); Review Comment: Sure. -- This is an automated message from the Apache Git