[ https://issues.apache.org/jira/browse/KAFKA-4890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15923361#comment-15923361 ]
Guozhang Wang commented on KAFKA-4890:
--------------------------------------

I have uploaded a small PR that improves the log4j logging for debugging. I compared 0.10.2 and trunk and did not find any logic difference that could cause this in 0.10.2, other than the cleanup delay change you made. If you have code that reproduces the issue, could you run it with the PR and a small cleanup delay value? Alternatively, you can share the code so I can try to reproduce it on my end.

> State directory being deleted when another thread holds the lock
> ----------------------------------------------------------------
>
> Key: KAFKA-4890
> URL: https://issues.apache.org/jira/browse/KAFKA-4890
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 0.10.2.0
> Reporter: Damian Guy
> Attachments: logs.tar.gz
>
>
> It looks like a state directory is being cleaned up while another thread already holds its lock:
> {code}
> 2017-03-12 20:39:17 [StreamThread-1] DEBUG o.a.k.s.p.i.ProcessorStateManager - task [0_6] Registering state store perGameScoreStore to its state manager
> 2017-03-12 20:40:21 [StreamThread-3] INFO o.a.k.s.p.i.StateDirectory - Deleting obsolete state directory 0_6 for task 0_6
> 2017-03-12 20:40:22 [StreamThread-1] ERROR o.a.k.c.c.i.ConsumerCoordinator - User provided listener org.apache.kafka.streams.processor.internals.StreamThread$1 for group fireflyProd failed on partition assignment
> org.apache.kafka.streams.errors.ProcessorStateException: Error while executing put key \x00\x00\x00\x00}\xA2\x9E\x9D\x05\xF6\x95\xAB\x01\x12dayOfGame and value \x00\x00\x00\x00z\x00\x00\x00\x00\x00\x80G@ from store perGameScoreStore
>     at org.apache.kafka.streams.state.internals.RocksDBStore.putInternal(RocksDBStore.java:248)
>     at org.apache.kafka.streams.state.internals.RocksDBStore.access$000(RocksDBStore.java:65)
>     at org.apache.kafka.streams.state.internals.RocksDBStore$1.restore(RocksDBStore.java:156)
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:230)
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:193)
>     at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:99)
>     at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:152)
>     at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:39)
>     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$7.run(MeteredKeyValueStore.java:100)
>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:131)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:62)
>     at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86)
>     at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:141)
>     at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834)
>     at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207)
>     at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
>     at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937)
>     at org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69)
>     at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
> Caused by: org.rocksdb.RocksDBException: `
>     at org.rocksdb.RocksDB.put(Native Method)
>     at org.rocksdb.RocksDB.put(RocksDB.java:488)
>     at org.apache.kafka.streams.state.internals.RocksDBStore.putInternal(RocksDBStore.java:246)
>     ... 27 common frames omitted
> {code}
> Also:
> {code}
> 2017-03-12 20:46:58 [StreamThread-4] INFO o.a.k.s.p.i.StateDirectory - Deleting obsolete state directory 0_2 for task 0_2
> ...
> 2017-03-12 20:47:02 [StreamThread-2] ERROR o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-2] Failed to commit StandbyTask 0_2 state:
> org.apache.kafka.streams.errors.ProcessorStateException: task [0_2] Failed to flush state store lifetimeScoreStore
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:325)
>     at org.apache.kafka.streams.processor.internals.StandbyTask.commit(StandbyTask.java:94)
>     at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:777)
>     at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:767)
>     at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:739)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:661)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
> Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error while executing flush from store lifetimeScoreStore
>     at org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:346)
>     at org.apache.kafka.streams.state.internals.RocksDBStore.flush(RocksDBStore.java:337)
>     at org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractWrappedStateStore.flush(WrappedStateStore.java:80)
>     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$6.run(MeteredKeyValueStore.java:92)
>     at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:186)
>     at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:112)
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:323)
>     ... 6 common frames omitted
> Caused by: org.rocksdb.RocksDBException: a
>     at org.rocksdb.RocksDB.flush(Native Method)
>     at org.rocksdb.RocksDB.flush(RocksDB.java:1642)
> {code}
> Operating system info:
> Distributor ID: Debian
> Description: Debian GNU/Linux 8.7 (jessie)
> Release: 8.7
> Codename: jessie
> uname: 3.16.0-4-amd64
>
> FWIW, I don't see anything obvious, and I can't reproduce it.
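The comment asks for a rerun with a small cleanup delay value. The sketch below shows what such a reproduction config might look like; it is not the reporter's actual setup. It assumes a single instance with four stream threads, matching StreamThread-1 to StreamThread-4 in the logs, and a 1000 ms delay. The application id `repro-app` and the broker `localhost:9092` are placeholders. The keys are the string values of the Kafka Streams `StreamsConfig` constants `APPLICATION_ID_CONFIG`, `BOOTSTRAP_SERVERS_CONFIG`, `NUM_STREAM_THREADS_CONFIG` and `STATE_CLEANUP_DELAY_MS_CONFIG`. They are written as plain strings so the snippet compiles without Kafka on the classpath.

```java
import java.util.Properties;

// Sketch of a reproduction config with a short state cleanup delay.
// Keys are the string values of Kafka Streams StreamsConfig constants;
// "repro-app" and "localhost:9092" are placeholders, not taken from the issue.
public class CleanupDelayRepro {

    public static Properties reproConfig(long cleanupDelayMs, int numThreads) {
        Properties props = new Properties();
        // StreamsConfig.APPLICATION_ID_CONFIG (placeholder id)
        props.put("application.id", "repro-app");
        // StreamsConfig.BOOTSTRAP_SERVERS_CONFIG (placeholder broker)
        props.put("bootstrap.servers", "localhost:9092");
        // StreamsConfig.NUM_STREAM_THREADS_CONFIG: several threads in one instance,
        // so one thread's cleanup can overlap with another thread's work on a task
        props.put("num.stream.threads", Integer.toString(numThreads));
        // StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG: how long to wait before
        // deleting state for a task whose partition has migrated away
        props.put("state.cleanup.delay.ms", Long.toString(cleanupDelayMs));
        return props;
    }

    public static void main(String[] args) {
        // Assumed values: 4 threads (as in the logs) and a 1 second cleanup delay
        Properties props = reproConfig(1000L, 4);
        props.list(System.out);
        // With kafka-streams on the classpath, these props would be passed to the
        // KafkaStreams constructor together with the topology under test.
    }
}
```

Passing numeric settings as strings should be fine, because Kafka's config parsing accepts string values for typed settings.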