[ 
https://issues.apache.org/jira/browse/KAFKA-16194?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16194:
------------------------------
    Labels: consumer-threading-refactor  (was: )

> KafkaConsumer.groupMetadata() should be correct when first records are 
> returned
> -------------------------------------------------------------------------------
>
>                 Key: KAFKA-16194
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16194
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: clients, consumer
>            Reporter: David Jacot
>            Assignee: Bruno Cadonna
>            Priority: Major
>              Labels: consumer-threading-refactor
>
> The following code returns records before the group metadata is updated. This 
> fails the first transactions ever run by the Producer/Consumer.
>  
> {code:java}
> Producer<String, String> txnProducer = new KafkaProducer<>(txnProducerProps);
> Consumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
> txnProducer.initTransactions();
> System.out.println("Init transactions called");
> try {
>     txnProducer.beginTransaction();
>     System.out.println("Begin transactions called");
>     consumer.subscribe(Collections.singletonList("input"));
>     System.out.println("Consumer subscribed to topic -> KIP848-topic-2 ");
>     ConsumerRecords<String, String> records = 
> consumer.poll(Duration.ofSeconds(10));
>     System.out.println("Returned " + records.count() + " records.");
>     // Process and send txn messages.
>     for (ConsumerRecord<String, String> processedRecord : records) {
>         txnProducer.send(new ProducerRecord<>("output", 
> processedRecord.key(), "Processed: " + processedRecord.value()));
>     }
>     ConsumerGroupMetadata groupMetadata = consumer.groupMetadata();
>     System.out.println("Group metadata inside test" + groupMetadata);
>     Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
>     for (ConsumerRecord<String, String> record : records) {
>         offsetsToCommit.put(new TopicPartition(record.topic(), 
> record.partition()),
>             new OffsetAndMetadata(record.offset() + 1));
>     }
>     System.out.println("Offsets to commit" + offsetsToCommit);
>     // Send offsets to transaction with ConsumerGroupMetadata.
>     txnProducer.sendOffsetsToTransaction(offsetsToCommit, groupMetadata);
>     System.out.println("Send offsets to transaction done");
>     // Commit the transaction.
>     txnProducer.commitTransaction();
>     System.out.println("Commit transaction done");
> } catch (ProducerFencedException | OutOfOrderSequenceException | 
> AuthorizationException e) {
>     e.printStackTrace();
>     txnProducer.close();
> } catch (KafkaException e) {
>     e.printStackTrace();
>     txnProducer.abortTransaction();
> } finally {
>     txnProducer.close();
>     consumer.close();
> } {code}
> The issue seems to be that while it waits in `poll`, the event to update the 
> group metadata is not processed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to