[ https://issues.apache.org/jira/browse/KAFKA-5880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16165252#comment-16165252 ]
Lae commented on KAFKA-5880:
----------------------------

This is the log from the consumer:
{noformat}
08:31:02.412 TRACE org.apache.kafka.clients.NetworkClient Completed receive from node 0, for key 1, received {throttle_time_ms=0,responses=[{topic=index-updates,partition_responses=[{partition_header={partition=0,error_code=0,high_watermark=6,last_stable_offset=6,log_start_offset=6,aborted_transactions=[]},record_set=[]},{partition_header={partition=9,error_code=0,high_watermark=0,last_stable_offset=0,log_start_offset=0,aborted_transactions=[]},record_set=[]},{partition_header={partition=7,error_code=0,high_watermark=2,last_stable_offset=2,log_start_offset=0,aborted_transactions=[]},record_set=[]},{partition_header={partition=8,error_code=0,high_watermark=0,last_stable_offset=0,log_start_offset=0,aborted_transactions=[]},record_set=[]},{partition_header={partition=5,error_code=0,high_watermark=0,last_stable_offset=0,log_start_offset=0,aborted_transactions=[]},record_set=[]},{partition_header={partition=6,error_code=0,high_watermark=0,last_stable_offset=0,log_start_offset=0,aborted_transactions=[]},record_set=[]},{partition_header={partition=3,error_code=0,high_watermark=65737,last_stable_offset=46644,log_start_offset=26888,aborted_transactions=[]},record_set=[]},{partition_header={partition=4,error_code=0,high_watermark=1,last_stable_offset=1,log_start_offset=0,aborted_transactions=[]},record_set=[]},{partition_header={partition=1,error_code=0,high_watermark=255,last_stable_offset=255,log_start_offset=254,aborted_transactions=[]},record_set=[]},{partition_header={partition=2,error_code=0,high_watermark=0,last_stable_offset=0,log_start_offset=0,aborted_transactions=[]},record_set=[]}]}]}
08:31:02.412 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher Fetch READ_COMMITTED at offset 6 for partition index-updates-0 returned fetch data (error=NONE, highWaterMark=6, lastStableOffset = 6, logStartOffset = 6, abortedTransactions = [], recordsSizeInBytes=0)
08:31:02.412 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher Fetch READ_COMMITTED at offset 0 for partition index-updates-9 returned fetch data (error=NONE, highWaterMark=0, lastStableOffset = 0, logStartOffset = 0, abortedTransactions = [], recordsSizeInBytes=0)
08:31:02.412 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher Fetch READ_COMMITTED at offset 2 for partition index-updates-7 returned fetch data (error=NONE, highWaterMark=2, lastStableOffset = 2, logStartOffset = 0, abortedTransactions = [], recordsSizeInBytes=0)
08:31:02.412 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher Fetch READ_COMMITTED at offset 0 for partition index-updates-8 returned fetch data (error=NONE, highWaterMark=0, lastStableOffset = 0, logStartOffset = 0, abortedTransactions = [], recordsSizeInBytes=0)
08:31:02.412 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher Fetch READ_COMMITTED at offset 0 for partition index-updates-5 returned fetch data (error=NONE, highWaterMark=0, lastStableOffset = 0, logStartOffset = 0, abortedTransactions = [], recordsSizeInBytes=0)
08:31:02.412 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher Fetch READ_COMMITTED at offset 0 for partition index-updates-6 returned fetch data (error=NONE, highWaterMark=0, lastStableOffset = 0, logStartOffset = 0, abortedTransactions = [], recordsSizeInBytes=0)
08:31:02.412 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher Fetch READ_COMMITTED at offset 46644 for partition index-updates-3 returned fetch data (error=NONE, highWaterMark=65737, lastStableOffset = 46644, logStartOffset = 26888, abortedTransactions = [], recordsSizeInBytes=0)
08:31:02.412 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher Fetch READ_COMMITTED at offset 1 for partition index-updates-4 returned fetch data (error=NONE, highWaterMark=1, lastStableOffset = 1, logStartOffset = 0, abortedTransactions = [], recordsSizeInBytes=0)
08:31:02.412 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher Fetch READ_COMMITTED at offset 255 for partition index-updates-1 returned fetch data (error=NONE, highWaterMark=255, lastStableOffset = 255, logStartOffset = 254, abortedTransactions = [], recordsSizeInBytes=0)
08:31:02.412 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher Fetch READ_COMMITTED at offset 0 for partition index-updates-2 returned fetch data (error=NONE, highWaterMark=0, lastStableOffset = 0, logStartOffset = 0, abortedTransactions = [], recordsSizeInBytes=0)
08:31:02.412 TRACE org.apache.kafka.clients.consumer.internals.Fetcher Preparing to read 0 bytes of data for partition index-updates-0 with offset 6
08:31:02.412 TRACE org.apache.kafka.clients.consumer.internals.Fetcher Updating high watermark for partition index-updates-0 to 6
08:31:02.412 TRACE org.apache.kafka.clients.consumer.internals.Fetcher Updating last stable offset for partition index-updates-0 to 6
08:31:02.412 TRACE org.apache.kafka.clients.consumer.internals.Fetcher Returning fetched records at offset 6 for assigned partition index-updates-0 and update position to 6
08:31:02.413 TRACE org.apache.kafka.clients.consumer.internals.Fetcher Preparing to read 0 bytes of data for partition index-updates-9 with offset 0
08:31:02.413 TRACE org.apache.kafka.clients.consumer.internals.Fetcher Updating high watermark for partition index-updates-9 to 0
08:31:02.413 TRACE org.apache.kafka.clients.consumer.internals.Fetcher Updating last stable offset for partition index-updates-9 to 0
08:31:02.413 TRACE org.apache.kafka.clients.consumer.internals.Fetcher Returning fetched records at offset 0 for assigned partition index-updates-9 and update position to 0
08:31:02.413 TRACE org.apache.kafka.clients.consumer.internals.Fetcher Preparing to read 0 bytes of data for partition index-updates-7 with offset 2
08:31:02.413 TRACE org.apache.kafka.clients.consumer.internals.Fetcher Updating high watermark for partition index-updates-7 to 2
08:31:02.413 TRACE org.apache.kafka.clients.consumer.internals.Fetcher Updating last stable offset for partition index-updates-7 to 2
08:31:02.413 TRACE org.apache.kafka.clients.consumer.internals.Fetcher Returning fetched records at offset 2 for assigned partition index-updates-7 and update position to 2
08:31:02.413 TRACE org.apache.kafka.clients.consumer.internals.Fetcher Preparing to read 0 bytes of data for partition index-updates-8 with offset 0
08:31:02.413 TRACE org.apache.kafka.clients.consumer.internals.Fetcher Updating high watermark for partition index-updates-8 to 0
08:31:02.413 TRACE org.apache.kafka.clients.consumer.internals.Fetcher Updating last stable offset for partition index-updates-8 to 0
08:31:02.413 TRACE org.apache.kafka.clients.consumer.internals.Fetcher Returning fetched records at offset 0 for assigned partition index-updates-8 and update position to 0
08:31:02.413 TRACE org.apache.kafka.clients.consumer.internals.Fetcher Preparing to read 0 bytes of data for partition index-updates-5 with offset 0
08:31:02.413 TRACE org.apache.kafka.clients.consumer.internals.Fetcher Updating high watermark for partition index-updates-5 to 0
08:31:02.413 TRACE org.apache.kafka.clients.consumer.internals.Fetcher Updating last stable offset for partition index-updates-5 to 0
08:31:02.413 TRACE org.apache.kafka.clients.consumer.internals.Fetcher Returning fetched records at offset 0 for assigned partition index-updates-5 and update position to 0
08:31:02.413 TRACE org.apache.kafka.clients.consumer.internals.Fetcher Preparing to read 0 bytes of data for partition index-updates-6 with offset 0
08:31:02.413 TRACE org.apache.kafka.clients.consumer.internals.Fetcher Updating high watermark for partition index-updates-6 to 0
08:31:02.413 TRACE org.apache.kafka.clients.consumer.internals.Fetcher Updating last stable offset for partition index-updates-6 to 0
08:31:02.413 TRACE org.apache.kafka.clients.consumer.internals.Fetcher Returning fetched records at offset 0 for assigned partition index-updates-6 and update position to 0
08:31:02.413 TRACE org.apache.kafka.clients.consumer.internals.Fetcher Preparing to read 0 bytes of data for partition index-updates-3 with offset 46644
08:31:02.413 TRACE org.apache.kafka.clients.consumer.internals.Fetcher Updating high watermark for partition index-updates-3 to 65737
08:31:02.413 TRACE org.apache.kafka.clients.consumer.internals.Fetcher Updating last stable offset for partition index-updates-3 to 46644
08:31:02.413 TRACE org.apache.kafka.clients.consumer.internals.Fetcher Returning fetched records at offset 46644 for assigned partition index-updates-3 and update position to 46644
08:31:02.413 TRACE org.apache.kafka.clients.consumer.internals.Fetcher Preparing to read 0 bytes of data for partition index-updates-4 with offset 1
08:31:02.413 TRACE org.apache.kafka.clients.consumer.internals.Fetcher Updating high watermark for partition index-updates-4 to 1
08:31:02.413 TRACE org.apache.kafka.clients.consumer.internals.Fetcher Updating last stable offset for partition index-updates-4 to 1
08:31:02.413 TRACE org.apache.kafka.clients.consumer.internals.Fetcher Returning fetched records at offset 1 for assigned partition index-updates-4 and update position to 1
08:31:02.413 TRACE org.apache.kafka.clients.consumer.internals.Fetcher Preparing to read 0 bytes of data for partition index-updates-1 with offset 255
08:31:02.413 TRACE org.apache.kafka.clients.consumer.internals.Fetcher Updating high watermark for partition index-updates-1 to 255
08:31:02.413 TRACE org.apache.kafka.clients.consumer.internals.Fetcher Updating last stable offset for partition index-updates-1 to 255
08:31:02.413 TRACE org.apache.kafka.clients.consumer.internals.Fetcher Returning fetched records at offset 255 for assigned partition index-updates-1 and update position to 255
08:31:02.413 TRACE org.apache.kafka.clients.consumer.internals.Fetcher Preparing to read 0 bytes of data for partition index-updates-2 with offset 0
08:31:02.413 TRACE org.apache.kafka.clients.consumer.internals.Fetcher Updating high watermark for partition index-updates-2 to 0
08:31:02.413 TRACE org.apache.kafka.clients.consumer.internals.Fetcher Updating last stable offset for partition index-updates-2 to 0
08:31:02.413 TRACE org.apache.kafka.clients.consumer.internals.Fetcher Returning fetched records at offset 0 for assigned partition index-updates-2 and update position to 0
08:31:02.413 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher Added READ_COMMITTED fetch request for partition index-updates-0 at offset 6 to node my-server:9092 (id: 0 rack: null)
08:31:02.413 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher Added READ_COMMITTED fetch request for partition index-updates-9 at offset 0 to node my-server:9092 (id: 0 rack: null)
08:31:02.413 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher Added READ_COMMITTED fetch request for partition index-updates-7 at offset 2 to node my-server:9092 (id: 0 rack: null)
08:31:02.413 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher Added READ_COMMITTED fetch request for partition index-updates-8 at offset 0 to node my-server:9092 (id: 0 rack: null)
08:31:02.413 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher Added READ_COMMITTED fetch request for partition index-updates-5 at offset 0 to node my-server:9092 (id: 0 rack: null)
08:31:02.413 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher Added READ_COMMITTED fetch request for partition index-updates-6 at offset 0 to node my-server:9092 (id: 0 rack: null)
08:31:02.413 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher Added READ_COMMITTED fetch request for partition index-updates-3 at offset 46644 to node my-server:9092 (id: 0 rack: null)
08:31:02.413 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher Added READ_COMMITTED fetch request for partition index-updates-4 at offset 1 to node my-server:9092 (id: 0 rack: null)
08:31:02.413 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher Added READ_COMMITTED fetch request for partition index-updates-1 at offset 255 to node my-server:9092 (id: 0 rack: null)
08:31:02.413 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher Added READ_COMMITTED fetch request for partition index-updates-2 at offset 0 to node my-server:9092 (id: 0 rack: null)
08:31:02.414 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher Sending READ_COMMITTED fetch for partitions [index-updates-0, index-updates-9, index-updates-7, index-updates-8, index-updates-5, index-updates-6, index-updates-3, index-updates-4, index-updates-1, index-updates-2] to broker my-server:9092 (id: 0 rack: null)
08:31:02.414 TRACE org.apache.kafka.clients.NetworkClient Sending FETCH {replica_id=-1,max_wait_time=5000,min_bytes=1,max_bytes=52428800,isolation_level=1,topics=[{topic=index-updates,partitions=[{partition=0,fetch_offset=6,log_start_offset=-1,max_bytes=1048576},{partition=9,fetch_offset=0,log_start_offset=-1,max_bytes=1048576},{partition=7,fetch_offset=2,log_start_offset=-1,max_bytes=1048576},{partition=8,fetch_offset=0,log_start_offset=-1,max_bytes=1048576},{partition=5,fetch_offset=0,log_start_offset=-1,max_bytes=1048576},{partition=6,fetch_offset=0,log_start_offset=-1,max_bytes=1048576},{partition=3,fetch_offset=46644,log_start_offset=-1,max_bytes=1048576},{partition=4,fetch_offset=1,log_start_offset=-1,max_bytes=1048576},{partition=1,fetch_offset=255,log_start_offset=-1,max_bytes=1048576},{partition=2,fetch_offset=0,log_start_offset=-1,max_bytes=1048576}]}]} to node 0.
{noformat}

> Transactional producer and read committed consumer causes consumer to stuck
> ---------------------------------------------------------------------------
>
>                 Key: KAFKA-5880
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5880
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Lae
>         Attachments: index-updates-3.zip
>
>
> We use transactional producers, and have configured the isolation level on the
> consumer to read only committed data. The consumer has somehow got into a
> stuck state where it can no longer move forward, because the Kafka server
> always returns an empty list of records even though there are thousands more
> successful transactions after the offset.
> This is an example of the producer code:
> {code:java}
> Properties config = new Properties();
> config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,
>         UUID.randomUUID().toString());
> config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
> config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
>         StringSerializer.class.getName());
> config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
>         StringSerializer.class.getName());
> try (Producer<String, String> producer = new KafkaProducer<>(config)) {
>     producer.initTransactions();
>     try {
>         producer.beginTransaction();
>         // Multiple producer.send(...) here
>         producer.commitTransaction();
>     } catch (Throwable e) {
>         producer.abortTransaction();
>     }
> }
> {code}
> This is the test consumer code:
> {code:java}
> Properties config = new Properties();
> config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> config.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
> config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
> config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
> config.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,
>         IsolationLevel.READ_COMMITTED.toString().toLowerCase(ENGLISH));
> config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
>         StringDeserializer.class.getName());
> config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
>         StringDeserializer.class.getName());
> try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config)) {
>     consumer.subscribe(Collections.singleton("index-updates"));
>     while (true) {
>         ConsumerRecords<String, String> records = consumer.poll(5000);
>         for (ConsumerRecord<String, String> record : records) {
>             System.err.println(record.value());
>         }
>         consumer.commitSync();
>     }
> }
> {code}
> I have also attached the problematic partition data, index-updates-3.zip. To
> reproduce the issue using the data, run a local Kafka instance, create a
> topic called "index-updates" with 10 partitions, replace the content of the
> index-updates-3 log directory with the attached content, and then run the
> above consumer code.
> The consumer will then get stuck at some point (not always at the same offset)
> and make no further progress, even if you send new data into the partition
> (other partitions seem fine).
> The following example is from when the consumer was stuck at offset 46644,
> and the Kafka server always returned an empty list of records when the
> consumer fetched from 46644:
> {noformat}
> root@0b1e67f0c34b:/# /opt/kafka/bin/kafka-consumer-groups.sh --describe --group my-group --bootstrap-server localhost:9092
> Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).
> TOPIC          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG    CONSUMER-ID                                      HOST          CLIENT-ID
> index-updates  0          15281           15281           0      consumer-1-c8f3fede-ef6f-4d12-a426-44aa9c47e71f  /10.100.1.97  consumer-1
> index-updates  1          0               0               0      consumer-1-c8f3fede-ef6f-4d12-a426-44aa9c47e71f  /10.100.1.97  consumer-1
> index-updates  2          0               0               0      consumer-1-c8f3fede-ef6f-4d12-a426-44aa9c47e71f  /10.100.1.97  consumer-1
> index-updates  3          46644           65735           19091  consumer-1-c8f3fede-ef6f-4d12-a426-44aa9c47e71f  /10.100.1.97  consumer-1
> index-updates  4          0               0               0      consumer-1-c8f3fede-ef6f-4d12-a426-44aa9c47e71f  /10.100.1.97  consumer-1
> index-updates  5          0               0               0      consumer-1-c8f3fede-ef6f-4d12-a426-44aa9c47e71f  /10.100.1.97  consumer-1
> index-updates  6          0               0               0      consumer-1-c8f3fede-ef6f-4d12-a426-44aa9c47e71f  /10.100.1.97  consumer-1
> index-updates  7          0               0               0      consumer-1-c8f3fede-ef6f-4d12-a426-44aa9c47e71f  /10.100.1.97  consumer-1
> index-updates  8          0               0               0      consumer-1-c8f3fede-ef6f-4d12-a426-44aa9c47e71f  /10.100.1.97  consumer-1
> index-updates  9          0               0               0      consumer-1-c8f3fede-ef6f-4d12-a426-44aa9c47e71f  /10.100.1.97  consumer-1
> root@0b1e67f0c34b:/#
> {noformat}

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
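
In the consumer log above, index-updates-3 is the only partition whose lastStableOffset (46644) is below its highWaterMark (65737), and the fetch at exactly that offset returns 0 bytes. The following is a minimal sketch, not part of Kafka, of how that condition could be picked out of a larger DEBUG log. The class and method names (StalledPartitionFinder, find) are hypothetical, and the regular expression assumes the Fetcher DEBUG line format shown in this log. A single match can be normal while a transaction is still open, so it only suggests a problem if the same offsets keep repeating.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper (not a Kafka API): scans consumer DEBUG output for stalled partitions. */
public class StalledPartitionFinder {

    /** A partition whose fetch position sits at a last stable offset below the high watermark. */
    public static final class Stalled {
        public final String partition;
        public final long fetchOffset;
        public final long highWatermark;
        public final long lastStableOffset;

        Stalled(String partition, long fetchOffset, long highWatermark, long lastStableOffset) {
            this.partition = partition;
            this.fetchOffset = fetchOffset;
            this.highWatermark = highWatermark;
            this.lastStableOffset = lastStableOffset;
        }

        /** Offsets between the last stable offset and the high watermark. */
        public long offsetsBeyondLastStable() {
            return highWatermark - lastStableOffset;
        }
    }

    // Assumed line format, taken from the log in this message. [^)]* skips the
    // fields before recordsSizeInBytes without running into the next log entry.
    private static final Pattern FETCH_LINE = Pattern.compile(
            "Fetch READ_COMMITTED at offset (\\d+) for partition (\\S+) returned fetch data "
            + "\\(error=NONE, highWaterMark=(\\d+), lastStableOffset = (\\d+),"
            + "[^)]*recordsSizeInBytes=(\\d+)\\)");

    /** Returns every empty fetch made at a last stable offset that trails the high watermark. */
    public static List<Stalled> find(String log) {
        List<Stalled> stalled = new ArrayList<>();
        Matcher m = FETCH_LINE.matcher(log);
        while (m.find()) {
            long fetchOffset = Long.parseLong(m.group(1));
            String partition = m.group(2);
            long highWatermark = Long.parseLong(m.group(3));
            long lastStableOffset = Long.parseLong(m.group(4));
            long recordsSizeInBytes = Long.parseLong(m.group(5));

            boolean emptyAtLastStable = recordsSizeInBytes == 0 && fetchOffset == lastStableOffset;
            if (emptyAtLastStable && lastStableOffset < highWatermark) {
                stalled.add(new Stalled(partition, fetchOffset, highWatermark, lastStableOffset));
            }
        }
        return stalled;
    }

    public static void main(String[] args) {
        // Two entries copied from the log above: partition 0 is caught up, partition 3 is not.
        String log =
                "08:31:02.412 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher Fetch READ_COMMITTED"
                + " at offset 6 for partition index-updates-0 returned fetch data (error=NONE, highWaterMark=6,"
                + " lastStableOffset = 6, logStartOffset = 6, abortedTransactions = [], recordsSizeInBytes=0)\n"
                + "08:31:02.412 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher Fetch READ_COMMITTED"
                + " at offset 46644 for partition index-updates-3 returned fetch data (error=NONE, highWaterMark=65737,"
                + " lastStableOffset = 46644, logStartOffset = 26888, abortedTransactions = [], recordsSizeInBytes=0)\n";

        for (Stalled s : find(log)) {
            System.out.println(s.partition + ": fetch offset " + s.fetchOffset
                    + ", last stable offset " + s.lastStableOffset
                    + ", high watermark " + s.highWatermark
                    + " (" + s.offsetsBeyondLastStable() + " offsets not yet readable)");
        }
        // prints: index-updates-3: fetch offset 46644, last stable offset 46644, high watermark 65737 (19093 offsets not yet readable)
    }
}
```

The pattern does not depend on line breaks, so it should also work on a log that has been pasted as one long line.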