[ https://issues.apache.org/jira/browse/KAFKA-5880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16164149#comment-16164149 ]
Apurva Mehta commented on KAFKA-5880:
-------------------------------------

Hi Lae,

Thanks for reporting the issue. Can you share the TRACE-level consumer logs from when it is stuck?

Thanks,
Apurva

> Transactional producer and read committed consumer causes consumer to stuck
> ---------------------------------------------------------------------------
>
>                 Key: KAFKA-5880
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5880
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Lae
>         Attachments: index-updates-3.zip
>
>
> We use transactional producers and have configured the isolation level on the
> consumer to read only committed data. The consumer has somehow got into a
> stuck state where it can no longer move forward, because the Kafka server
> always returns an empty list of records even though there are thousands more
> successful transactions after the offset.
> This is the example producer code:
> {code:java}
> Properties config = new Properties();
> config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, UUID.randomUUID().toString());
> config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
> config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
> config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
> try (Producer<String, String> producer = new KafkaProducer<>(config)) {
>     producer.initTransactions();
>     try {
>         producer.beginTransaction();
>         // Multiple producer.send(...) here
>         producer.commitTransaction();
>     } catch (Throwable e) {
>         producer.abortTransaction();
>     }
> }
> {code}
> This is the test consumer code:
> {code:java}
> Properties config = new Properties();
> config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> config.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
> config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
> config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
> config.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString().toLowerCase(ENGLISH));
> config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
> config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
> try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config)) {
>     consumer.subscribe(Collections.singleton("index-updates"));
>     while (true) {
>         ConsumerRecords<String, String> records = consumer.poll(5000);
>         for (ConsumerRecord<String, String> record : records) {
>             System.err.println(record.value());
>         }
>         consumer.commitSync();
>     }
> }
> {code}
> I have also attached the problematic partition data, index-updates-3.zip. To
> reproduce the issue using the data, run a local Kafka instance, create a
> topic called "index-updates" with 3 partitions, replace the content of the
> index-updates-3 log directory with the attached content, and then run the
> above consumer code.
> The consumer will then get stuck at some point (not always at the same offset)
> and make no further progress even if you send new data into the partition
> (other partitions seem fine).
> The following example is from when the consumer was stuck at offset 46644;
> the Kafka server always returns an empty list of records when the consumer
> fetches from 46644:
> {noformat}
> root@0b1e67f0c34b:/# /opt/kafka/bin/kafka-consumer-groups.sh --describe --group my-group --bootstrap-server localhost:9092
> Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).
> TOPIC          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG    CONSUMER-ID                                      HOST          CLIENT-ID
> index-updates  0          15281           15281           0      consumer-1-c8f3fede-ef6f-4d12-a426-44aa9c47e71f  /10.100.1.97  consumer-1
> index-updates  1          0               0               0      consumer-1-c8f3fede-ef6f-4d12-a426-44aa9c47e71f  /10.100.1.97  consumer-1
> index-updates  2          0               0               0      consumer-1-c8f3fede-ef6f-4d12-a426-44aa9c47e71f  /10.100.1.97  consumer-1
> index-updates  3          46644           65735           19091  consumer-1-c8f3fede-ef6f-4d12-a426-44aa9c47e71f  /10.100.1.97  consumer-1
> index-updates  4          0               0               0      consumer-1-c8f3fede-ef6f-4d12-a426-44aa9c47e71f  /10.100.1.97  consumer-1
> index-updates  5          0               0               0      consumer-1-c8f3fede-ef6f-4d12-a426-44aa9c47e71f  /10.100.1.97  consumer-1
> index-updates  6          0               0               0      consumer-1-c8f3fede-ef6f-4d12-a426-44aa9c47e71f  /10.100.1.97  consumer-1
> index-updates  7          0               0               0      consumer-1-c8f3fede-ef6f-4d12-a426-44aa9c47e71f  /10.100.1.97  consumer-1
> index-updates  8          0               0               0      consumer-1-c8f3fede-ef6f-4d12-a426-44aa9c47e71f  /10.100.1.97  consumer-1
> index-updates  9          0               0               0      consumer-1-c8f3fede-ef6f-4d12-a426-44aa9c47e71f  /10.100.1.97  consumer-1
> root@0b1e67f0c34b:/#
> {noformat}
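
As a quick cross-check of the numbers above, the LAG column reported by kafka-consumer-groups.sh is LOG-END-OFFSET minus CURRENT-OFFSET. The sketch below only reproduces that subtraction for partition 3, using the offsets from the output above. The class and method names are hypothetical and not part of Kafka.

```java
// Hypothetical helper, not part of the Kafka API: recomputes the LAG column.
final class ConsumerLag {

    // Lag = log end offset minus the group's current offset.
    static long lag(long currentOffset, long logEndOffset) {
        return logEndOffset - currentOffset;
    }

    public static void main(String[] args) {
        // Offsets for index-updates partition 3, taken from the output above.
        System.out.println(ConsumerLag.lag(46644L, 65735L));
    }
}
```

For partition 3 this gives 19091, matching the LAG column, so that many offsets lie beyond the position at which the consumer is stuck.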