[ https://issues.apache.org/jira/browse/KAFKA-6782?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Matthias J. Sax reassigned KAFKA-6782:
--------------------------------------

    Assignee: Lingxiao WANG

> GlobalKTable GlobalStateStore never finishes restoring when consuming aborted messages
> --------------------------------------------------------------------------------------
>
>                 Key: KAFKA-6782
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6782
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.1.0, 1.0.1
>            Reporter: Lingxiao WANG
>            Assignee: Lingxiao WANG
>            Priority: Major
>
> This is the same problem as https://issues.apache.org/jira/browse/KAFKA-6190.
> The solution proposed there, shown below, works when every transactional
> message was committed successfully. When the topic contains aborted
> messages, however, the restore loop never terminates. Here is that proposal:
> {code:java}
> while (offset < highWatermark) {
>     final ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
>     for (ConsumerRecord<byte[], byte[]> record : records) {
>         if (record.key() != null) {
>             stateRestoreCallback.restore(record.key(), record.value());
>         }
>         offset = consumer.position(topicPartition);
>     }
> }{code}
> Concretely, when the consumer reads a range of aborted messages, poll()
> returns 0 records, so the body of the for loop is skipped and the statement
> 'offset = consumer.position(topicPartition)' never gets a chance to execute.
> So I propose moving 'offset = consumer.position(topicPartition)' out of the
> for loop, which guarantees that the offset is updated even when no records
> are polled.
> {code:java}
> while (offset < highWatermark) {
>     final ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
>     for (ConsumerRecord<byte[], byte[]> record : records) {
>         if (record.key() != null) {
>             stateRestoreCallback.restore(record.key(), record.value());
>         }
>     }
>     offset = consumer.position(topicPartition);
> }{code}
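
The difference between the two loops can be reproduced without a broker. The following is only a minimal sketch under simplifying assumptions, not the Kafka Streams code: FakeConsumer, pollsUntilRestored, the fixed batch size of 2, and the maxPolls cap are all hypothetical names and values introduced for illustration. The fake consumer mimics the behaviour described in the report: aborted offsets are never returned by poll(), but the position still moves past them.

```java
import java.util.ArrayList;
import java.util.List;

class RestoreLoopSketch {

    /** Hypothetical stand-in for a consumer that hides aborted records. */
    static final class FakeConsumer {
        private final boolean[] aborted; // aborted[i] is true if offset i was aborted
        private final int batchSize;     // offsets scanned per poll (assumed value)
        private int position = 0;

        FakeConsumer(boolean[] aborted, int batchSize) {
            this.aborted = aborted;
            this.batchSize = batchSize;
        }

        /** Scans up to batchSize offsets and returns only the non-aborted ones. */
        List<Integer> poll() {
            List<Integer> visible = new ArrayList<>();
            for (int scanned = 0; scanned < batchSize && position < aborted.length; scanned++) {
                if (!aborted[position]) {
                    visible.add(position);
                }
                position++; // the position advances past aborted offsets too
            }
            return visible;
        }

        int position() {
            return position;
        }
    }

    /**
     * Runs the restore loop and returns the number of polls it needed,
     * or -1 if it was still not finished after maxPolls polls.
     */
    static int pollsUntilRestored(boolean updateInsideRecordLoop, boolean[] aborted, int maxPolls) {
        FakeConsumer consumer = new FakeConsumer(aborted, 2);
        int highWatermark = aborted.length;
        int offset = 0;
        int polls = 0;
        while (offset < highWatermark) {
            if (polls == maxPolls) {
                return -1; // cap added only so the sketch terminates
            }
            List<Integer> records = consumer.poll();
            polls++;
            for (Integer ignored : records) {
                if (updateInsideRecordLoop) {
                    offset = consumer.position(); // original placement
                }
            }
            if (!updateInsideRecordLoop) {
                offset = consumer.position(); // proposed placement
            }
        }
        return polls;
    }

    public static void main(String[] args) {
        // Offsets 0 and 1 are committed, offsets 2 and 3 are aborted.
        boolean[] log = {false, false, true, true};
        System.out.println(pollsUntilRestored(true, log, 10));  // prints -1
        System.out.println(pollsUntilRestored(false, log, 10)); // prints 2
    }
}
```

With the update inside the record loop, the second poll returns nothing, the offset stays at 2, and the loop only ends because of the artificial cap. With the update after the record loop, the offset follows the consumer position to 4 and the loop exits after two polls.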