Lae created KAFKA-5880:
--------------------------
Summary: Transactional producer and read committed consumer causes
consumer to stuck
Key: KAFKA-5880
URL: https://issues.apache.org/jira/browse/KAFKA-5880
Project: Kafka
Issue Type: Bug
Reporter: Lae
Attachments: index-updates-3.zip
We use transactional producers, and have configured isolation level on the
consumer to only read committed data. The consumer has somehow got into a stuck
state where it can no longer move forward because the Kafka server always
return empty list of records despite there are thousands more successful
transactions after the offset.
This is an example producer code:
{code:java}
Properties config = new Properties();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,
UUID.randomUUID().toString());
config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
try (Producer<String, String> producer = new KafkaProducer<>(config)) {
producer.initTransactions();
try {
producer.beginTransaction();
// Multiple producer.send(...) here
producer.commitTransaction();
} catch (Throwable e) {
producer.abortTransaction();
}
}
{code}
This is the test consumer code:
{code:java}
Properties config = new Properties();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
config.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,
IsolationLevel.READ_COMMITTED.toString().toLowerCase(ENGLISH));
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config)) {
consumer.subscribe(Collections.singleton("index-updates"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(5000);
for (ConsumerRecord<String, String> record : records) {
System.err.println(record.value());
}
consumer.commitSync();
}
}
{code}
I have also attached the problematic partition data index-updates-3.zip, to
reproduce the issue using the data, you can run a local Kafka instance, then
create a topic called "index-updates" with 3 partitions, and replace the
content of the index-updates-3 log directory with the attached content, then
running the above consumer code.
Then the consumer will be stuck at some point (not always at the same offset)
not making anymore progress even if you send new data into the partition (other
partitions seem fine). The following example is when the consumer was stuck at
offset 46644, and the Kafka server always return empty list of records when the
consumer fetches from 46644:
{noformat}
root@0b1e67f0c34b:/# /opt/kafka/bin/kafka-consumer-groups.sh --describe --group
my-group --bootstrap-server localhost:9092
Note: This will only show information about consumers that use the Java
consumer API (non-ZooKeeper-based consumers).
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
CONSUMER-ID HOST
CLIENT-ID
index-updates 0 15281 15281 0
consumer-1-c8f3fede-ef6f-4d12-a426-44aa9c47e71f /10.100.1.97
consumer-1
index-updates 1 0 0 0
consumer-1-c8f3fede-ef6f-4d12-a426-44aa9c47e71f /10.100.1.97
consumer-1
index-updates 2 0 0 0
consumer-1-c8f3fede-ef6f-4d12-a426-44aa9c47e71f /10.100.1.97
consumer-1
index-updates 3 46644 65735 19091
consumer-1-c8f3fede-ef6f-4d12-a426-44aa9c47e71f /10.100.1.97
consumer-1
index-updates 4 0 0 0
consumer-1-c8f3fede-ef6f-4d12-a426-44aa9c47e71f /10.100.1.97
consumer-1
index-updates 5 0 0 0
consumer-1-c8f3fede-ef6f-4d12-a426-44aa9c47e71f /10.100.1.97
consumer-1
index-updates 6 0 0 0
consumer-1-c8f3fede-ef6f-4d12-a426-44aa9c47e71f /10.100.1.97
consumer-1
index-updates 7 0 0 0
consumer-1-c8f3fede-ef6f-4d12-a426-44aa9c47e71f /10.100.1.97
consumer-1
index-updates 8 0 0 0
consumer-1-c8f3fede-ef6f-4d12-a426-44aa9c47e71f /10.100.1.97
consumer-1
index-updates 9 0 0 0
consumer-1-c8f3fede-ef6f-4d12-a426-44aa9c47e71f /10.100.1.97
consumer-1
root@0b1e67f0c34b:/#
{noformat}
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)