[ https://issues.apache.org/jira/browse/KAFKA-6000?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16189947#comment-16189947 ]
Ted Yu commented on KAFKA-6000:
-------------------------------

I wonder if the following had something to do with the problem:
{code}
13:59:09.538 [StreamThread-2] DEBUG o.a.k.c.c.i.Fetcher - Ignoring fetched records for scratch.lateststate.dsh-2 at offset 1969072 since the current position is 2134335
{code}

> streams 0.10.2.1 - kafka 0.11.0.1 state restore not working
> -----------------------------------------------------------
>
>                 Key: KAFKA-6000
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6000
>             Project: Kafka
>          Issue Type: Bug
>          Components: core, streams
>    Affects Versions: 0.10.2.1, 0.11.0.0
>            Reporter: Bart Vercammen
>            Priority: Blocker
>         Attachments: correct-restore.log, failed-restore.log
>
>
> Potential interop issue between Kafka Streams (0.10.2.1) and Kafka (0.11.0.1)
> {noformat}
> 11:24:16.416 [StreamThread-3] DEBUG rocessorStateManager - task [0_2] Registering state store lateststate to its state manager
> 11:24:16.472 [StreamThread-3] TRACE rocessorStateManager - task [0_2] Restoring state store lateststate from changelog topic scratch.lateststate.dsh
> 11:24:16.472 [StreamThread-3] DEBUG o.a.k.c.c.i.Fetcher - Resetting offset for partition scratch.lateststate.dsh-2 to latest offset.
> 11:24:16.472 [StreamThread-3] DEBUG o.a.k.c.c.i.Fetcher - Partition scratch.lateststate.dsh-2 is unknown for fetching offset, wait for metadata refresh
> 11:24:16.474 [StreamThread-3] TRACE o.a.k.c.c.i.Fetcher - Sending ListOffsetRequest (type=ListOffsetRequest, replicaId=-1, partitionTimestamps={scratch.lateststate.dsh-2=-1}, minVersion=0) to broker broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null)
> 11:24:16.476 [StreamThread-3] TRACE o.a.k.c.c.i.Fetcher - Received ListOffsetResponse {responses=[{topic=scratch.lateststate.dsh,partition_responses=[{partition=2,error_code=0,timestamp=-1,offset=1773763}]}]} from broker broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null)
> 11:24:16.476 [StreamThread-3] DEBUG o.a.k.c.c.i.Fetcher - Handling ListOffsetResponse response for scratch.lateststate.dsh-2. Fetched offset 1773763, timestamp -1
> 11:24:16.477 [StreamThread-3] DEBUG o.a.k.c.c.i.Fetcher - Resetting offset for partition scratch.lateststate.dsh-2 to earliest offset.
> 11:24:16.478 [StreamThread-3] TRACE o.a.k.c.c.i.Fetcher - Sending ListOffsetRequest (type=ListOffsetRequest, replicaId=-1, partitionTimestamps={scratch.lateststate.dsh-2=-2}, minVersion=0) to broker broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null)
> 11:24:16.480 [StreamThread-3] TRACE o.a.k.c.c.i.Fetcher - Received ListOffsetResponse {responses=[{topic=scratch.lateststate.dsh,partition_responses=[{partition=2,error_code=0,timestamp=-1,offset=0}]}]} from broker broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null)
> 11:24:16.481 [StreamThread-3] DEBUG o.a.k.c.c.i.Fetcher - Handling ListOffsetResponse response for scratch.lateststate.dsh-2. Fetched offset 0, timestamp -1
> 11:24:16.483 [StreamThread-3] DEBUG rocessorStateManager - restoring partition scratch.lateststate.dsh-2 from offset 0 to endOffset 1773763
> 11:24:16.484 [StreamThread-3] TRACE o.a.k.c.c.i.Fetcher - Added fetch request for partition scratch.lateststate.dsh-2 at offset 0 to node broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null)
> 11:24:16.485 [StreamThread-3] DEBUG o.a.k.c.c.i.Fetcher - Sending fetch for partitions [scratch.lateststate.dsh-2] to broker broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null)
> 11:24:16.486 [StreamThread-3] TRACE o.a.k.c.c.i.Fetcher - Skipping fetch for partition scratch.lateststate.dsh-2 because there is an in-flight request to broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null)
> 11:24:16.490 [StreamThread-3] TRACE o.a.k.c.c.i.Fetcher - Adding fetched record for partition scratch.lateststate.dsh-2 with offset 0 to buffered record list
> 11:24:16.492 [StreamThread-3] TRACE o.a.k.c.c.i.Fetcher - Received 3 records in fetch response for partition scratch.lateststate.dsh-2 with offset 0
> 11:24:16.493 [StreamThread-3] TRACE o.a.k.c.c.i.Fetcher - Returning fetched records at offset 0 for assigned partition scratch.lateststate.dsh-2 and update position to 1586527
> 11:24:16.494 [StreamThread-3] DEBUG o.a.k.c.c.i.Fetcher - Ignoring fetched records for scratch.lateststate.dsh-2 at offset 0 since the current position is 1586527
> 11:24:16.496 [StreamThread-3] TRACE o.a.k.c.c.i.Fetcher - Added fetch request for partition scratch.lateststate.dsh-2 at offset 1586527 to node broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null)
> 11:24:16.496 [StreamThread-3] DEBUG o.a.k.c.c.i.Fetcher - Sending fetch for partitions [scratch.lateststate.dsh-2] to broker broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null)
> 11:24:16.498 [StreamThread-3] TRACE o.a.k.c.c.i.Fetcher - Skipping fetch for partition scratch.lateststate.dsh-2 because there is an in-flight request to broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null)
> 11:24:16.499 [StreamThread-3] TRACE o.a.k.c.c.i.Fetcher - Adding fetched record for partition scratch.lateststate.dsh-2 with offset 1586527 to buffered record list
> 11:24:16.500 [StreamThread-3] TRACE o.a.k.c.c.i.Fetcher - Received 0 records in fetch response for partition scratch.lateststate.dsh-2 with offset 1586527
> 11:24:16.501 [StreamThread-3] TRACE o.a.k.c.c.i.Fetcher - Added fetch request for partition scratch.lateststate.dsh-2 at offset 1586527 to node broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null)
> 11:24:16.502 [StreamThread-3] DEBUG o.a.k.c.c.i.Fetcher - Sending fetch for partitions [scratch.lateststate.dsh-2] to broker broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null)
> 11:24:16.511 [StreamThread-3] TRACE o.a.k.c.c.i.Fetcher - Skipping fetch for partition scratch.lateststate.dsh-2 because there is an in-flight request to broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null)
> 11:24:16.512 [StreamThread-3] TRACE o.a.k.c.c.i.Fetcher - Adding fetched record for partition scratch.lateststate.dsh-2 with offset 1586527 to buffered record list
> 11:24:16.512 [StreamThread-3] TRACE o.a.k.c.c.i.Fetcher - Received 0 records in fetch response for partition scratch.lateststate.dsh-2 with offset 1586527
> 11:24:16.513 [StreamThread-3] TRACE o.a.k.c.c.i.Fetcher - Added fetch request for partition scratch.lateststate.dsh-2 at offset 1586527 to node broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null)
> 11:24:16.515 [StreamThread-3] DEBUG o.a.k.c.c.i.Fetcher - Sending fetch for partitions [scratch.lateststate.dsh-2] to broker broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null)
> 11:24:16.517 [StreamThread-3] TRACE o.a.k.c.c.i.Fetcher - Skipping fetch for partition scratch.lateststate.dsh-2 because there is an in-flight request to broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null)
> 11:24:16.518 [StreamThread-3] TRACE o.a.k.c.c.i.Fetcher - Adding fetched record for partition scratch.lateststate.dsh-2 with offset 1586527 to buffered record list
> 11:24:16.519 [StreamThread-3] TRACE o.a.k.c.c.i.Fetcher - Received 0 records in fetch response for partition scratch.lateststate.dsh-2 with offset 1586527
> 11:24:16.520 [StreamThread-3] TRACE o.a.k.c.c.i.Fetcher - Added fetch request for partition scratch.lateststate.dsh-2 at offset 1586527 to node broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null)
> 11:24:16.520 [StreamThread-3] DEBUG o.a.k.c.c.i.Fetcher - Sending fetch for partitions [scratch.lateststate.dsh-2] to broker broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null)
> 11:24:16.522 [StreamThread-3] TRACE o.a.k.c.c.i.Fetcher - Skipping fetch for partition scratch.lateststate.dsh-2 because there is an in-flight request to broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null)
> 11:24:16.523 [StreamThread-3] TRACE o.a.k.c.c.i.Fetcher - Adding fetched record for partition scratch.lateststate.dsh-2 with offset 1586527 to buffered record list
> 11:24:16.523 [StreamThread-3] TRACE o.a.k.c.c.i.Fetcher - Received 0 records in fetch response for partition scratch.lateststate.dsh-2 with offset 1586527
> {noformat}
> In this setup, I have 5 Kafka brokers running 0.11.0.1 (with SSL) and a Kafka Streams application running version 0.10.2.1. The Streams application uses an underlying state store (`lateststate`, backed by the changelog topic `scratch.lateststate.dsh`). The problem I've seen is that when the Kafka Streams application (re)starts while a large amount of data is already present in the state stores, it does not restore the state. KafkaStreams remains in the {{REBALANCING}} state and never exits the {{restoreActiveState}} function in {{ProcessorStateManager}}.
>
> I also noticed that the state restore sometimes seems to work when the changelog topic holds fewer than roughly 100K records.
> I've seen a successful restore when the restore-consumer lag was below 100K records.
>
> When running the exact same application against a 0.10.2.1 Kafka cluster, the issue never occurs. It only happens when I run the 0.10.2.1 Kafka Streams application against a 0.11 Kafka cluster.
>
> The logs above are a snippet from a changelog restore that 'hangs'. They also show FetchResponses returning 0 records every time, which looks odd to me.
>
> From what I can tell, the Kafka Streams code is stuck in this loop in {{restoreActiveState}} because the offset no longer advances (the consumer position stays at 1586527 in the log above):
> {code}
> while (true) {
>     long offset = 0L;
>     for (ConsumerRecord<byte[], byte[]> record : restoreConsumer.poll(100).records(storePartition)) {
>         offset = record.offset();
>         if (offset >= limit) break;
>         stateRestoreCallback.restore(record.key(), record.value());
>     }
>
>     if (offset >= limit) {
>         break;
>     } else if (restoreConsumer.position(storePartition) == endOffset) {
>         break;
>     } else if (restoreConsumer.position(storePartition) > endOffset) {
>         // For a logging enabled changelog (no offset limit),
>         // the log end offset should not change while restoring since it is only written by this thread.
>         throw new IllegalStateException(String.format("%s Log end offset of %s should not change while restoring: old end offset %d, current offset %d",
>                 logPrefix, storePartition, endOffset, restoreConsumer.position(storePartition)));
>     }
> }
> {code}

-- 
This message was sent by Atlassian JIRA
(v6.4.14#64029)
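To illustrate why the quoted loop can spin forever, here is a minimal, self-contained sketch that mirrors only its exit conditions. It is not the actual Kafka Streams code. It plugs in the values from the log: end offset 1773763, and a position stuck at 1586527 with empty polls. Several names are hypothetical and exist only for this sketch: `StuckConsumer` is a stand-in for the restore consumer and is not a Kafka API, and `maxPolls` is a guard added so the sketch terminates. It also assumes that a logging-enabled changelog with no offset limit can be modeled as `limit = Long.MAX_VALUE`.

```java
import java.util.Collections;
import java.util.List;

public class RestoreLoopSketch {

    // Hypothetical stand-in for the restore consumer (NOT a Kafka API): every poll
    // returns no records and the position never moves, as in the log above.
    static final class StuckConsumer {
        private final long position;

        StuckConsumer(long position) {
            this.position = position;
        }

        // Offsets of the records returned by one poll; always empty in this sketch.
        List<Long> poll() {
            return Collections.emptyList();
        }

        long position() {
            return position;
        }
    }

    // Mirrors the exit conditions of the quoted loop (the restore callback is omitted).
    // Returns the number of polls made before an exit condition fired, or -1 if the
    // hypothetical maxPolls guard was reached first.
    static int restore(StuckConsumer consumer, long limit, long endOffset, int maxPolls) {
        for (int polls = 1; polls <= maxPolls; polls++) {
            long offset = 0L; // reset on every iteration, as in the quoted code
            for (long recordOffset : consumer.poll()) {
                offset = recordOffset;
                if (offset >= limit) {
                    break;
                }
            }
            if (offset >= limit) {
                return polls;
            } else if (consumer.position() == endOffset) {
                return polls;
            } else if (consumer.position() > endOffset) {
                throw new IllegalStateException("log end offset changed while restoring");
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        long endOffset = 1_773_763L;     // end offset from the ListOffsetResponse in the log
        long stuckPosition = 1_586_527L; // position after "update position to 1586527"
        long limit = Long.MAX_VALUE;     // assumption: no offset limit for this changelog
        int result = restore(new StuckConsumer(stuckPosition), limit, endOffset, 1_000);
        // prints "no exit condition fired"
        System.out.println(result == -1 ? "no exit condition fired" : "exited after " + result + " polls");
    }
}
```

None of the three exits can fire with these values. No records come back, so `offset` stays 0 and never reaches the limit. The position (1586527) is neither equal to nor greater than the end offset (1773763). The real loop has no iteration guard, so it keeps polling indefinitely.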