[ 
https://issues.apache.org/jira/browse/KAFKA-5880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16165252#comment-16165252
 ] 

Lae commented on KAFKA-5880:
----------------------------

This is the log from the consumer:

{noformat}
08:31:02.412 TRACE org.apache.kafka.clients.NetworkClient Completed receive 
from node 0, for key 1, received 
{throttle_time_ms=0,responses=[{topic=index-updates,partition_responses=[{partition_header={partition=0,error_code=0,high_watermark=6,last_stable_offset=6,log_start_offset=6,aborted_transactions=[]},record_set=[]},{partition_header={partition=9,error_code=0,high_watermark=0,last_stable_offset=0,log_start_offset=0,aborted_transactions=[]},record_set=[]},{partition_header={partition=7,error_code=0,high_watermark=2,last_stable_offset=2,log_start_offset=0,aborted_transactions=[]},record_set=[]},{partition_header={partition=8,error_code=0,high_watermark=0,last_stable_offset=0,log_start_offset=0,aborted_transactions=[]},record_set=[]},{partition_header={partition=5,error_code=0,high_watermark=0,last_stable_offset=0,log_start_offset=0,aborted_transactions=[]},record_set=[]},{partition_header={partition=6,error_code=0,high_watermark=0,last_stable_offset=0,log_start_offset=0,aborted_transactions=[]},record_set=[]},{partition_header={partition=3,error_code=0,high_watermark=65737,last_stable_offset=46644,log_start_offset=26888,aborted_transactions=[]},record_set=[]},{partition_header={partition=4,error_code=0,high_watermark=1,last_stable_offset=1,log_start_offset=0,aborted_transactions=[]},record_set=[]},{partition_header={partition=1,error_code=0,high_watermark=255,last_stable_offset=255,log_start_offset=254,aborted_transactions=[]},record_set=[]},{partition_header={partition=2,error_code=0,high_watermark=0,last_stable_offset=0,log_start_offset=0,aborted_transactions=[]},record_set=[]}]}]}
08:31:02.412 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher Fetch 
READ_COMMITTED at offset 6 for partition index-updates-0 returned fetch data 
(error=NONE, highWaterMark=6, lastStableOffset = 6, logStartOffset = 6, 
abortedTransactions = [], recordsSizeInBytes=0)
08:31:02.412 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher Fetch 
READ_COMMITTED at offset 0 for partition index-updates-9 returned fetch data 
(error=NONE, highWaterMark=0, lastStableOffset = 0, logStartOffset = 0, 
abortedTransactions = [], recordsSizeInBytes=0)
08:31:02.412 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher Fetch 
READ_COMMITTED at offset 2 for partition index-updates-7 returned fetch data 
(error=NONE, highWaterMark=2, lastStableOffset = 2, logStartOffset = 0, 
abortedTransactions = [], recordsSizeInBytes=0)
08:31:02.412 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher Fetch 
READ_COMMITTED at offset 0 for partition index-updates-8 returned fetch data 
(error=NONE, highWaterMark=0, lastStableOffset = 0, logStartOffset = 0, 
abortedTransactions = [], recordsSizeInBytes=0)
08:31:02.412 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher Fetch 
READ_COMMITTED at offset 0 for partition index-updates-5 returned fetch data 
(error=NONE, highWaterMark=0, lastStableOffset = 0, logStartOffset = 0, 
abortedTransactions = [], recordsSizeInBytes=0)
08:31:02.412 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher Fetch 
READ_COMMITTED at offset 0 for partition index-updates-6 returned fetch data 
(error=NONE, highWaterMark=0, lastStableOffset = 0, logStartOffset = 0, 
abortedTransactions = [], recordsSizeInBytes=0)
08:31:02.412 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher Fetch 
READ_COMMITTED at offset 46644 for partition index-updates-3 returned fetch 
data (error=NONE, highWaterMark=65737, lastStableOffset = 46644, logStartOffset 
= 26888, abortedTransactions = [], recordsSizeInBytes=0)
08:31:02.412 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher Fetch 
READ_COMMITTED at offset 1 for partition index-updates-4 returned fetch data 
(error=NONE, highWaterMark=1, lastStableOffset = 1, logStartOffset = 0, 
abortedTransactions = [], recordsSizeInBytes=0)
08:31:02.412 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher Fetch 
READ_COMMITTED at offset 255 for partition index-updates-1 returned fetch data 
(error=NONE, highWaterMark=255, lastStableOffset = 255, logStartOffset = 254, 
abortedTransactions = [], recordsSizeInBytes=0)
08:31:02.412 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher Fetch 
READ_COMMITTED at offset 0 for partition index-updates-2 returned fetch data 
(error=NONE, highWaterMark=0, lastStableOffset = 0, logStartOffset = 0, 
abortedTransactions = [], recordsSizeInBytes=0)
08:31:02.412 TRACE org.apache.kafka.clients.consumer.internals.Fetcher 
Preparing to read 0 bytes of data for partition index-updates-0 with offset 6
08:31:02.412 TRACE org.apache.kafka.clients.consumer.internals.Fetcher Updating 
high watermark for partition index-updates-0 to 6
08:31:02.412 TRACE org.apache.kafka.clients.consumer.internals.Fetcher Updating 
last stable offset for partition index-updates-0 to 6
08:31:02.412 TRACE org.apache.kafka.clients.consumer.internals.Fetcher 
Returning fetched records at offset 6 for assigned partition index-updates-0 
and update position to 6
08:31:02.413 TRACE org.apache.kafka.clients.consumer.internals.Fetcher 
Preparing to read 0 bytes of data for partition index-updates-9 with offset 0
08:31:02.413 TRACE org.apache.kafka.clients.consumer.internals.Fetcher Updating 
high watermark for partition index-updates-9 to 0
08:31:02.413 TRACE org.apache.kafka.clients.consumer.internals.Fetcher Updating 
last stable offset for partition index-updates-9 to 0
08:31:02.413 TRACE org.apache.kafka.clients.consumer.internals.Fetcher 
Returning fetched records at offset 0 for assigned partition index-updates-9 
and update position to 0
08:31:02.413 TRACE org.apache.kafka.clients.consumer.internals.Fetcher 
Preparing to read 0 bytes of data for partition index-updates-7 with offset 2
08:31:02.413 TRACE org.apache.kafka.clients.consumer.internals.Fetcher Updating 
high watermark for partition index-updates-7 to 2
08:31:02.413 TRACE org.apache.kafka.clients.consumer.internals.Fetcher Updating 
last stable offset for partition index-updates-7 to 2
08:31:02.413 TRACE org.apache.kafka.clients.consumer.internals.Fetcher 
Returning fetched records at offset 2 for assigned partition index-updates-7 
and update position to 2
08:31:02.413 TRACE org.apache.kafka.clients.consumer.internals.Fetcher 
Preparing to read 0 bytes of data for partition index-updates-8 with offset 0
08:31:02.413 TRACE org.apache.kafka.clients.consumer.internals.Fetcher Updating 
high watermark for partition index-updates-8 to 0
08:31:02.413 TRACE org.apache.kafka.clients.consumer.internals.Fetcher Updating 
last stable offset for partition index-updates-8 to 0
08:31:02.413 TRACE org.apache.kafka.clients.consumer.internals.Fetcher 
Returning fetched records at offset 0 for assigned partition index-updates-8 
and update position to 0
08:31:02.413 TRACE org.apache.kafka.clients.consumer.internals.Fetcher 
Preparing to read 0 bytes of data for partition index-updates-5 with offset 0
08:31:02.413 TRACE org.apache.kafka.clients.consumer.internals.Fetcher Updating 
high watermark for partition index-updates-5 to 0
08:31:02.413 TRACE org.apache.kafka.clients.consumer.internals.Fetcher Updating 
last stable offset for partition index-updates-5 to 0
08:31:02.413 TRACE org.apache.kafka.clients.consumer.internals.Fetcher 
Returning fetched records at offset 0 for assigned partition index-updates-5 
and update position to 0
08:31:02.413 TRACE org.apache.kafka.clients.consumer.internals.Fetcher 
Preparing to read 0 bytes of data for partition index-updates-6 with offset 0
08:31:02.413 TRACE org.apache.kafka.clients.consumer.internals.Fetcher Updating 
high watermark for partition index-updates-6 to 0
08:31:02.413 TRACE org.apache.kafka.clients.consumer.internals.Fetcher Updating 
last stable offset for partition index-updates-6 to 0
08:31:02.413 TRACE org.apache.kafka.clients.consumer.internals.Fetcher 
Returning fetched records at offset 0 for assigned partition index-updates-6 
and update position to 0
08:31:02.413 TRACE org.apache.kafka.clients.consumer.internals.Fetcher 
Preparing to read 0 bytes of data for partition index-updates-3 with offset 
46644
08:31:02.413 TRACE org.apache.kafka.clients.consumer.internals.Fetcher Updating 
high watermark for partition index-updates-3 to 65737
08:31:02.413 TRACE org.apache.kafka.clients.consumer.internals.Fetcher Updating 
last stable offset for partition index-updates-3 to 46644
08:31:02.413 TRACE org.apache.kafka.clients.consumer.internals.Fetcher 
Returning fetched records at offset 46644 for assigned partition 
index-updates-3 and update position to 46644
08:31:02.413 TRACE org.apache.kafka.clients.consumer.internals.Fetcher 
Preparing to read 0 bytes of data for partition index-updates-4 with offset 1
08:31:02.413 TRACE org.apache.kafka.clients.consumer.internals.Fetcher Updating 
high watermark for partition index-updates-4 to 1
08:31:02.413 TRACE org.apache.kafka.clients.consumer.internals.Fetcher Updating 
last stable offset for partition index-updates-4 to 1
08:31:02.413 TRACE org.apache.kafka.clients.consumer.internals.Fetcher 
Returning fetched records at offset 1 for assigned partition index-updates-4 
and update position to 1
08:31:02.413 TRACE org.apache.kafka.clients.consumer.internals.Fetcher 
Preparing to read 0 bytes of data for partition index-updates-1 with offset 255
08:31:02.413 TRACE org.apache.kafka.clients.consumer.internals.Fetcher Updating 
high watermark for partition index-updates-1 to 255
08:31:02.413 TRACE org.apache.kafka.clients.consumer.internals.Fetcher Updating 
last stable offset for partition index-updates-1 to 255
08:31:02.413 TRACE org.apache.kafka.clients.consumer.internals.Fetcher 
Returning fetched records at offset 255 for assigned partition index-updates-1 
and update position to 255
08:31:02.413 TRACE org.apache.kafka.clients.consumer.internals.Fetcher 
Preparing to read 0 bytes of data for partition index-updates-2 with offset 0
08:31:02.413 TRACE org.apache.kafka.clients.consumer.internals.Fetcher Updating 
high watermark for partition index-updates-2 to 0
08:31:02.413 TRACE org.apache.kafka.clients.consumer.internals.Fetcher Updating 
last stable offset for partition index-updates-2 to 0
08:31:02.413 TRACE org.apache.kafka.clients.consumer.internals.Fetcher 
Returning fetched records at offset 0 for assigned partition index-updates-2 
and update position to 0
08:31:02.413 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher Added 
READ_COMMITTED fetch request for partition index-updates-0 at offset 6 to node 
my-server:9092 (id: 0 rack: null)
08:31:02.413 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher Added 
READ_COMMITTED fetch request for partition index-updates-9 at offset 0 to node 
my-server:9092 (id: 0 rack: null)
08:31:02.413 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher Added 
READ_COMMITTED fetch request for partition index-updates-7 at offset 2 to node 
my-server:9092 (id: 0 rack: null)
08:31:02.413 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher Added 
READ_COMMITTED fetch request for partition index-updates-8 at offset 0 to node 
my-server:9092 (id: 0 rack: null)
08:31:02.413 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher Added 
READ_COMMITTED fetch request for partition index-updates-5 at offset 0 to node 
my-server:9092 (id: 0 rack: null)
08:31:02.413 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher Added 
READ_COMMITTED fetch request for partition index-updates-6 at offset 0 to node 
my-server:9092 (id: 0 rack: null)
08:31:02.413 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher Added 
READ_COMMITTED fetch request for partition index-updates-3 at offset 46644 to 
node my-server:9092 (id: 0 rack: null)
08:31:02.413 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher Added 
READ_COMMITTED fetch request for partition index-updates-4 at offset 1 to node 
my-server:9092 (id: 0 rack: null)
08:31:02.413 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher Added 
READ_COMMITTED fetch request for partition index-updates-1 at offset 255 to 
node my-server:9092 (id: 0 rack: null)
08:31:02.413 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher Added 
READ_COMMITTED fetch request for partition index-updates-2 at offset 0 to node 
my-server:9092 (id: 0 rack: null)
08:31:02.414 DEBUG org.apache.kafka.clients.consumer.internals.Fetcher Sending 
READ_COMMITTED fetch for partitions [index-updates-0, index-updates-9, 
index-updates-7, index-updates-8, index-updates-5, index-updates-6, 
index-updates-3, index-updates-4, index-updates-1, index-updates-2] to broker 
my-server:9092 (id: 0 rack: null)
08:31:02.414 TRACE org.apache.kafka.clients.NetworkClient Sending FETCH 
{replica_id=-1,max_wait_time=5000,min_bytes=1,max_bytes=52428800,isolation_level=1,topics=[{topic=index-updates,partitions=[{partition=0,fetch_offset=6,log_start_offset=-1,max_bytes=1048576},{partition=9,fetch_offset=0,log_start_offset=-1,max_bytes=1048576},{partition=7,fetch_offset=2,log_start_offset=-1,max_bytes=1048576},{partition=8,fetch_offset=0,log_start_offset=-1,max_bytes=1048576},{partition=5,fetch_offset=0,log_start_offset=-1,max_bytes=1048576},{partition=6,fetch_offset=0,log_start_offset=-1,max_bytes=1048576},{partition=3,fetch_offset=46644,log_start_offset=-1,max_bytes=1048576},{partition=4,fetch_offset=1,log_start_offset=-1,max_bytes=1048576},{partition=1,fetch_offset=255,log_start_offset=-1,max_bytes=1048576},{partition=2,fetch_offset=0,log_start_offset=-1,max_bytes=1048576}]}]}
 to node 0.
{noformat}

> Transactional producer and read committed consumer causes consumer to stuck
> ---------------------------------------------------------------------------
>
>                 Key: KAFKA-5880
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5880
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Lae
>         Attachments: index-updates-3.zip
>
>
> We use transactional producers, and have configured isolation level on the 
> consumer to only read committed data. The consumer has somehow got into a 
> stuck state where it can no longer move forward because the Kafka server 
> always return empty list of records despite there are thousands more 
> successful transactions after the offset.
> This is an example producer code:
> {code:java}
> Properties config = new Properties();
> config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, 
> UUID.randomUUID().toString());
> config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
> config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName());
> config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class.getName());
> try (Producer<String, String> producer = new KafkaProducer<>(config)) {
>     producer.initTransactions();
>     try {
>         producer.beginTransaction();
>         // Multiple producer.send(...) here
>         producer.commitTransaction();
>     } catch (Throwable e) {
>         producer.abortTransaction();
>     }
> }
> {code}
> This is the test consumer code:
> {code:java}
> Properties config = new Properties();
> config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> config.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
> config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
> config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
> config.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, 
> IsolationLevel.READ_COMMITTED.toString().toLowerCase(ENGLISH));
> config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
> StringDeserializer.class.getName());
> config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
> StringDeserializer.class.getName());
> try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config)) {
>     consumer.subscribe(Collections.singleton("index-updates"));
>     while (true) {
>         ConsumerRecords<String, String> records = consumer.poll(5000);
>         for (ConsumerRecord<String, String> record : records) {
>             System.err.println(record.value());
>         }
>         consumer.commitSync();
>     }
> }
> {code}
> I have also attached the problematic partition data index-updates-3.zip, to 
> reproduce the issue using the data, you can run a local Kafka instance, then 
> create a topic called "index-updates" with 10 partitions, and replace the 
> content of the index-updates-3 log directory with the attached content, then 
> running the above consumer code.
> Then the consumer will be stuck at some point (not always at the same offset) 
> not making anymore progress even if you send new data into the partition 
> (other partitions seem fine). The following example is when the consumer was 
> stuck at offset 46644, and the Kafka server always return empty list of 
> records when the consumer fetches from 46644:
> {noformat}
> root@0b1e67f0c34b:/# /opt/kafka/bin/kafka-consumer-groups.sh --describe 
> --group my-group --bootstrap-server localhost:9092
> Note: This will only show information about consumers that use the Java 
> consumer API (non-ZooKeeper-based consumers).
> TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG 
>        CONSUMER-ID                                       HOST                 
>           CLIENT-ID
> index-updates                  0          15281           15281           0   
>        consumer-1-c8f3fede-ef6f-4d12-a426-44aa9c47e71f   /10.100.1.97         
>           consumer-1
> index-updates                  1          0               0               0   
>        consumer-1-c8f3fede-ef6f-4d12-a426-44aa9c47e71f   /10.100.1.97         
>           consumer-1
> index-updates                  2          0               0               0   
>        consumer-1-c8f3fede-ef6f-4d12-a426-44aa9c47e71f   /10.100.1.97         
>           consumer-1
> index-updates                  3          46644           65735           
> 19091      consumer-1-c8f3fede-ef6f-4d12-a426-44aa9c47e71f   /10.100.1.97     
>               consumer-1
> index-updates                  4          0               0               0   
>        consumer-1-c8f3fede-ef6f-4d12-a426-44aa9c47e71f   /10.100.1.97         
>           consumer-1
> index-updates                  5          0               0               0   
>        consumer-1-c8f3fede-ef6f-4d12-a426-44aa9c47e71f   /10.100.1.97         
>           consumer-1
> index-updates                  6          0               0               0   
>        consumer-1-c8f3fede-ef6f-4d12-a426-44aa9c47e71f   /10.100.1.97         
>           consumer-1
> index-updates                  7          0               0               0   
>        consumer-1-c8f3fede-ef6f-4d12-a426-44aa9c47e71f   /10.100.1.97         
>           consumer-1
> index-updates                  8          0               0               0   
>        consumer-1-c8f3fede-ef6f-4d12-a426-44aa9c47e71f   /10.100.1.97         
>           consumer-1
> index-updates                  9          0               0               0   
>        consumer-1-c8f3fede-ef6f-4d12-a426-44aa9c47e71f   /10.100.1.97         
>           consumer-1
> root@0b1e67f0c34b:/# 
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to