Lingxiao WANG created KAFKA-6782:
------------------------------------
Summary: GlobalStateStore never finishes restoring when consuming
transactional messages
Key: KAFKA-6782
URL: https://issues.apache.org/jira/browse/KAFKA-6782
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 1.0.1, 1.1.0
Reporter: Lingxiao WANG
Same problem as https://issues.apache.org/jira/browse/KAFKA-6190, but the fix
proposed there:
{code:java}
while (offset < highWatermark) {
    final ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
    for (ConsumerRecord<byte[], byte[]> record : records) {
        if (record.key() != null) {
            stateRestoreCallback.restore(record.key(), record.value());
        }
        offset = consumer.position(topicPartition);
    }
}{code}
doesn't work for me. In my case, several transaction markers can appear in
sequence in one partition. When that happens, the consumer polls no records,
the body of the for loop never runs, and 'offset =
consumer.position(topicPartition)' never gets a chance to execute, so the
while loop never terminates.
I propose moving 'offset = consumer.position(topicPartition)' out of the for
loop, so that the offset is updated even when no records are polled:
{code:java}
while (offset < highWatermark) {
    final ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
    for (ConsumerRecord<byte[], byte[]> record : records) {
        if (record.key() != null) {
            stateRestoreCallback.restore(record.key(), record.value());
        }
    }
    offset = consumer.position(topicPartition);
}{code}
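To make the difference between the two loops easier to see without a broker, here is a rough, self-contained sketch. It does not use the Kafka client at all: FakeConsumer, RestoreLoopSketch, the batch size of 2 and the maxPolls guard are hypothetical names and values made up for this illustration. The only behaviour it assumes is the one described above: poll() never returns transaction markers, while position() still moves past them.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a consumer on a single partition (NOT the Kafka API).
// Assumption: markers are never handed out by poll(), but position() advances past them.
final class FakeConsumer {
    private final boolean[] isMarker; // isMarker[i] == true: offset i holds a transaction marker
    private int position = 0;

    FakeConsumer(boolean[] isMarker) {
        this.isMarker = isMarker;
    }

    // Scans up to batchSize offsets and returns only the offsets that hold data records.
    List<Integer> poll(int batchSize) {
        List<Integer> records = new ArrayList<>();
        int end = Math.min(position + batchSize, isMarker.length);
        while (position < end) {
            if (!isMarker[position]) {
                records.add(position);
            }
            position++;
        }
        return records;
    }

    int position() {
        return position;
    }
}

final class RestoreLoopSketch {
    // Runs the restore loop and returns the number of polls it needed,
    // or -1 if maxPolls was reached first (the real loop would spin forever).
    static int restore(boolean[] log, boolean updateInsideRecordLoop, int maxPolls) {
        FakeConsumer consumer = new FakeConsumer(log);
        int highWatermark = log.length;
        int offset = 0;
        int polls = 0;
        while (offset < highWatermark) {
            if (polls == maxPolls) {
                return -1; // guard added for the sketch only
            }
            List<Integer> records = consumer.poll(2);
            polls++;
            for (Integer record : records) {
                // the restore callback would be invoked here
                if (updateInsideRecordLoop) {
                    offset = consumer.position(); // first variant: skipped when the batch is empty
                }
            }
            if (!updateInsideRecordLoop) {
                offset = consumer.position(); // second variant: runs after every poll
            }
        }
        return polls;
    }

    public static void main(String[] args) {
        // Two data records followed by two consecutive transaction markers.
        boolean[] log = {false, false, true, true};
        System.out.println("update inside for loop:  " + restore(log, true, 10));
        System.out.println("update outside for loop: " + restore(log, false, 10));
    }
}
```

With this toy log, the variant that updates the offset inside the for loop hits the poll limit, because the second poll is empty and the offset stays at 2. The variant that updates it after every poll reaches the high watermark on the second poll. When the partition does not end in markers, both variants behave the same.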