[ https://issues.apache.org/jira/browse/KAFKA-16194?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Kirk True updated KAFKA-16194:
------------------------------
    Labels: consumer-threading-refactor kip-848-client-support  (was: consumer-threading-refactor)

> KafkaConsumer.groupMetadata() should be correct when first records are returned
> -------------------------------------------------------------------------------
>
>                 Key: KAFKA-16194
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16194
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: clients, consumer
>            Reporter: David Jacot
>            Assignee: Bruno Cadonna
>            Priority: Major
>              Labels: consumer-threading-refactor, kip-848-client-support
>
> The following code returns records before the group metadata is updated. This fails the first transactions ever run by the Producer/Consumer.
>
> {code:java}
> Producer<String, String> txnProducer = new KafkaProducer<>(txnProducerProps);
> Consumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
> txnProducer.initTransactions();
> System.out.println("Init transactions called");
> try {
>     txnProducer.beginTransaction();
>     System.out.println("Begin transactions called");
>     consumer.subscribe(Collections.singletonList("input"));
>     System.out.println("Consumer subscribed to topic -> KIP848-topic-2 ");
>     ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
>     System.out.println("Returned " + records.count() + " records.");
>     // Process and send txn messages.
>     for (ConsumerRecord<String, String> processedRecord : records) {
>         txnProducer.send(new ProducerRecord<>("output", processedRecord.key(), "Processed: " + processedRecord.value()));
>     }
>     ConsumerGroupMetadata groupMetadata = consumer.groupMetadata();
>     System.out.println("Group metadata inside test" + groupMetadata);
>     Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
>     for (ConsumerRecord<String, String> record : records) {
>         offsetsToCommit.put(new TopicPartition(record.topic(), record.partition()),
>             new OffsetAndMetadata(record.offset() + 1));
>     }
>     System.out.println("Offsets to commit" + offsetsToCommit);
>     // Send offsets to transaction with ConsumerGroupMetadata.
>     txnProducer.sendOffsetsToTransaction(offsetsToCommit, groupMetadata);
>     System.out.println("Send offsets to transaction done");
>     // Commit the transaction.
>     txnProducer.commitTransaction();
>     System.out.println("Commit transaction done");
> } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
>     e.printStackTrace();
>     txnProducer.close();
> } catch (KafkaException e) {
>     e.printStackTrace();
>     txnProducer.abortTransaction();
> } finally {
>     txnProducer.close();
>     consumer.close();
> } {code}
> The issue seems to be that while it waits in `poll`, the event to update the group metadata is not processed.
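
The suspected ordering problem can be illustrated with a small toy model. This is only a sketch and is not Kafka's implementation: every class and method name below (ToyConsumer, ToyGroupMetadata, backgroundJoinAndFetch, drainEventsInPoll) is hypothetical, and the use of -1 for "not yet joined" is a convention of this sketch. It assumes the consumer keeps an application-side copy of the group metadata that is refreshed only when queued background events are processed, and shows how poll() can hand back records while that copy is still stale.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

// Toy model only: none of these classes are Kafka's; all names are hypothetical.
class GroupMetadataRaceSketch {

    /** Simplified stand-in for group metadata (generation id only). */
    static final class ToyGroupMetadata {
        final int generationId;

        ToyGroupMetadata(int generationId) {
            this.generationId = generationId;
        }
    }

    /** Consumer whose cached metadata is fed by queued background events. */
    static final class ToyConsumer {
        private final Queue<ToyGroupMetadata> backgroundEvents = new ArrayDeque<>();
        private final Queue<String> fetchBuffer = new ArrayDeque<>();
        private final boolean drainEventsInPoll;
        // -1 marks "not yet joined" in this sketch.
        private ToyGroupMetadata cached = new ToyGroupMetadata(-1);

        ToyConsumer(boolean drainEventsInPoll) {
            this.drainEventsInPoll = drainEventsInPoll;
        }

        /** What a background thread would do: join the group, then buffer a fetch. */
        void backgroundJoinAndFetch(int generationId, List<String> records) {
            backgroundEvents.add(new ToyGroupMetadata(generationId));
            fetchBuffer.addAll(records);
        }

        /** Returns buffered records, optionally applying pending metadata updates first. */
        List<String> poll() {
            if (drainEventsInPoll) {
                ToyGroupMetadata update;
                while ((update = backgroundEvents.poll()) != null) {
                    cached = update;
                }
            }
            List<String> out = new ArrayList<>(fetchBuffer);
            fetchBuffer.clear();
            return out;
        }

        ToyGroupMetadata groupMetadata() {
            return cached;
        }
    }

    /** Generation id seen by the application right after the first non-empty poll. */
    static int generationAfterFirstPoll(boolean drainEventsInPoll) {
        ToyConsumer consumer = new ToyConsumer(drainEventsInPoll);
        consumer.backgroundJoinAndFetch(1, Arrays.asList("r0", "r1"));
        List<String> records = consumer.poll();
        if (records.isEmpty()) {
            throw new IllegalStateException("expected records from the first poll");
        }
        return consumer.groupMetadata().generationId;
    }

    public static void main(String[] args) {
        System.out.println("events not drained: generation=" + generationAfterFirstPoll(false));
        System.out.println("events drained:     generation=" + generationAfterFirstPoll(true));
    }
}
```

In this model, generationAfterFirstPoll(false) yields the stale value -1 even though records were returned, which mirrors the situation where sendOffsetsToTransaction receives outdated group metadata. With the pending events applied inside poll(), the same call yields generation 1.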