[ https://issues.apache.org/jira/browse/KAFKA-4890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15923322#comment-15923322 ]

Guozhang Wang commented on KAFKA-4890:
--------------------------------------

[~damianguy] I looked at the logs (actually only one of the log files 
{{firefly_2.log}} which I believe correspond to the first trace you posted 
above). And here is what I have found:

1. Before thread-1 hit the error, it seems to have had a long GC pause while 
it was creating task 0_6 (found via {{grep -i "threadid" logfile}}):

{code}
2017-03-12 20:39:17 [StreamThread-1] DEBUG o.a.k.c.c.KafkaConsumer - Seeking to 
beginning of partition fireflyProd-perGameScoreStore-changelog-6
2017-03-12 20:39:17 [StreamThread-1] DEBUG o.a.k.c.m.Metrics - Added sensor 
with name topic.fireflyProd-userAwardStore-changelog.bytes-fetched
2017-03-12 20:39:17 [StreamThread-1] DEBUG o.a.k.c.m.Metrics - Added sensor 
with name topic.fireflyProd-userAwardStore-changelog.records-fetched
2017-03-12 20:39:17 [StreamThread-1] DEBUG o.a.k.c.m.Metrics - Added sensor 
with name fireflyProd-perGameScoreStore-changelog-6.records-lag
2017-03-12 20:40:22 [StreamThread-1] DEBUG o.a.k.c.c.KafkaConsumer - 
Unsubscribed all topics or patterns and assigned partitions    // 65 seconds 
later
2017-03-12 20:40:22 [StreamThread-1] DEBUG o.a.k.c.m.Metrics - Removed sensor 
with name fireflyProd-perGameScoreStore-changelog-6.records-lag
{code}

And during this window thread-3 deleted the state directory for task 0_6, as 
you observed:

{code}
2017-03-12 20:40:21 [StreamThread-3] INFO  o.a.k.s.p.i.StateDirectory - 
Deleting obsolete state directory 0_6 for task 0_6
{code}

A deeper look shows why: thread-3 had once successfully grabbed the lock for 
0_6 (during generation 97), before this task was created on thread-1 (during 
generation 98). Within a roughly two-second window the threads grabbed a bunch 
of locks and successfully deleted the corresponding state directories; at that 
time the directories likely did not contain any data yet.

{code}
2017-03-12 20:37:04 [StreamThread-3] INFO  o.a.k.c.c.i.AbstractCoordinator - 
Successfully joined group fireflyProd with generation 97
2017-03-12 20:37:04 [StreamThread-3] INFO  o.a.k.c.c.i.ConsumerCoordinator - 
Setting newly assigned partitions [debugStatistics-12] for group fireflyProd
2017-03-12 20:37:04 [StreamThread-1] INFO  o.a.k.c.c.i.AbstractCoordinator - 
Successfully joined group fireflyProd with generation 97
2017-03-12 20:37:04 [StreamThread-1] INFO  o.a.k.c.c.i.ConsumerCoordinator - 
Setting newly assigned partitions [debugStatistics-14] for group fireflyProd
...
2017-03-12 20:38:07 [StreamThread-2] INFO  o.a.k.s.p.i.StateDirectory - 
Deleting obsolete state directory 0_10 for task 0_10
2017-03-12 20:38:07 [StreamThread-1] INFO  o.a.k.s.p.i.StateDirectory - 
Deleting obsolete state directory 0_10 for task 0_10
2017-03-12 20:38:07 [StreamThread-2] INFO  o.a.k.s.p.i.StateDirectory - 
Deleting obsolete state directory 0_0 for task 0_0
2017-03-12 20:38:07 [StreamThread-1] INFO  o.a.k.s.p.i.StateDirectory - 
Deleting obsolete state directory 0_5 for task 0_5
2017-03-12 20:38:07 [StreamThread-3] INFO  o.a.k.s.p.i.StateDirectory - 
Deleting obsolete state directory 0_0 for task 0_0
2017-03-12 20:38:07 [StreamThread-1] INFO  o.a.k.s.p.i.StateDirectory - 
Deleting obsolete state directory 0_4 for task 0_4
2017-03-12 20:38:07 [StreamThread-3] INFO  o.a.k.s.p.i.StateDirectory - 
Deleting obsolete state directory 0_5 for task 0_5
2017-03-12 20:38:07 [StreamThread-3] INFO  o.a.k.s.p.i.StateDirectory - 
Deleting obsolete state directory 0_4 for task 0_4
2017-03-12 20:38:07 [StreamThread-2] INFO  o.a.k.s.p.i.StateDirectory - 
Deleting obsolete state directory 0_5 for task 0_5
2017-03-12 20:38:07 [StreamThread-2] INFO  o.a.k.s.p.i.StateDirectory - 
Deleting obsolete state directory 0_4 for task 0_4
2017-03-12 20:38:07 [StreamThread-3] INFO  o.a.k.s.p.i.StateDirectory - 
Deleting obsolete state directory 0_15 for task 0_15
2017-03-12 20:38:07 [StreamThread-2] INFO  o.a.k.s.p.i.StateDirectory - 
Deleting obsolete state directory 0_15 for task 0_15
2017-03-12 20:38:07 [StreamThread-1] INFO  o.a.k.s.p.i.StateDirectory - 
Deleting obsolete state directory 0_15 for task 0_15
2017-03-12 20:38:07 [StreamThread-2] INFO  o.a.k.s.p.i.StateDirectory - 
Deleting obsolete state directory 0_8 for task 0_8
2017-03-12 20:38:07 [StreamThread-3] INFO  o.a.k.s.p.i.StateDirectory - 
Deleting obsolete state directory 0_8 for task 0_8
2017-03-12 20:38:07 [StreamThread-1] INFO  o.a.k.s.p.i.StateDirectory - 
Deleting obsolete state directory 0_8 for task 0_8
2017-03-12 20:38:07 [StreamThread-2] INFO  o.a.k.s.p.i.StateDirectory - 
Deleting obsolete state directory 0_2 for task 0_2
2017-03-12 20:38:07 [StreamThread-1] INFO  o.a.k.s.p.i.StateDirectory - 
Deleting obsolete state directory 0_2 for task 0_2
2017-03-12 20:38:07 [StreamThread-3] INFO  o.a.k.s.p.i.StateDirectory - 
Deleting obsolete state directory 0_2 for task 0_2
2017-03-12 20:38:07 [StreamThread-3] INFO  o.a.k.s.p.i.StateDirectory - 
Deleting obsolete state directory 0_6 for task 0_6
2017-03-12 20:38:07 [StreamThread-1] INFO  o.a.k.s.p.i.StateDirectory - 
Deleting obsolete state directory 0_1 for task 0_1
2017-03-12 20:38:07 [StreamThread-2] INFO  o.a.k.s.p.i.StateDirectory - 
Deleting obsolete state directory 0_7 for task 0_7
2017-03-12 20:38:07 [StreamThread-1] INFO  o.a.k.s.p.i.StateDirectory - 
Deleting obsolete state directory 0_7 for task 0_7
2017-03-12 20:38:07 [StreamThread-3] INFO  o.a.k.s.p.i.StateDirectory - 
Deleting obsolete state directory 0_1 for task 0_1
2017-03-12 20:38:07 [StreamThread-3] INFO  o.a.k.s.p.i.StateDirectory - 
Deleting obsolete state directory 0_7 for task 0_7
2017-03-12 20:38:07 [StreamThread-3] INFO  o.a.k.s.p.i.StateDirectory - 
Deleting obsolete state directory 0_11 for task 0_11
2017-03-12 20:38:07 [StreamThread-2] INFO  o.a.k.s.p.i.StateDirectory - 
Deleting obsolete state directory 0_13 for task 0_13
2017-03-12 20:38:08 [StreamThread-3] INFO  o.a.k.s.p.i.StateDirectory - 
Deleting obsolete state directory 0_13 for task 0_13
...
2017-03-12 20:39:17 [StreamThread-1] INFO  o.a.k.c.c.i.AbstractCoordinator - 
Successfully joined group fireflyProd with generation 98
2017-03-12 20:39:17 [StreamThread-1] INFO  o.a.k.c.c.i.ConsumerCoordinator - 
Setting newly assigned partitions [debugStatistics-6, debugStatistics-14] for 
group fireflyProd
2017-03-12 20:39:17 [StreamThread-3] INFO  o.a.k.c.c.i.AbstractCoordinator - 
Successfully joined group fireflyProd with generation 98
2017-03-12 20:39:17 [StreamThread-3] INFO  o.a.k.c.c.i.ConsumerCoordinator - 
Setting newly assigned partitions [debugStatistics-3, debugStatistics-12] for 
group fireflyProd
...

{code}

So there are a couple of risks here:

1) In {{StateDirectory#unlock}}, releasing the lock and closing the channel 
are not atomic. It is therefore possible that thread-1 releases the lock, 
thread-2 then grabs it, and thread-1 then closes the channel, voiding 
thread-2's lock. This is already discussed in 
https://issues.apache.org/jira/browse/KAFKA-3812.
2) In {{StateDirectory#cleanRemovedTasks}} we delete the whole directory while 
holding the lock on an empty file inside it. Since the lock is advisory, it 
does not prevent the file from being deleted while its file lock is held; 
another thread concurrently trying to grab the lock can then re-create the 
file and acquire a lock on a new file channel. At that point the two locks are 
no longer mutually exclusive.
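Risk 2 can be demonstrated outside of Streams. The following standalone sketch 
(class name hypothetical, not StateDirectory code; assumes a POSIX filesystem, 
where deleting a file with an open lock is permitted) shows that once the 
locked file is deleted and re-created, two locks on the "same" path can be 
held at once:

```java
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class AdvisoryLockDemo {

    // Returns true when both locks end up held simultaneously, i.e. the
    // locks stopped being mutually exclusive after the locked file was
    // deleted and re-created.
    public static boolean demonstrateRace() throws Exception {
        Path dir = Files.createTempDirectory("state-0_6-demo");
        Path lockFile = dir.resolve(".lock");
        Files.createFile(lockFile);

        // "Thread A" holds an advisory lock on the empty lock file.
        try (FileChannel chA = FileChannel.open(lockFile, StandardOpenOption.WRITE)) {
            FileLock lockA = chA.tryLock();

            // The advisory lock does not prevent deletion of the file,
            // which is effectively what happens when the whole task
            // directory is removed.
            Files.delete(lockFile);

            // "Thread B" re-creates the file; its lock lands on a
            // brand-new file (new inode), so it does not conflict with
            // lockA, even within the same JVM.
            Files.createFile(lockFile);
            try (FileChannel chB = FileChannel.open(lockFile, StandardOpenOption.WRITE)) {
                FileLock lockB = chB.tryLock();
                return lockA.isValid() && lockB != null;
            } finally {
                Files.deleteIfExists(lockFile);
                Files.deleteIfExists(dir);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("locks overlap: " + demonstrateRace());
    }
}
```

In other words, the second lock is taken on a different underlying file, so 
the OS never sees a conflict.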

I guess the reason it is hard to reproduce (if you are trying on trunk) is 
that, since the fix made in trunk, the cleanup timeout only starts ticking 
after the rebalance is done, so these races become much less likely (note that 
in this log, thread-3 tries to delete other tasks' state directories just 1 
second after it has created its own tasks).
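The trunk behavior described above can be sketched roughly as follows (class 
and method names are made up for illustration, not the actual Streams API): 
the cleanup delay is measured from the last completed rebalance, so a thread 
cannot start deleting other tasks' directories right after assignment.

```java
// Minimal sketch: obsolete state directories may only be removed once the
// configured delay has elapsed since the last completed rebalance.
public class CleanupTimer {
    private final long cleanupDelayMs;
    private long lastRebalanceMs;

    public CleanupTimer(long cleanupDelayMs, long nowMs) {
        this.cleanupDelayMs = cleanupDelayMs;
        this.lastRebalanceMs = nowMs;
    }

    // Called when a rebalance completes (e.g. from onPartitionsAssigned):
    // restart the clock, so cleanup cannot fire immediately afterwards.
    public void onRebalanceComplete(long nowMs) {
        lastRebalanceMs = nowMs;
    }

    // Gate for cleanRemovedTasks-style deletion.
    public boolean mayCleanup(long nowMs) {
        return nowMs - lastRebalanceMs >= cleanupDelayMs;
    }
}
```

With a 10-minute delay, a rebalance at t=0 keeps cleanup gated until t=600s, 
and each subsequent rebalance pushes the gate out again.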

> State directory being deleted when another thread holds the lock
> ----------------------------------------------------------------
>
>                 Key: KAFKA-4890
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4890
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.2.0
>            Reporter: Damian Guy
>         Attachments: logs.tar.gz
>
>
> Looks like a state directory is being cleaned up when another thread already 
> has the lock:
> {code}
> 2017-03-12 20:39:17 [StreamThread-1] DEBUG o.a.k.s.p.i.ProcessorStateManager 
> - task [0_6] Registering state store perGameScoreStore to its state manager
> 2017-03-12 20:40:21 [StreamThread-3] INFO  o.a.k.s.p.i.StateDirectory - 
> Deleting obsolete state directory 0_6 for task 0_6
> 2017-03-12 20:40:22 [StreamThread-1] ERROR o.a.k.c.c.i.ConsumerCoordinator - 
> User provided listener 
> org.apache.kafka.streams.processor.internals.StreamThread$1 for group 
> fireflyProd failed on partition assignment
> org.apache.kafka.streams.errors.ProcessorStateException: Error while 
> executing put key 
> \x00\x00\x00\x00}\xA2\x9E\x9D\x05\xF6\x95\xAB\x01\x12dayOfGame and value 
> \x00\x00\x00\x00z\x00\x00\x00\x00\x00\x80G@ from store perGameScoreStore
>         at 
> org.apache.kafka.streams.state.internals.RocksDBStore.putInternal(RocksDBStore.java:248)
>         at 
> org.apache.kafka.streams.state.internals.RocksDBStore.access$000(RocksDBStore.java:65)
>         at 
> org.apache.kafka.streams.state.internals.RocksDBStore$1.restore(RocksDBStore.java:156)
>         at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:230)
>         at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:193)
>         at 
> org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:99)
>         at 
> org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:152)
>         at 
> org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:39)
>         at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore$7.run(MeteredKeyValueStore.java:100)
>         at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>         at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:131)
>         at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:62)
>         at 
> org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86)
>         at 
> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:141)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339)
>         at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>         at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
>         at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
> Caused by: org.rocksdb.RocksDBException: `
>         at org.rocksdb.RocksDB.put(Native Method)
>         at org.rocksdb.RocksDB.put(RocksDB.java:488)
>         at 
> org.apache.kafka.streams.state.internals.RocksDBStore.putInternal(RocksDBStore.java:246)
>         ... 27 common frames omitted
> {code}
> Also 
> {code}
> 2017-03-12 20:46:58 [StreamThread-4] INFO  o.a.k.s.p.i.StateDirectory - 
> Deleting obsolete state directory 0_2 for task 0_2
> ...
> 2017-03-12 20:47:02 [StreamThread-2] ERROR o.a.k.s.p.i.StreamThread - 
> stream-thread [StreamThread-2] Failed to commit StandbyTask 0_2 state:
> org.apache.kafka.streams.errors.ProcessorStateException: task [0_2] Failed to 
> flush state store lifetimeScoreStore
>         at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:325)
>         at 
> org.apache.kafka.streams.processor.internals.StandbyTask.commit(StandbyTask.java:94)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:777)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:767)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:739)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:661)
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
> Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error 
> while executing flush from store lifetimeScoreStore
>         at 
> org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:346)
>         at 
> org.apache.kafka.streams.state.internals.RocksDBStore.flush(RocksDBStore.java:337)
>         at 
> org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractWrappedStateStore.flush(WrappedStateStore.java:80)
>         at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore$6.run(MeteredKeyValueStore.java:92)
>         at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>         at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:186)
>         at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:112)
>         at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:323)
>         ... 6 common frames omitted
> Caused by: org.rocksdb.RocksDBException: a
>         at org.rocksdb.RocksDB.flush(Native Method)
>         at org.rocksdb.RocksDB.flush(RocksDB.java:1642)
> {code}
> Operating System info
> Distributor ID: Debian
> Description:    Debian GNU/Linux 8.7 (jessie)
> Release:        8.7
> Codename:       jessie
> uname: 3.16.0-4-amd64
> FWIW - I don't see anything obvious and I can't reproduce it.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)