fvaleri commented on code in PR #13514: URL: https://github.com/apache/kafka/pull/13514#discussion_r1174433855
########## examples/src/main/java/kafka/examples/Consumer.java: ########## @@ -21,97 +21,120 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.common.errors.AuthorizationException; +import org.apache.kafka.common.errors.RecordDeserializationException; +import org.apache.kafka.common.errors.UnsupportedVersionException; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.StringDeserializer; import java.time.Duration; import java.util.Collection; -import java.util.Collections; import java.util.Optional; import java.util.Properties; +import java.util.UUID; import java.util.concurrent.CountDownLatch; +import static java.util.Collections.singleton; + /** - * A simple consumer thread that demonstrate subscribe and poll use case. The thread subscribes to a topic, - * then runs a loop to poll new messages, and print the message out. The thread closes until the target {@code - * numMessageToConsume} is hit or catching an exception. + * A simple consumer thread that subscribes to a topic, fetches new records and prints them. + * The thread does not stop until all records are completed or an exception is raised. */ public class Consumer extends Thread implements ConsumerRebalanceListener { - private final KafkaConsumer<Integer, String> consumer; + private final String bootstrapServers; private final String topic; private final String groupId; - private final int numMessageToConsume; - private int messageRemaining; + private final Optional<String> instanceId; + private final boolean readCommitted; + private final int numRecords; private final CountDownLatch latch; + private volatile boolean closed; + private int remainingRecords; - public Consumer(final String topic, - final String groupId, - final Optional<String> instanceId, - final boolean readCommitted, - final int numMessageToConsume, - final CountDownLatch latch) { - super("KafkaConsumerExample"); - this.groupId = groupId; - Properties props = new Properties(); - props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT); - props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); - instanceId.ifPresent(id -> props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, id)); - props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); - props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer"); - props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); - if (readCommitted) { - props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); - } - props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - - consumer = new KafkaConsumer<>(props); + public Consumer(String threadName, + String bootstrapServers, + String topic, + String groupId, + Optional<String> instanceId, + boolean readCommitted, + int numRecords, + CountDownLatch latch) { + super(threadName); + this.bootstrapServers = bootstrapServers; this.topic = topic; - this.numMessageToConsume = numMessageToConsume; - this.messageRemaining = numMessageToConsume; + this.groupId = groupId; + this.instanceId = instanceId; + this.readCommitted = readCommitted; + this.numRecords = numRecords; + this.remainingRecords = numRecords; this.latch = latch; } - KafkaConsumer<Integer, String> get() { - return consumer; - } - @Override public void run() { - try { - System.out.println("Subscribe to:" + this.topic); - consumer.subscribe(Collections.singletonList(this.topic), this); - do { - doWork(); - } while (messageRemaining > 0); - System.out.println(groupId + " finished reading " + numMessageToConsume + " messages"); - } catch (WakeupException e) { - // swallow the wakeup - } catch (Exception e) { - System.out.println("Unexpected termination, exception thrown:" + e); - } finally { - shutdown(); + // the consumer instance is NOT thread safe + try (KafkaConsumer<Integer, String> consumer = createKafkaConsumer()) { + consumer.subscribe(singleton(topic), this); + Utils.printOut("Subscribed to %s", topic); + while (!closed && remainingRecords > 0) { + try { + // next poll must be called within session.timeout.ms to avoid rebalance + ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(1)); + for (ConsumerRecord<Integer, String> record : records) { + Utils.maybePrintRecord(numRecords, record); + } + remainingRecords -= records.count(); + } catch (AuthorizationException | UnsupportedVersionException + | RecordDeserializationException e) { + // we can't recover from these exceptions + Utils.printErr(e.getMessage()); + shutdown(); + } catch (KafkaException e) { + // log the exception and try to continue + // you can add your application retry strategy here + Utils.printErr(e.getMessage()); + } + } + } catch (Throwable e) { + Utils.printOut("Unhandled exception"); + e.printStackTrace(); } + Utils.printOut("Fetched %d records", numRecords - remainingRecords); + shutdown(); } - public void doWork() { - ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(1)); - for (ConsumerRecord<Integer, String> record : records) { - System.out.println(groupId + " received message : from partition " + record.partition() + ", (" + record.key() + ", " + record.value() + ") at offset " + record.offset()); + + public void shutdown() { + if (!closed) { + closed = true; + latch.countDown(); } - messageRemaining -= records.count(); } - public void shutdown() { - this.consumer.close(); - latch.countDown(); + public KafkaConsumer<Integer, String> createKafkaConsumer() { + Properties props = new Properties(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + props.put(ConsumerConfig.CLIENT_ID_CONFIG, "client-" + UUID.randomUUID()); + props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); + instanceId.ifPresent(id -> props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, id)); + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, readCommitted ? "false" : "true"); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + if (readCommitted) { + props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); + } + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + return new KafkaConsumer<>(props); } @Override public void onPartitionsRevoked(Collection<TopicPartition> partitions) { - System.out.println("Revoking partitions:" + partitions); + Utils.printOut("Revoked partitions: %s", partitions); } Review Comment: ok -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org