[ https://issues.apache.org/jira/browse/KAFKA-6000?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16188651#comment-16188651 ]

Bart Vercammen commented on KAFKA-6000:
---------------------------------------

Ok, I'll try to collect all client and server logs from the platform and post 
them here.

Some partitions are indeed restored successfully.  I have not found a clear 
pattern yet, but at first glance it looks like partitions containing fewer 
than 100,000 records or so recover successfully, while partitions with (a lot) 
more records fail.  As said, this is only a first impression; I still need to 
investigate and test further to confirm that this is really the pattern.
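
In the meantime, this is roughly how I'm sizing the changelog partitions to 
check that hypothesis (just a sketch: the SSL settings are omitted, and for a 
compacted changelog the end-minus-beginning offset span only approximates the 
record count):
{code}
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class ChangelogSizes {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker-1.tt.kafka.marathon.mesos:9091");
        // SSL settings omitted for brevity
        props.put("key.deserializer", ByteArrayDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            List<TopicPartition> partitions = new ArrayList<>();
            for (PartitionInfo info : consumer.partitionsFor("scratch.lateststate.dsh")) {
                partitions.add(new TopicPartition(info.topic(), info.partition()));
            }
            Map<TopicPartition, Long> begin = consumer.beginningOffsets(partitions);
            Map<TopicPartition, Long> end = consumer.endOffsets(partitions);
            for (TopicPartition tp : partitions) {
                // offset span, not an exact count: compaction leaves gaps
                System.out.println(tp + " ~" + (end.get(tp) - begin.get(tp)) + " records");
            }
        }
    }
}
{code}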

I'm also trying to reproduce this in a fully controlled unit test, but that is 
still work in progress ...
Once I catch the issue in a unit test, I'll post it here as a reference.
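
For reference, the rough shape of the check that test will perform (a sketch 
with placeholder names; the embedded cluster and test-data setup around it are 
still missing):
{code}
import java.util.Collections;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class RestoreCheck {
    // Sketch: replay the changelog the way the restore consumer does and
    // fail if the position stops advancing before reaching the end offset.
    public static void assertChangelogRestorable(KafkaConsumer<byte[], byte[]> restoreConsumer,
                                                 TopicPartition changelogPartition,
                                                 long timeoutMs) {
        restoreConsumer.assign(Collections.singletonList(changelogPartition));
        restoreConsumer.seekToBeginning(Collections.singletonList(changelogPartition));
        long endOffset = restoreConsumer
                .endOffsets(Collections.singletonList(changelogPartition))
                .get(changelogPartition);

        long deadline = System.currentTimeMillis() + timeoutMs;
        while (restoreConsumer.position(changelogPartition) < endOffset) {
            restoreConsumer.poll(100);  // the records themselves don't matter here
            if (System.currentTimeMillis() > deadline) {
                throw new AssertionError("restore hung: position "
                        + restoreConsumer.position(changelogPartition)
                        + " never reached end offset " + endOffset);
            }
        }
    }
}
{code}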


> streams 0.10.2.1 - kafka 0.11.0.1 state restore not working
> -----------------------------------------------------------
>
>                 Key: KAFKA-6000
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6000
>             Project: Kafka
>          Issue Type: Bug
>          Components: core, streams
>    Affects Versions: 0.10.2.1, 0.11.0.0
>            Reporter: Bart Vercammen
>            Priority: Blocker
>
> Potential interop issue between Kafka Streams (0.10.2.1) and Kafka (0.11.0.1)
> {noformat}
> 11:24:16.416 [StreamThread-3] DEBUG rocessorStateManager - task [0_2] 
> Registering state store lateststate to its state manager 
> 11:24:16.472 [StreamThread-3] TRACE rocessorStateManager - task [0_2] 
> Restoring state store lateststate from changelog topic 
> scratch.lateststate.dsh 
> 11:24:16.472 [StreamThread-3] DEBUG  o.a.k.c.c.i.Fetcher - Resetting offset 
> for partition scratch.lateststate.dsh-2 to latest offset. 
> 11:24:16.472 [StreamThread-3] DEBUG  o.a.k.c.c.i.Fetcher - Partition 
> scratch.lateststate.dsh-2 is unknown for fetching offset, wait for metadata 
> refresh 
> 11:24:16.474 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Sending 
> ListOffsetRequest (type=ListOffsetRequest, replicaId=-1, 
> partitionTimestamps={scratch.lateststate.dsh-2=-1}, minVersion=0) to broker 
> broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null) 
> 11:24:16.476 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Received 
> ListOffsetResponse 
> {responses=[{topic=scratch.lateststate.dsh,partition_responses=[{partition=2,error_code=0,timestamp=-1,offset=1773763}]}]}
>  from broker broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null) 
> 11:24:16.476 [StreamThread-3] DEBUG  o.a.k.c.c.i.Fetcher - Handling 
> ListOffsetResponse response for scratch.lateststate.dsh-2. Fetched offset 
> 1773763, timestamp -1 
> 11:24:16.477 [StreamThread-3] DEBUG  o.a.k.c.c.i.Fetcher - Resetting offset 
> for partition scratch.lateststate.dsh-2 to earliest offset. 
> 11:24:16.478 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Sending 
> ListOffsetRequest (type=ListOffsetRequest, replicaId=-1, 
> partitionTimestamps={scratch.lateststate.dsh-2=-2}, minVersion=0) to broker 
> broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null) 
> 11:24:16.480 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Received 
> ListOffsetResponse 
> {responses=[{topic=scratch.lateststate.dsh,partition_responses=[{partition=2,error_code=0,timestamp=-1,offset=0}]}]}
>  from broker broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null) 
> 11:24:16.481 [StreamThread-3] DEBUG  o.a.k.c.c.i.Fetcher - Handling 
> ListOffsetResponse response for scratch.lateststate.dsh-2. Fetched offset 0, 
> timestamp -1 
> 11:24:16.483 [StreamThread-3] DEBUG rocessorStateManager - restoring 
> partition scratch.lateststate.dsh-2 from offset 0 to endOffset 1773763 
> 11:24:16.484 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Added fetch 
> request for partition scratch.lateststate.dsh-2 at offset 0 to node 
> broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null) 
> 11:24:16.485 [StreamThread-3] DEBUG  o.a.k.c.c.i.Fetcher - Sending fetch for 
> partitions [scratch.lateststate.dsh-2] to broker 
> broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null) 
> 11:24:16.486 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Skipping fetch for 
> partition scratch.lateststate.dsh-2 because there is an in-flight request to 
> broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null) 
> 11:24:16.490 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Adding fetched 
> record for partition scratch.lateststate.dsh-2 with offset 0 to buffered 
> record list 
> 11:24:16.492 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Received 3 records 
> in fetch response for partition scratch.lateststate.dsh-2 with offset 0 
> 11:24:16.493 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Returning fetched 
> records at offset 0 for assigned partition scratch.lateststate.dsh-2 and 
> update position to 1586527 
> 11:24:16.494 [StreamThread-3] DEBUG  o.a.k.c.c.i.Fetcher - Ignoring fetched 
> records for scratch.lateststate.dsh-2 at offset 0 since the current position 
> is 1586527 
> 11:24:16.496 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Added fetch 
> request for partition scratch.lateststate.dsh-2 at offset 1586527 to node 
> broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null) 
> 11:24:16.496 [StreamThread-3] DEBUG  o.a.k.c.c.i.Fetcher - Sending fetch for 
> partitions [scratch.lateststate.dsh-2] to broker 
> broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null) 
> 11:24:16.498 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Skipping fetch for 
> partition scratch.lateststate.dsh-2 because there is an in-flight request to 
> broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null) 
> 11:24:16.499 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Adding fetched 
> record for partition scratch.lateststate.dsh-2 with offset 1586527 to 
> buffered record list 
> 11:24:16.500 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Received 0 records 
> in fetch response for partition scratch.lateststate.dsh-2 with offset 1586527 
> 11:24:16.501 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Added fetch 
> request for partition scratch.lateststate.dsh-2 at offset 1586527 to node 
> broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null) 
> 11:24:16.502 [StreamThread-3] DEBUG  o.a.k.c.c.i.Fetcher - Sending fetch for 
> partitions [scratch.lateststate.dsh-2] to broker 
> broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null) 
> 11:24:16.511 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Skipping fetch for 
> partition scratch.lateststate.dsh-2 because there is an in-flight request to 
> broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null) 
> 11:24:16.512 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Adding fetched 
> record for partition scratch.lateststate.dsh-2 with offset 1586527 to 
> buffered record list 
> 11:24:16.512 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Received 0 records 
> in fetch response for partition scratch.lateststate.dsh-2 with offset 1586527 
> 11:24:16.513 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Added fetch 
> request for partition scratch.lateststate.dsh-2 at offset 1586527 to node 
> broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null) 
> 11:24:16.515 [StreamThread-3] DEBUG  o.a.k.c.c.i.Fetcher - Sending fetch for 
> partitions [scratch.lateststate.dsh-2] to broker 
> broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null) 
> 11:24:16.517 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Skipping fetch for 
> partition scratch.lateststate.dsh-2 because there is an in-flight request to 
> broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null) 
> 11:24:16.518 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Adding fetched 
> record for partition scratch.lateststate.dsh-2 with offset 1586527 to 
> buffered record list 
> 11:24:16.519 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Received 0 records 
> in fetch response for partition scratch.lateststate.dsh-2 with offset 1586527 
> 11:24:16.520 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Added fetch 
> request for partition scratch.lateststate.dsh-2 at offset 1586527 to node 
> broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null) 
> 11:24:16.520 [StreamThread-3] DEBUG  o.a.k.c.c.i.Fetcher - Sending fetch for 
> partitions [scratch.lateststate.dsh-2] to broker 
> broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null) 
> 11:24:16.522 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Skipping fetch for 
> partition scratch.lateststate.dsh-2 because there is an in-flight request to 
> broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null) 
> 11:24:16.523 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Adding fetched 
> record for partition scratch.lateststate.dsh-2 with offset 1586527 to 
> buffered record list 
> 11:24:16.523 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Received 0 records 
> in fetch response for partition scratch.lateststate.dsh-2 with offset 1586527 
> {noformat}
> In this setup, I have 5 Kafka brokers running 0.11.0.1 (with SSL) and a 
> KafkaStreams application running version 0.10.2.1.  The streams application 
> uses an underlying state store ({{scratch.lateststate.dsh}}).  The problem 
> I've seen is that when the Kafka Streams application (re)starts while a 
> substantial amount of data is already present in the state stores, it does 
> not restore the state: KafkaStreams remains in {{REBALANCING}} state and 
> never exits the {{restoreActiveState}} function in {{ProcessorStateManager}}.
> What I also noticed is that the state restore sometimes does work when the 
> number of records in the changelog topic is below 100K (or thereabouts); I've 
> seen a successful restore when the restore-consumer lag was below 100K records.
> When running the exact same application against a 0.10.2.1 Kafka cluster the 
> issue never occurs.  It only happens when I run the 0.10.2.1 KafkaStreams 
> application against a 0.11 Kafka cluster.
> The logs above are a snippet from restoring the changelog that 'hangs'.  They 
> also show FetchResponses returning 0 records all the time, which looks 
> suspicious to me.
> From what I can tell, the KafkaStreams code is stuck in this loop in 
> {{restoreActiveState}} because the offset no longer increments:
> {code}
>             while (true) {
>                 long offset = 0L;
>                 for (ConsumerRecord<byte[], byte[]> record : restoreConsumer.poll(100).records(storePartition)) {
>                     offset = record.offset();
>                     if (offset >= limit) break;
>                     stateRestoreCallback.restore(record.key(), record.value());
>                 }
>                 if (offset >= limit) {
>                     break;
>                 } else if (restoreConsumer.position(storePartition) == endOffset) {
>                     break;
>                 } else if (restoreConsumer.position(storePartition) > endOffset) {
>                     // For a logging enabled changelog (no offset limit),
>                     // the log end offset should not change while restoring since it is only written by this thread.
>                     throw new IllegalStateException(String.format("%s Log end offset of %s should not change while restoring: old end offset %d, current offset %d",
>                             logPrefix, storePartition, endOffset, restoreConsumer.position(storePartition)));
>                 }
>             }
> {code}
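> To make the exit conditions concrete: with the values from the logs above 
> (endOffset 1773763, the consumer position stuck at 1586527, and every poll() 
> returning 0 records), none of the three exit branches can ever fire, so the 
> loop spins forever.  A minimal, self-contained simulation of just that exit 
> logic (plain Java, not the actual Streams code; the numbers are taken from 
> the logs):
> {code}
> public class RestoreLoopSimulation {
>     public static void main(String[] args) {
>         final long endOffset = 1773763L;   // from the ListOffsetResponse above
>         final long position = 1586527L;    // where the fetcher position jumped to
>         final long limit = Long.MAX_VALUE; // logging-enabled changelog: no limit
>
>         for (int i = 0; i < 5; i++) {      // guard so this demo terminates
>             long offset = 0L;              // reset on every pass, as in the loop above
>             // poll() keeps returning 0 records in the failing case, so the
>             // inner for-loop never runs and offset stays 0
>             if (offset >= limit) break;            // 0 >= MAX_VALUE: never true
>             if (position == endOffset) break;      // 1586527 == 1773763: never true
>             if (position > endOffset)              // 1586527 > 1773763: never true
>                 throw new IllegalStateException("end offset changed while restoring");
>             System.out.println("pass " + i + ": no exit condition fired");
>         }
>     }
> }
> {code}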



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
