[ 
https://issues.apache.org/jira/browse/KAFKA-6000?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16189947#comment-16189947
 ] 

Ted Yu commented on KAFKA-6000:
-------------------------------

I wonder if the following had something to do with the problem:
{code}
13:59:09.538 [StreamThread-2] DEBUG  o.a.k.c.c.i.Fetcher - Ignoring fetched records for scratch.lateststate.dsh-2 at offset 1969072 since the current position is 2134335
{code}
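
For context, as far as I recall from the 0.10.x consumer, this DEBUG line is emitted when a buffered batch was fetched at an offset that no longer equals the partition's current position, in which case the batch is dropped instead of being returned from poll(). Below is a minimal standalone sketch of that check. It is not the actual Fetcher code: {{PositionCheckSketch}}, {{accept}} and {{describe}} are hypothetical names, and the message formats are copied from the log lines in this issue.

```java
// Hypothetical, simplified model of the consumer's position check.
// Not the real org.apache.kafka.clients.consumer.internals.Fetcher.
final class PositionCheckSketch {

    /** A batch fetched at fetchOffset is usable only if the position has not moved since. */
    static boolean accept(long fetchOffset, long currentPosition) {
        return fetchOffset == currentPosition;
    }

    /** Builds the log message that corresponds to the outcome of the check. */
    static String describe(String partition, long fetchOffset, long currentPosition) {
        if (accept(fetchOffset, currentPosition)) {
            return String.format(
                "Returning fetched records at offset %d for assigned partition %s",
                fetchOffset, partition);
        }
        return String.format(
            "Ignoring fetched records for %s at offset %d since the current position is %d",
            partition, fetchOffset, currentPosition);
    }

    public static void main(String[] args) {
        // Values taken from the log line quoted above.
        System.out.println(describe("scratch.lateststate.dsh-2", 1969072L, 2134335L));
    }
}
```

With the values from the log line, the check fails because 1969072 != 2134335, so the batch is discarded. The sketch does not explain why the position moved ahead of the fetch offset.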

> streams 0.10.2.1 - kafka 0.11.0.1 state restore not working
> -----------------------------------------------------------
>
>                 Key: KAFKA-6000
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6000
>             Project: Kafka
>          Issue Type: Bug
>          Components: core, streams
>    Affects Versions: 0.10.2.1, 0.11.0.0
>            Reporter: Bart Vercammen
>            Priority: Blocker
>         Attachments: correct-restore.log, failed-restore.log
>
>
> Potential interop issue between Kafka Streams (0.10.2.1) and Kafka (0.11.0.1)
> {noformat}
> 11:24:16.416 [StreamThread-3] DEBUG rocessorStateManager - task [0_2] 
> Registering state store lateststate to its state manager 
> 11:24:16.472 [StreamThread-3] TRACE rocessorStateManager - task [0_2] 
> Restoring state store lateststate from changelog topic 
> scratch.lateststate.dsh 
> 11:24:16.472 [StreamThread-3] DEBUG  o.a.k.c.c.i.Fetcher - Resetting offset 
> for partition scratch.lateststate.dsh-2 to latest offset. 
> 11:24:16.472 [StreamThread-3] DEBUG  o.a.k.c.c.i.Fetcher - Partition 
> scratch.lateststate.dsh-2 is unknown for fetching offset, wait for metadata 
> refresh 
> 11:24:16.474 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Sending 
> ListOffsetRequest (type=ListOffsetRequest, replicaId=-1, 
> partitionTimestamps={scratch.lateststate.dsh-2=-1}, minVersion=0) to broker 
> broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null) 
> 11:24:16.476 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Received 
> ListOffsetResponse 
> {responses=[{topic=scratch.lateststate.dsh,partition_responses=[{partition=2,error_code=0,timestamp=-1,offset=1773763}]}]}
>  from broker broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null) 
> 11:24:16.476 [StreamThread-3] DEBUG  o.a.k.c.c.i.Fetcher - Handling 
> ListOffsetResponse response for scratch.lateststate.dsh-2. Fetched offset 
> 1773763, timestamp -1 
> 11:24:16.477 [StreamThread-3] DEBUG  o.a.k.c.c.i.Fetcher - Resetting offset 
> for partition scratch.lateststate.dsh-2 to earliest offset. 
> 11:24:16.478 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Sending 
> ListOffsetRequest (type=ListOffsetRequest, replicaId=-1, 
> partitionTimestamps={scratch.lateststate.dsh-2=-2}, minVersion=0) to broker 
> broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null) 
> 11:24:16.480 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Received 
> ListOffsetResponse 
> {responses=[{topic=scratch.lateststate.dsh,partition_responses=[{partition=2,error_code=0,timestamp=-1,offset=0}]}]}
>  from broker broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null) 
> 11:24:16.481 [StreamThread-3] DEBUG  o.a.k.c.c.i.Fetcher - Handling 
> ListOffsetResponse response for scratch.lateststate.dsh-2. Fetched offset 0, 
> timestamp -1 
> 11:24:16.483 [StreamThread-3] DEBUG rocessorStateManager - restoring 
> partition scratch.lateststate.dsh-2 from offset 0 to endOffset 1773763 
> 11:24:16.484 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Added fetch 
> request for partition scratch.lateststate.dsh-2 at offset 0 to node 
> broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null) 
> 11:24:16.485 [StreamThread-3] DEBUG  o.a.k.c.c.i.Fetcher - Sending fetch for 
> partitions [scratch.lateststate.dsh-2] to broker 
> broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null) 
> 11:24:16.486 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Skipping fetch for 
> partition scratch.lateststate.dsh-2 because there is an in-flight request to 
> broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null) 
> 11:24:16.490 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Adding fetched 
> record for partition scratch.lateststate.dsh-2 with offset 0 to buffered 
> record list 
> 11:24:16.492 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Received 3 records 
> in fetch response for partition scratch.lateststate.dsh-2 with offset 0 
> 11:24:16.493 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Returning fetched 
> records at offset 0 for assigned partition scratch.lateststate.dsh-2 and 
> update position to 1586527 
> 11:24:16.494 [StreamThread-3] DEBUG  o.a.k.c.c.i.Fetcher - Ignoring fetched 
> records for scratch.lateststate.dsh-2 at offset 0 since the current position 
> is 1586527 
> 11:24:16.496 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Added fetch 
> request for partition scratch.lateststate.dsh-2 at offset 1586527 to node 
> broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null) 
> 11:24:16.496 [StreamThread-3] DEBUG  o.a.k.c.c.i.Fetcher - Sending fetch for 
> partitions [scratch.lateststate.dsh-2] to broker 
> broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null) 
> 11:24:16.498 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Skipping fetch for 
> partition scratch.lateststate.dsh-2 because there is an in-flight request to 
> broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null) 
> 11:24:16.499 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Adding fetched 
> record for partition scratch.lateststate.dsh-2 with offset 1586527 to 
> buffered record list 
> 11:24:16.500 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Received 0 records 
> in fetch response for partition scratch.lateststate.dsh-2 with offset 1586527 
> 11:24:16.501 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Added fetch 
> request for partition scratch.lateststate.dsh-2 at offset 1586527 to node 
> broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null) 
> 11:24:16.502 [StreamThread-3] DEBUG  o.a.k.c.c.i.Fetcher - Sending fetch for 
> partitions [scratch.lateststate.dsh-2] to broker 
> broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null) 
> 11:24:16.511 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Skipping fetch for 
> partition scratch.lateststate.dsh-2 because there is an in-flight request to 
> broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null) 
> 11:24:16.512 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Adding fetched 
> record for partition scratch.lateststate.dsh-2 with offset 1586527 to 
> buffered record list 
> 11:24:16.512 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Received 0 records 
> in fetch response for partition scratch.lateststate.dsh-2 with offset 1586527 
> 11:24:16.513 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Added fetch 
> request for partition scratch.lateststate.dsh-2 at offset 1586527 to node 
> broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null) 
> 11:24:16.515 [StreamThread-3] DEBUG  o.a.k.c.c.i.Fetcher - Sending fetch for 
> partitions [scratch.lateststate.dsh-2] to broker 
> broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null) 
> 11:24:16.517 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Skipping fetch for 
> partition scratch.lateststate.dsh-2 because there is an in-flight request to 
> broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null) 
> 11:24:16.518 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Adding fetched 
> record for partition scratch.lateststate.dsh-2 with offset 1586527 to 
> buffered record list 
> 11:24:16.519 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Received 0 records 
> in fetch response for partition scratch.lateststate.dsh-2 with offset 1586527 
> 11:24:16.520 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Added fetch 
> request for partition scratch.lateststate.dsh-2 at offset 1586527 to node 
> broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null) 
> 11:24:16.520 [StreamThread-3] DEBUG  o.a.k.c.c.i.Fetcher - Sending fetch for 
> partitions [scratch.lateststate.dsh-2] to broker 
> broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null) 
> 11:24:16.522 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Skipping fetch for 
> partition scratch.lateststate.dsh-2 because there is an in-flight request to 
> broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null) 
> 11:24:16.523 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Adding fetched 
> record for partition scratch.lateststate.dsh-2 with offset 1586527 to 
> buffered record list 
> 11:24:16.523 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Received 0 records 
> in fetch response for partition scratch.lateststate.dsh-2 with offset 1586527 
> {noformat}
> In this setup, I have 5 Kafka brokers running 0.11.0.1 (with SSL) and a 
> Kafka Streams application running version 0.10.2.1.  The streams application 
> uses an underlying state store ({{lateststate}}, with changelog topic 
> {{scratch.lateststate.dsh}}).  The problem I've seen is that when the Kafka 
> Streams application (re)starts while a substantial amount of data is already 
> present in the state stores, it does not restore the state.  
> KafkaStreams remains in the {{REBALANCING}} state and never exits the 
> {{restoreActiveState}} function in {{ProcessorStateManager}}.
> I also noticed that the state restore sometimes works when the number of 
> records in the changelog topic is below roughly 100K: I have seen a 
> successful restore when the restore-consumer lag was below 100K records.
> When running the exact same application against a 0.10.2.1 Kafka cluster, the 
> issue never occurs.  It only happens when I run the 0.10.2.1 Kafka Streams 
> application against a 0.11 Kafka cluster.
> The logs above are a snippet from a changelog restore that hangs.
> They also show fetch responses returning 0 records every time, which looks 
> odd to me.
> From what I can tell, Kafka Streams is stuck in this loop in 
> {{restoreActiveState}} because the offset no longer advances: 
> {code}
>             while (true) {
>                 long offset = 0L;
>                 for (ConsumerRecord<byte[], byte[]> record : restoreConsumer.poll(100).records(storePartition)) {
>                     offset = record.offset();
>                     if (offset >= limit) break;
>                     stateRestoreCallback.restore(record.key(), record.value());
>                 }
>                 if (offset >= limit) {
>                     break;
>                 } else if (restoreConsumer.position(storePartition) == endOffset) {
>                     break;
>                 } else if (restoreConsumer.position(storePartition) > endOffset) {
>                     // For a logging enabled changelog (no offset limit),
>                     // the log end offset should not change while restoring since it is only written by this thread.
>                     throw new IllegalStateException(String.format("%s Log end offset of %s should not change while restoring: old end offset %d, current offset %d",
>                             logPrefix, storePartition, endOffset, restoreConsumer.position(storePartition)));
>                 }
>             }
> {code}
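
To make the reported hang concrete, here is a small standalone simulation of the exit conditions of the loop quoted above, with no Kafka dependency. It is a sketch under stated assumptions, not the Streams implementation: {{RestoreLoopSketch}}, {{simulate}}, the {{Outcome}} enum and the {{maxPolls}} safety bound are hypothetical, every simulated poll is assumed to return no records, the consumer position is assumed to stay constant, and "no offset limit" is modelled as Long.MAX_VALUE.

```java
// Hypothetical simulation of the restore loop's exit checks; not Kafka Streams code.
final class RestoreLoopSketch {

    enum Outcome { REACHED_LIMIT, REACHED_END_OFFSET, END_OFFSET_MOVED, STILL_LOOPING }

    /**
     * Replays the three exit checks for polls that return no records.
     *
     * @param position  consumer position after every poll (assumed constant)
     * @param endOffset changelog end offset captured before restoring
     * @param limit     offset limit; Long.MAX_VALUE models "no offset limit"
     * @param maxPolls  artificial bound so the simulation terminates
     */
    static Outcome simulate(long position, long endOffset, long limit, int maxPolls) {
        for (int i = 0; i < maxPolls; i++) {
            long offset = 0L; // reset on every iteration; stays 0 when no records arrive
            if (offset >= limit) {
                return Outcome.REACHED_LIMIT;
            } else if (position == endOffset) {
                return Outcome.REACHED_END_OFFSET;
            } else if (position > endOffset) {
                return Outcome.END_OFFSET_MOVED; // the quoted loop throws IllegalStateException here
            }
            // position < endOffset and nothing was consumed: no state changes, so loop again
        }
        return Outcome.STILL_LOOPING;
    }

    public static void main(String[] args) {
        // Numbers from the log: position stuck at 1586527, end offset 1773763.
        System.out.println(simulate(1586527L, 1773763L, Long.MAX_VALUE, 1000)); // prints STILL_LOOPING
        // Healthy case: the position reaches the end offset.
        System.out.println(simulate(1773763L, 1773763L, Long.MAX_VALUE, 1000)); // prints REACHED_END_OFFSET
    }
}
```

Under these assumptions the loop has no exit once the position stalls below the end offset and fetches keep returning 0 records, which matches the behaviour described in the report.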



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
