[ https://issues.apache.org/jira/browse/KAFKA-9891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17094313#comment-17094313 ]

Mateusz Jadczyk commented on KAFKA-9891:
----------------------------------------

Thanks for looking into it. I looked into the test (see comments in the PR), 
played with it a bit, enabled some more logging, and I may have more insights.
First of all, are you sure your test uses 2.4 clients? We used 2.4.1 clients 
and this broker image confluentinc/cp-zookeeper:5.3.1. I see that you use 
StoreQueryParameters in the code, which is not available in Streams 2.4.1, and 
the ProcessorStateManager implementation has also changed a lot since then.

I also revisited the logs I included in the ticket and may have a new finding. 
The flow is:
 * NODE 1 T-2 has active task 1_2.
 * NODE 3 *T-1* has standby task 1_2.
 * NODE 1 T-2 crashes.
 * NODE 3 *T-2* takes over; T-1 (which had the standby task) is assigned 
another task, and standby task 1_2 is revoked.
 * NODE 2 T-1 has standby task 1_2.
 * NODE 3 T-2 crashes.
 * NODE 3 T-1 takes over.
 * NODE 2 T-1 standby task 1_2 is revoked.

 

The crucial takeaway here is that if we focus strictly on NODE 3, task 1_2 was 
not taken over by T-1, the thread that held the standby task, but by T-2. I 
guess that's how this version of the TaskAssignor works. Digging deeper, I 
checked what exactly happened while the standby task was being revoked on T-1 
and the active task was starting on T-2.
So this is T-1 having its standby task revoked:
{noformat}
NODE_3 2020-04-15 21:11:47.024  INFO 1 --- [-StreamThread-1] 
o.a.k.s.p.internals.StreamThread         : stream-thread 
[XXXXCommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-1] 
State transition from RUNNING to PARTITIONS_ASSIGNED
NODE_3 2020-04-15 21:11:47.027 DEBUG 1 --- [-StreamThread-1] 
o.a.k.s.p.i.AssignedStandbyTasks         : stream-thread 
[XXXXCommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-1] 
Closing revoked standby tasks {1_2=[mnl.xxxx.command-2, xxxx.command-2]}
NODE_3 2020-04-15 21:11:47.027 DEBUG 1 --- [-StreamThread-1] 
o.a.k.s.processor.internals.StandbyTask  : stream-thread 
[XXXXCommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-1] 
standby-task [1_2] Closing
NODE_3 2020-04-15 21:11:47.027 TRACE 1 --- [-StreamThread-1] 
o.a.k.s.processor.internals.StandbyTask  : stream-thread 
[XXXXCommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-1] 
standby-task [1_2] Committing
NODE_3 2020-04-15 21:11:47.027 DEBUG 1 --- [-StreamThread-1] 
o.a.k.s.p.i.ProcessorStateManager        : stream-thread 
[XXXXCommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-1] 
standby-task [1_2] Flushing all stores registered in the state manager
NODE_3 2020-04-15 21:11:47.032 TRACE 1 --- [-StreamThread-1] 
o.a.k.s.p.i.ProcessorStateManager        : stream-thread 
[XXXXCommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-1] 
standby-task [1_2] Flushing store COMMAND_ID_STORE
NODE_3 2020-04-15 21:11:47.194 TRACE 1 --- [-StreamThread-1] 
o.a.k.s.p.i.ProcessorStateManager        : stream-thread 
[XXXXCommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-1] 
standby-task [1_2] Flushing store XXXX_STATE_STORE
NODE_3 2020-04-15 21:11:47.195 TRACE 1 --- [-StreamThread-1] 
o.a.k.s.p.i.ProcessorStateManager        : stream-thread 
[XXXXCommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-1] 
standby-task [1_2] Checkpointable offsets updated with restored offsets: 
{XXXXCommandProcessor-COMMAND_ID_STORE-changelog-2=1, 
XXXXCommandProcessor-XXXX_STATE_STORE-changelog-2=1}
NODE_3 2020-04-15 21:11:47.195 TRACE 1 --- [-StreamThread-1] 
o.a.k.s.p.i.ProcessorStateManager        : stream-thread 
[XXXXCommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-1] 
standby-task [1_2] Checkpointable offsets updated with active acked offsets: 
{XXXXCommandProcessor-COMMAND_ID_STORE-changelog-2=1, 
XXXXCommandProcessor-XXXX_STATE_STORE-changelog-2=1}
NODE_3 2020-04-15 21:11:47.195 TRACE 1 --- [-StreamThread-1] 
o.a.k.s.p.i.ProcessorStateManager        : stream-thread 
[XXXXCommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-1] 
standby-task [1_2] Writing checkpoint: 
{XXXXCommandProcessor-COMMAND_ID_STORE-changelog-2=1, 
XXXXCommandProcessor-XXXX_STATE_STORE-changelog-2=1}
NODE_3 2020-04-15 21:11:47.296 TRACE 1 --- [-StreamThread-1] 
o.a.k.s.processor.internals.StandbyTask  : stream-thread 
[XXXXCommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-1] 
standby-task [1_2] Closing state manager
NODE_3 2020-04-15 21:11:47.296 DEBUG 1 --- [-StreamThread-1] 
o.a.k.s.p.i.ProcessorStateManager        : stream-thread 
[XXXXCommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-1] 
standby-task [1_2] Closing its state manager and all the registered state stores
NODE_3 2020-04-15 21:11:47.298 DEBUG 1 --- [-StreamThread-1] 
o.a.k.s.p.i.ProcessorStateManager        : stream-thread 
[XXXXCommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-1] 
standby-task [1_2] Closing storage engine COMMAND_ID_STORE
NODE_3 2020-04-15 21:11:47.388 DEBUG 1 --- [-StreamThread-1] 
o.a.k.s.p.i.ProcessorStateManager        : stream-thread 
[XXXXCommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-1] 
standby-task [1_2] Closing storage engine XXXX_STATE_STORE
NODE_3 2020-04-15 21:11:47.455 DEBUG 1 --- [-StreamThread-1] 
o.a.k.s.p.internals.StateDirectory       : stream-thread 
[XXXXCommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-1] 
Released state dir lock for task 1_2{noformat}
And this is T-2 with the active task starting:
{noformat}
NODE_3 2020-04-15 21:11:46.556 DEBUG 1 --- [-StreamThread-2] 
o.a.k.s.processor.internals.TaskManager  : stream-thread 
[XXXXCommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] New 
active tasks to be created: {1_2=[mnl.xxxx.command-2, xxxx.command-2]}
NODE_3 2020-04-15 21:11:46.697 TRACE 1 --- [-StreamThread-2] 
o.a.k.s.p.i.ProcessorStateManager        : stream-thread 
[XXXXCommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] task 
[1_2] Checkpointable offsets read from checkpoint: 
{XXXXCommandProcessor-COMMAND_ID_STORE-changelog-2=1, 
XXXXCommandProcessor-XXXX_STATE_STORE-changelog-2=1}
NODE_3 2020-04-15 21:11:46.703 DEBUG 1 --- [-StreamThread-2] 
o.a.k.s.p.i.ProcessorStateManager        : stream-thread 
[XXXXCommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] task 
[1_2] Created state store manager for task 1_2
NODE_3 2020-04-15 21:11:46.950  INFO 1 --- [-StreamThread-2] 
o.a.k.s.p.internals.StreamThread         : stream-thread 
[XXXXCommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] 
Creating producer client for task 1_2
        client.id = 
XXXXCommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2-1_2-producer
NODE_3 2020-04-15 21:11:47.137  INFO 1 --- [-StreamThread-2] 
o.a.k.clients.producer.KafkaProducer     : [Producer 
clientId=XXXXCommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2-1_2-producer,
 transactionalId=XXXXCommandProcessor-1_2] Instantiated a transactional 
producer.
NODE_3 2020-04-15 21:11:47.389  INFO 1 --- [-StreamThread-2] 
o.a.k.clients.producer.KafkaProducer     : [Producer 
clientId=XXXXCommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2-1_2-producer,
 transactionalId=XXXXCommandProcessor-1_2] Overriding the default retries 
config to the recommended value of 2147483647 since the idempotent producer is 
enabled.
NODE_3 2020-04-15 21:11:47.403  INFO 1 --- [-StreamThread-2] 
o.a.k.clients.producer.KafkaProducer     : [Producer 
clientId=XXXXCommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2-1_2-producer,
 transactionalId=XXXXCommandProcessor-1_2] Overriding the default acks to all 
since idempotence is enabled.
NODE_3 2020-04-15 21:11:47.472 DEBUG 1 --- [-2-1_2-producer] 
o.a.k.clients.producer.internals.Sender  : [Producer 
clientId=XXXXCommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2-1_2-producer,
 transactionalId=XXXXCommandProcessor-1_2] Starting Kafka producer I/O thread.
NODE_3 2020-04-15 21:11:47.599  INFO 1 --- [-2-1_2-producer] 
org.apache.kafka.clients.Metadata        : [Producer 
clientId=XXXXCommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2-1_2-producer,
 transactionalId=XXXXCommandProcessor-1_2] Cluster ID: UoTd5Q9HQsKwSUpY3eABQA
NODE_3 2020-04-15 21:11:47.670 DEBUG 1 --- [-StreamThread-2] 
o.a.k.s.p.i.ProcessorStateManager        : stream-thread 
[XXXXCommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] task 
[1_2] Register global stores []
NODE_3 2020-04-15 21:11:47.742 DEBUG 1 --- [-StreamThread-2] 
o.a.k.c.p.internals.TransactionManager   : [Producer 
clientId=XXXXCommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2-1_2-producer,
 transactionalId=XXXXCommandProcessor-1_2] Transition from state UNINITIALIZED 
to INITIALIZING
NODE_3 2020-04-15 21:11:47.742  INFO 1 --- [-StreamThread-2] 
o.a.k.c.p.internals.TransactionManager   : [Producer 
clientId=XXXXCommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2-1_2-producer,
 transactionalId=XXXXCommandProcessor-1_2] ProducerId set to -1 with epoch -1
NODE_3 2020-04-15 21:11:47.742 DEBUG 1 --- [-StreamThread-2] 
o.a.k.c.p.internals.TransactionManager   : [Producer 
clientId=XXXXCommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2-1_2-producer,
 transactionalId=XXXXCommandProcessor-1_2] Enqueuing transactional request 
InitProducerIdRequestData(transactionalId='XXXXCommandProcessor-1_2', 
transactionTimeoutMs=60000)
NODE_3 2020-04-15 21:11:47.744 DEBUG 1 --- [-2-1_2-producer] 
o.a.k.c.p.internals.TransactionManager   : [Producer 
clientId=XXXXCommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2-1_2-producer,
 transactionalId=XXXXCommandProcessor-1_2] Enqueuing transactional request 
FindCoordinatorRequestData(key='XXXXCommandProcessor-1_2', keyType=1)
NODE_3 2020-04-15 21:11:47.790 DEBUG 1 --- [-2-1_2-producer] 
o.a.k.c.p.internals.TransactionManager   : [Producer 
clientId=XXXXCommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2-1_2-producer,
 transactionalId=XXXXCommandProcessor-1_2] Enqueuing transactional request 
InitProducerIdRequestData(transactionalId='XXXXCommandProcessor-1_2', 
transactionTimeoutMs=60000)
NODE_3 2020-04-15 21:11:47.855 DEBUG 1 --- [-2-1_2-producer] 
o.a.k.clients.producer.internals.Sender  : [Producer 
clientId=XXXXCommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2-1_2-producer,
 transactionalId=XXXXCommandProcessor-1_2] Sending transactional request 
FindCoordinatorRequestData(key='XXXXCommandProcessor-1_2', keyType=1) to node 
kafka-3:39092 (id: 3 rack: rack-a)
NODE_3 2020-04-15 21:11:48.000 DEBUG 1 --- [-2-1_2-producer] 
o.a.k.clients.producer.internals.Sender  : [Producer 
clientId=XXXXCommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2-1_2-producer,
 transactionalId=XXXXCommandProcessor-1_2] Sending transactional request 
InitProducerIdRequestData(transactionalId='XXXXCommandProcessor-1_2', 
transactionTimeoutMs=60000) to node kafka-1:19092 (id: 1 rack: null)
NODE_3 2020-04-15 21:11:48.133  INFO 1 --- [-2-1_2-producer] 
o.a.k.c.p.internals.TransactionManager   : [Producer 
clientId=XXXXCommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2-1_2-producer,
 transactionalId=XXXXCommandProcessor-1_2] ProducerId set to 1014 with epoch 3
NODE_3 2020-04-15 21:11:48.146 DEBUG 1 --- [-2-1_2-producer] 
o.a.k.c.p.internals.TransactionManager   : [Producer 
clientId=XXXXCommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2-1_2-producer,
 transactionalId=XXXXCommandProcessor-1_2] Transition from state INITIALIZING 
to READY
NODE_3 2020-04-15 21:11:48.148 TRACE 1 --- [-StreamThread-2] 
o.a.k.s.p.internals.StreamThread         : stream-thread 
[XXXXCommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] 
Created task 1_2 with assigned partitions [mnl.xxxx.command-2, xxxx.command-2]
NODE_3 2020-04-15 21:11:48.150  INFO 1 --- [-StreamThread-2] 
o.a.k.clients.consumer.KafkaConsumer     : [Consumer 
clientId=XXXXCommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2-restore-consumer,
 groupId=null] Unsubscribed all topics or patterns and assigned partitions
NODE_3 2020-04-15 21:11:48.150 DEBUG 1 --- [-StreamThread-2] 
o.a.k.s.processor.internals.TaskManager  : stream-thread 
[XXXXCommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] 
Pausing all active task partitions until the underlying state stores are ready
NODE_3 2020-04-15 21:11:48.152 TRACE 1 --- [-StreamThread-2] 
o.a.k.s.processor.internals.TaskManager  : stream-thread 
[XXXXCommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] 
Pausing partitions: [zzzzzz_state-1, mnl.xxxx.command-2, xxxx.command-2]
NODE_3 2020-04-15 21:11:48.152 DEBUG 1 --- [-StreamThread-2] 
o.a.k.clients.consumer.KafkaConsumer     : [Consumer 
clientId=XXXXCommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2-consumer,
 groupId=XXXXCommandProcessor] Pausing partitions [zzzzzz_state-1, 
mnl.xxxx.command-2, xxxx.command-2]
NODE_3 2020-04-15 21:11:48.153  INFO 1 --- [-StreamThread-2] 
o.a.k.s.p.internals.StreamThread         : stream-thread 
[XXXXCommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] 
partition assignment took 1599 ms.
NODE_3 2020-04-15 21:11:48.192 DEBUG 1 --- [-StreamThread-2] 
o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer 
clientId=XXXXCommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2-consumer,
 groupId=XXXXCommandProcessor] Fetching committed offsets for partitions: 
[mnl.xxxx.command-2, xxxx.command-2]
NODE_3 2020-04-15 21:11:48.316 DEBUG 1 --- [-StreamThread-2] 
o.a.k.s.p.i.AssignedStreamsTasks         : stream-thread 
[XXXXCommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] 
Initializing stream tasks [1_2]
NODE_3 2020-04-15 21:11:48.326  INFO 1 --- [-StreamThread-2] 
o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer 
clientId=XXXXCommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2-consumer,
 groupId=XXXXCommandProcessor] Found no committed offset for partition 
mnl.xxxx.command-2
NODE_3 2020-04-15 21:11:48.365 TRACE 1 --- [-StreamThread-2] 
o.a.k.s.p.i.ProcessorStateManager        : stream-thread 
[XXXXCommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] task 
[1_2] Updating store offset limit with {mnl.xxxx.command-2=0, xxxx.command-2=0}
NODE_3 2020-04-15 21:11:48.371 DEBUG 1 --- [-StreamThread-2] 
o.a.k.s.processor.internals.StreamTask   : stream-thread 
[XXXXCommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] task 
[1_2] A committed timestamp was detected: setting the partition time of 
partition xxxx.command-2 to 1586985068274 in stream task 1_2
NODE_3 2020-04-15 21:11:48.371 DEBUG 1 --- [-StreamThread-2] 
o.a.k.s.processor.internals.StreamTask   : stream-thread 
[XXXXCommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] task 
[1_2] No committed offset for partition mnl.xxxx.command-2, therefore no 
timestamp can be found for this partition
NODE_3 2020-04-15 21:11:48.393 DEBUG 1 --- [-StreamThread-2] 
o.a.k.s.processor.internals.StreamTask   : stream-thread 
[XXXXCommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] task 
[1_2] Initializing state stores
NODE_3 2020-04-15 21:11:48.410 DEBUG 1 --- [-StreamThread-2] 
o.a.k.s.p.internals.StateDirectory       : stream-thread 
[XXXXCommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] 
Acquired state dir lock for task 1_2
NODE_3 2020-04-15 21:11:48.410 TRACE 1 --- [-StreamThread-2] 
o.a.k.s.processor.internals.StreamTask   : stream-thread 
[XXXXCommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] task 
[1_2] Initializing state stores
NODE_3 2020-04-15 21:11:48.410 DEBUG 1 --- [-StreamThread-2] 
o.a.k.s.processor.internals.StreamTask   : stream-thread 
[XXXXCommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] task 
[1_2] Initializing store COMMAND_ID_STORE
NODE_3 2020-04-15 21:11:49.075 DEBUG 1 --- [-StreamThread-2] 
o.a.k.s.p.i.ProcessorStateManager        : stream-thread 
[XXXXCommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] task 
[1_2] Registering state store COMMAND_ID_STORE to its state manager
NODE_3 2020-04-15 21:11:49.075 TRACE 1 --- [-StreamThread-2] 
o.a.k.s.p.i.ProcessorStateManager        : stream-thread 
[XXXXCommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] task 
[1_2] Restoring state store COMMAND_ID_STORE from changelog topic 
XXXXCommandProcessor-COMMAND_ID_STORE-changelog at checkpoint 1
NODE_3 2020-04-15 21:11:49.076 TRACE 1 --- [-StreamThread-2] 
o.a.k.s.p.i.StoreChangelogReader         : stream-thread 
[XXXXCommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] 
Added restorer for changelog XXXXCommandProcessor-COMMAND_ID_STORE-changelog-2
NODE_3 2020-04-15 21:11:49.076 DEBUG 1 --- [-StreamThread-2] 
o.a.k.s.processor.internals.StreamTask   : stream-thread 
[XXXXCommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] task 
[1_2] Initializing store XXXX_STATE_STORE
NODE_3 2020-04-15 21:11:49.349 DEBUG 1 --- [-StreamThread-2] 
o.a.k.s.p.i.ProcessorStateManager        : stream-thread 
[XXXXCommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] task 
[1_2] Registering state store XXXX_STATE_STORE to its state manager
NODE_3 2020-04-15 21:11:49.349 TRACE 1 --- [-StreamThread-2] 
o.a.k.s.p.i.ProcessorStateManager        : stream-thread 
[XXXXCommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] task 
[1_2] Restoring state store XXXX_STATE_STORE from changelog topic 
XXXXCommandProcessor-XXXX_STATE_STORE-changelog at checkpoint 1
NODE_3 2020-04-15 21:11:49.436 TRACE 1 --- [-StreamThread-2] 
o.a.k.s.p.i.StoreChangelogReader         : stream-thread 
[XXXXCommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] 
Found checkpoint 1 from changelog 
XXXXCommandProcessor-COMMAND_ID_STORE-changelog-2 for store 
COMMAND_ID_STORE.
NODE_3 2020-04-15 21:11:50.082 TRACE 1 --- [-StreamThread-2] 
o.a.k.s.p.i.AssignedStreamsTasks         : stream-thread 
[XXXXCommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] 
Stream task 1_2 cannot resume processing yet since some of its changelog 
partitions have not completed restoring: 
[XXXXCommandProcessor-COMMAND_ID_STORE-changelog-2, 
XXXXCommandProcessor-XXXX_STATE_STORE-changelog-2]
NODE_3 2020-04-15 21:11:50.627 DEBUG 1 --- [-StreamThread-2] 
o.a.k.s.p.i.AssignedStreamsTasks         : stream-thread 
[XXXXCommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] 
Transitioning stream task 1_2 to running
NODE_3 2020-04-15 21:11:50.627 TRACE 1 --- [-StreamThread-2] 
o.a.k.s.processor.internals.StreamTask   : stream-thread 
[XXXXCommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] task 
[1_2] Initializing processor nodes of the topology
NODE_3 2020-04-15 21:11:51.389 DEBUG 1 --- [-StreamThread-2] 
o.a.k.c.p.internals.TransactionManager   : [Producer 
clientId=XXXXCommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2-1_2-producer,
 transactionalId=XXXXCommandProcessor-1_2] Transition from state READY to 
IN_TRANSACTION
NODE_3 2020-04-15 21:11:51.399 DEBUG 1 --- [-StreamThread-2] 
o.a.k.s.p.i.AssignedStreamsTasks         : stream-thread 
[XXXXCommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] 
Stream task 1_2 completed restoration as all its changelog partitions 
[XXXXCommandProcessor-COMMAND_ID_STORE-changelog-2, 
XXXXCommandProcessor-XXXX_STATE_STORE-changelog-2] have been applied to restore 
state
{noformat}

Now compare timestamps:
{noformat}
NODE_3 2020-04-15 21:11:46.697 TRACE 1 --- [-StreamThread-2] 
o.a.k.s.p.i.ProcessorStateManager : stream-thread 
[XXXXCommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] task 
[1_2] Checkpointable offsets read from checkpoint: 
{XXXXCommandProcessor-COMMAND_ID_STORE-changelog-2=1, 
XXXXCommandProcessor-XXXX_STATE_STORE-changelog-2=1}

NODE_3 2020-04-15 21:11:47.195 TRACE 1 --- [-StreamThread-1] 
o.a.k.s.p.i.ProcessorStateManager : stream-thread 
[XXXXCommandProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-1] 
standby-task [1_2] Writing checkpoint: 
{XXXXCommandProcessor-COMMAND_ID_STORE-changelog-2=1, 
XXXXCommandProcessor-XXXX_STATE_STORE-changelog-2=1}{noformat}
It seems that the standby task on T-1 hadn't finished shutting down yet while 
the active task on T-2 was already starting. If you look at the code 
corresponding to these two particular log lines:
{code:java}
log.trace("Checkpointable offsets read from checkpoint: {}", initialLoadedCheckpoints);

if (eosEnabled) {
    // with EOS enabled, there should never be a checkpoint file _during_ processing.
    // delete the checkpoint file after loading its stored offsets.
    checkpointFile.delete();
    checkpointFile = null;
}{code}
 
{code:java}
log.trace("Writing checkpoint: {}", checkpointFileCache);
try {
    checkpointFile.write(checkpointFileCache);
} catch (final IOException e) {
    log.warn("Failed to write offset checkpoint file to [{}]", checkpointFile, e);
}{code}

Could this be a race condition? The new active task reads the file, keeps the 
contents in memory, and then deletes the checkpoint file; about 0.5 s later 
the standby task, while shutting down, writes the checkpoint file again, so 
the file ends up on disk even though it shouldn't be there.
Is this checkpoint file locked in any way? It's also a bit misleading that 
there is no log line when this file is deleted, which makes it hard to debug.
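To make the suspected interleaving concrete, here is a minimal sketch of that read-delete-then-write sequence using plain java.nio files (illustrative only; class and file names are made up, and this is not the actual ProcessorStateManager code):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Sequential sketch of the suspected race: the active task reads the
// checkpoint and deletes it (EOS behaviour), then the shutting-down standby
// task writes its checkpoint, so the file reappears on disk.
public class CheckpointRaceSketch {
    public static void main(String[] args) throws IOException {
        Path checkpoint = Files.createTempFile("task-1_2-", ".checkpoint");
        Files.writeString(checkpoint, "changelog-2 1\n");

        // T-2 (new active task): load the offsets, then delete because EOS is on.
        String loadedOffsets = Files.readString(checkpoint);
        Files.delete(checkpoint);
        System.out.println("deleted by active task: " + !Files.exists(checkpoint));

        // ~0.5 s later, T-1 (standby task shutting down) writes its checkpoint.
        Files.writeString(checkpoint, "changelog-2 1\n");

        // The checkpoint file exists again although EOS expects it to be gone.
        System.out.println("checkpoint exists after standby shutdown: " + Files.exists(checkpoint));
        Files.deleteIfExists(checkpoint);
    }
}
```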

I am aware it may be hard to reproduce in a test due to timing, but maybe with 
exactly the same topology and this strange rebalancing scenario it could be 
done. Or perhaps with some kind of unit test?
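A unit-test-style reproduction could force the interleaving deterministically with a latch instead of relying on timing; a sketch under the same assumptions as above (made-up class names, not the Streams test harness):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.CountDownLatch;

// Deterministic two-thread reproduction: a latch guarantees the standby
// thread writes its checkpoint only after the active thread has read and
// deleted the file, mimicking the 0.5 s gap seen in the logs.
public class CheckpointRaceTestSketch {
    public static void main(String[] args) throws Exception {
        Path checkpoint = Files.createTempFile("task-1_2-", ".checkpoint");
        Files.writeString(checkpoint, "changelog-2 1\n");
        CountDownLatch activeDone = new CountDownLatch(1);

        Thread active = new Thread(() -> {
            try {
                Files.readString(checkpoint); // load offsets into memory
                Files.delete(checkpoint);     // EOS: no checkpoint during processing
            } catch (IOException e) {
                throw new RuntimeException(e);
            } finally {
                activeDone.countDown();
            }
        });
        Thread standby = new Thread(() -> {
            try {
                activeDone.await();           // standby shuts down strictly later
                Files.writeString(checkpoint, "changelog-2 1\n");
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        active.start();
        standby.start();
        active.join();
        standby.join();

        System.out.println("stale checkpoint left behind: " + Files.exists(checkpoint));
        Files.deleteIfExists(checkpoint);
    }
}
```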
I also noticed that the code operating on state stores and checkpoint files 
has recently been rewritten, but it's hard to tell whether this scenario can 
still happen.

Let me know if I can help in any other way. I attached detailed logs showing 
the aforementioned failover scenario.

> Invalid state store content after task migration with exactly_once and 
> standby replicas
> ---------------------------------------------------------------------------------------
>
>                 Key: KAFKA-9891
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9891
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.3.1, 2.4.1
>            Reporter: Mateusz Jadczyk
>            Assignee: Boyang Chen
>            Priority: Blocker
>
> We have a simple command id deduplication mechanism (very similar to the one 
> from Kafka Streams examples) based on Kafka Streams State Stores. It stores 
> command ids from the past hour in _persistentWindowStore_. We encountered a 
> problem with the store if there's an exception thrown later in that topology.
>  We run 3 nodes using docker, each with multiple threads set for this 
> particular Streams Application.
> The business flow is as follows (performed within a single subtopology):
>  *  a valid command is sent with command id 
> (_mnl_cmd_31bad7e5-35e7-490e-89f0-616fe967111f_). NODE 1 is running an active 
> task 1_2. First node in the topology analyses if this is a duplicate by 
> checking in the state store (_COMMAND_ID_STORE_), if not puts the command id 
> in the state store and processes the command properly.
>  * an invalid command is sent with the same key but new command id 
> (_mnl_cmd_9f1752da-45b7-4ef7-9ef8-209d826530bc_). Again, check for the 
> duplicated command id is performed, it's not a duplicate, command id is put 
> into the state store. Next node in the topology throws an exception which 
> causes an error on NODE 1 for task 1_2. As a result, transaction is aborted, 
> offsets are not committed. I double checked for the changelog topic - 
> relevant messages are not committed. Therefore, the changelog topic contains 
> only the first command id _mnl_cmd_31bad7e5-35e7-490e-89f0-616fe967111f,_ and 
> not the one which caused a failure.
>  * in the meantime a standby task 1_2 running on NODE 3 replicated 
> _mnl_cmd_31bad7e5-35e7-490e-89f0-616fe967111f_ command id into a local 
> _COMMAND_ID_STORE_
>  * standby task 1_2 on NODE 3 Thread-2 takes over the task as an active one. 
> It checks if this command id is a duplicate - no, it isn't - tries to process 
> the faulty command and throws an exception. Again, transaction aborted, all 
> looks fine.
>  * NODE 3 Thread-1 takes over. It checks for the duplicate. To our surprise, 
> *it is a duplicate!* Even though the transaction has been aborted and the 
> changelog doesn't contain this command id: 
> _mnl_cmd_9f1752da-45b7-4ef7-9ef8-209d826530bc._
>  
> After digging into the Streams logs and some discussion on ([Stack 
> Overflow|https://stackoverflow.com/questions/61247789/invalid-state-store-content-after-aborted-transaction-with-exactly-once-and-stan])
>  we concluded it has something to do with checkpoint files. Here are the 
> detailed logs relevant to checkpoint files.
>  
> {code:java}
> NODE_3 2020-04-15 21:06:14.470 TRACE 1 --- [-StreamThread-2] 
> o.a.k.s.p.i.ProcessorStateManager : stream-thread 
> [XXXXProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] task 
> [1_2] Checkpointable offsets read from checkpoint: {}
> NODE_3 2020-04-15 21:06:19.413 TRACE 1 --- [-StreamThread-2] 
> o.a.k.s.p.i.ProcessorStateManager : stream-thread 
> [XXXXProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] task 
> [1_2] Restoring state store COMMAND_ID_STORE from changelog topic 
> XXXXProcessor-COMMAND_ID_STORE-changelog at checkpoint null
> NODE_3 2020-04-15 21:06:28.470 TRACE 1 --- [-StreamThread-1] 
> o.a.k.s.p.i.ProcessorStateManager : stream-thread 
> [XXXXProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-1] 
> standby-task [1_2] Checkpointable offsets read from checkpoint: {}
> NODE_3 2020-04-15 21:06:29.634 TRACE 1 --- [-StreamThread-2] 
> o.a.k.s.s.internals.OffsetCheckpoint : Writing tmp checkpoint file 
> /tmp/kafka-streams/XXXXProcessor/1_2/.checkpoint.tmp
> NODE_3 2020-04-15 21:06:29.640 TRACE 1 --- [-StreamThread-2] 
> o.a.k.s.s.internals.OffsetCheckpoint : Swapping tmp checkpoint file 
> /tmp/kafka-streams/XXXXProcessor/1_2/.checkpoint.tmp 
> /tmp/kafka-streams/XXXXProcessor/1_2/.checkpoint
> NODE_3 2020-04-15 21:11:15.909 TRACE 1 --- [-StreamThread-1] 
> o.a.k.s.s.internals.OffsetCheckpoint : Writing tmp checkpoint file 
> /tmp/kafka-streams/XXXXProcessor/1_2/.checkpoint.tmp
> NODE_3 2020-04-15 21:11:15.912 TRACE 1 --- [-StreamThread-1] 
> o.a.k.s.s.internals.OffsetCheckpoint : Swapping tmp checkpoint file 
> /tmp/kafka-streams/XXXXProcessor/1_2/.checkpoint.tmp 
> /tmp/kafka-streams/XXXXProcessor/1_2/.checkpoint
> NODE_1 log1:2020-04-15 21:11:33.942 DEBUG 1 --- [-StreamThread-2] 
> c.g.f.c.s.validation.CommandIdValidator : CommandId: 
> mnl_cmd_9f1752da-45b7-4ef7-9ef8-209d826530bc is not a duplicate.
> NODE_3 2020-04-15 21:11:47.195 TRACE 1 --- [-StreamThread-1] 
> o.a.k.s.s.internals.OffsetCheckpoint : Writing tmp checkpoint file 
> /tmp/kafka-streams/XXXXProcessor/1_2/.checkpoint.tmp
> NODE_3 2020-04-15 21:11:47.233 TRACE 1 --- [-StreamThread-1] 
> o.a.k.s.s.internals.OffsetCheckpoint : Swapping tmp checkpoint file 
> /tmp/kafka-streams/XXXXProcessor/1_2/.checkpoint.tmp 
> /tmp/kafka-streams/XXXXProcessor/1_2/.checkpoint
> NODE_3 2020-04-15 21:11:49.075 TRACE 1 --- [-StreamThread-2] 
> o.a.k.s.p.i.ProcessorStateManager : stream-thread 
> [XXXXProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] task 
> [1_2] Restoring state store COMMAND_ID_STORE from changelog topic 
> XXXXProcessor-COMMAND_ID_STORE-changelog at checkpoint 1
> NODE_3 2020-04-15 21:11:49.436 TRACE 1 --- [-StreamThread-2] 
> o.a.k.s.p.i.StoreChangelogReader : stream-thread 
> [XXXXProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] Found 
> checkpoint 1 from changelog XXXXProcessor-COMMAND_ID_STORE-changelog-2 for 
> store COMMAND_ID_STORE.
> NODE_3 2020-04-15 21:11:52.023 DEBUG 1 --- [-StreamThread-2] 
> c.g.f.c.s.validation.CommandIdValidator : CommandId: 
> mnl_cmd_9f1752da-45b7-4ef7-9ef8-209d826530bc is not a duplicate.
> NODE_3 2020-04-15 21:11:53.683 ERROR 1 --- [-StreamThread-2] 
> o.a.k.s.p.i.AssignedStreamsTasks : stream-thread 
> [XXXXProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] Failed to 
> process stream task 1_2 due to the following error: 
> java.lang.RuntimeException
> NODE_3 2020-04-15 21:12:05.346 TRACE 1 --- [-StreamThread-1] 
> o.a.k.s.p.i.ProcessorStateManager : stream-thread 
> [XXXXProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-1] task 
> [1_2] Restoring state store COMMAND_ID_STORE from changelog topic 
> XXXXProcessor-COMMAND_ID_STORE-changelog at checkpoint 1
> NODE_3 2020-04-15 21:12:05.562 TRACE 1 --- [-StreamThread-1] 
> o.a.k.s.p.i.StoreChangelogReader : stream-thread 
> [XXXXProcessor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-1] Found 
> checkpoint 1 from changelog XXXXProcessor-COMMAND_ID_STORE-changelog-2 for 
> store COMMAND_ID_STORE.
> NODE_3 2020-04-15 21:12:06.424 WARN 1 --- [-StreamThread-1] 
> c.g.f.c.s.validation.CommandIdValidator : Command duplicate 
> detected. Command id mnl_cmd_9f1752da-45b7-4ef7-9ef8-209d826530bc
> {code}
> It seems that on NODE_3 there's a standby task 1_2 running on T-2, it 
> replicates a first valid command, thus creating a checkpoint file. Invalid 
> command causes an error on NODE_1, then NODE_3 T-2 takes over the task. It 
> finds the checkpoint file (which is fine), and starts to process the invalid 
> command. It crashes, same node T-1 takes over, finds the checkpoint file (!), 
> thinks state store is clean (apparently it's not as it contains state 
> modified by T-2) and finds a duplicated command id.
>  
> We use Java 11, kafka clients 2.4.1 and spring-kafka 2.4.5. We rolled back for 
> a moment to kafka clients 2.3.1 and the problem persists.
> *We performed more tests with configuration changes and after changing 
> `num.standby.replicas = 1` to `num.standby.replicas = 0` the problem 
> disappeared. It is also resolved when changing the store to 
> _inMemoryWindowStore._*
> In the SO question you can find the relevant java code. I don't have a sample 
> project to share at the moment which replicates the problem, but it is easily 
> repeatable in our project.
> Such behaviour can have serious implications on business logic, in our case 
> accidentally skipped messages without properly processing them.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
