[
https://issues.apache.org/jira/browse/KAFKA-4890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15932701#comment-15932701
]
Yunus Olgun commented on KAFKA-4890:
------------------------------------
Happy to help, but unfortunately I wasn't much help this time.
Please disregard the unit test. Calling _get_ immediately after each _submit_
turned this supposedly multithreaded test into a sequential one. I couldn't
reproduce the issue using proper Futures and simulated wait times. Also,
OverlappingFileLockException should protect against both multithreaded and
single-threaded access from within the same VM. My assumption was wrong.
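To make the two points above concrete, here is a minimal sketch, not the original unit test. The class and method names are made up for illustration, and the sleep only simulates work. The first method shows why calling get right after submit serializes the tasks; the second shows that a second lock attempt on the same file from the same JVM is rejected with OverlappingFileLockException.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration class; none of these names come from Kafka.
public class ConcurrencySketch {

    /** Runs `tasks` sleeping jobs and returns the peak number that ran at the same time. */
    public static int peakConcurrency(int tasks, boolean getImmediately) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(tasks);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        Callable<Void> job = () -> {
            int now = running.incrementAndGet();
            peak.accumulateAndGet(now, Math::max);
            Thread.sleep(200); // simulated work
            running.decrementAndGet();
            return null;
        };
        try {
            List<Future<Void>> futures = new ArrayList<>();
            for (int i = 0; i < tasks; i++) {
                Future<Void> f = pool.submit(job);
                if (getImmediately) {
                    f.get(); // blocks here, so the next task is submitted only after this one ends
                } else {
                    futures.add(f); // collect first, wait later
                }
            }
            for (Future<Void> f : futures) {
                f.get();
            }
        } finally {
            pool.shutdown();
        }
        return peak.get();
    }

    /** Returns true if a second tryLock on the same file from this JVM is rejected. */
    public static boolean secondLockRejected() throws IOException {
        Path file = Files.createTempFile("lock-sketch", ".lock");
        try (FileChannel first = FileChannel.open(file, StandardOpenOption.WRITE);
             FileChannel second = FileChannel.open(file, StandardOpenOption.WRITE)) {
            FileLock held = first.tryLock();
            try {
                second.tryLock(); // same JVM, overlapping region
                return false;
            } catch (OverlappingFileLockException e) {
                return true;
            } finally {
                if (held != null) {
                    held.release();
                }
            }
        } finally {
            Files.deleteIfExists(file);
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("peak with get after each submit: " + peakConcurrency(4, true));
        System.out.println("peak with get after all submits: " + peakConcurrency(4, false));
        System.out.println("second lock rejected: " + secondLockRejected());
    }
}
```

With get called after each submit, at most one task is ever in flight, so such a test cannot exercise a race between threads.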
In the logs3.tar.gz file, timestamps have one-second resolution and the log
lines come from different threads, so the order of log lines across threads may
not be correct. I will try to reproduce the issue with millisecond timestamps
and an AsyncAppender in the log configuration.
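A small sketch of why second-resolution timestamps hide the order. The two instants and the class name are made up for illustration. The patterns mirror the date format seen in the logs, with and without milliseconds.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical illustration class; the instants below are invented, not taken from the logs.
public class TimestampResolutionSketch {

    private static final DateTimeFormatter SECONDS =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneOffset.UTC);
    private static final DateTimeFormatter MILLIS =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").withZone(ZoneOffset.UTC);

    /** Formats an instant with either second or millisecond resolution. */
    public static String format(Instant t, boolean millis) {
        return (millis ? MILLIS : SECONDS).format(t);
    }

    public static void main(String[] args) {
        // Two events 800 ms apart, logged by different threads within the same second.
        Instant first = Instant.parse("2017-03-12T20:40:21.100Z");
        Instant second = Instant.parse("2017-03-12T20:40:21.900Z");

        // Second resolution: both lines carry the same timestamp, so order is lost.
        System.out.println(format(first, false) + " / " + format(second, false));
        // Millisecond resolution: the timestamps differ and sort correctly.
        System.out.println(format(first, true) + " / " + format(second, true));
    }
}
```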
- In 0.10.2.0, this bug blocked me from using multiple threads in a streams
application. It happens frequently, and rebalancing and state store
initialization take too long.
- In 0.11.0.0, I couldn't reproduce it with the default configuration. Even with
state.cleanup.delay.ms=100, it takes some time to show up. Rebalancing and state
store initialization are also much faster now. It is not urgent for this
version, imo.
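For anyone trying to reproduce this, a rough sketch of the settings discussed above. The application id, broker address, thread count, and class name are placeholders I made up. Only state.cleanup.delay.ms=100 comes from my test, and the keys are the plain Kafka Streams config names.

```java
import java.util.Properties;

// Hypothetical illustration class; values other than state.cleanup.delay.ms are placeholders.
public class ReproConfigSketch {

    /** Builds Streams properties with several threads and an aggressive cleanup delay. */
    public static Properties reproProperties() {
        Properties props = new Properties();
        props.put("application.id", "kafka-4890-repro");   // placeholder name
        props.put("bootstrap.servers", "localhost:9092");  // placeholder broker
        props.put("num.stream.threads", "4");              // more than one thread is needed for the race
        props.put("state.cleanup.delay.ms", "100");        // makes state directory cleanup run much sooner
        return props;
    }

    public static void main(String[] args) {
        reproProperties().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

These properties would then be passed to the streams application as usual.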
> State directory being deleted when another thread holds the lock
> ----------------------------------------------------------------
>
> Key: KAFKA-4890
> URL: https://issues.apache.org/jira/browse/KAFKA-4890
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 0.10.2.0
> Reporter: Damian Guy
> Attachments: logs2.tar.gz, logs3.tar.gz, logs.tar.gz
>
>
> Looks like a state directory is being cleaned up when another thread already
> has the lock:
> {code}
> 2017-03-12 20:39:17 [StreamThread-1] DEBUG o.a.k.s.p.i.ProcessorStateManager - task [0_6] Registering state store perGameScoreStore to its state manager
> 2017-03-12 20:40:21 [StreamThread-3] INFO o.a.k.s.p.i.StateDirectory - Deleting obsolete state directory 0_6 for task 0_6
> 2017-03-12 20:40:22 [StreamThread-1] ERROR o.a.k.c.c.i.ConsumerCoordinator - User provided listener org.apache.kafka.streams.processor.internals.StreamThread$1 for group fireflyProd failed on partition assignment
> org.apache.kafka.streams.errors.ProcessorStateException: Error while executing put key \x00\x00\x00\x00}\xA2\x9E\x9D\x05\xF6\x95\xAB\x01\x12dayOfGame and value \x00\x00\x00\x00z\x00\x00\x00\x00\x00\x80G@ from store perGameScoreStore
> at org.apache.kafka.streams.state.internals.RocksDBStore.putInternal(RocksDBStore.java:248)
> at org.apache.kafka.streams.state.internals.RocksDBStore.access$000(RocksDBStore.java:65)
> at org.apache.kafka.streams.state.internals.RocksDBStore$1.restore(RocksDBStore.java:156)
> at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:230)
> at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:193)
> at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:99)
> at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:152)
> at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:39)
> at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$7.run(MeteredKeyValueStore.java:100)
> at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
> at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:131)
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:62)
> at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86)
> at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:141)
> at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834)
> at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207)
> at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
> at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937)
> at org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69)
> at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236)
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
> at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
> at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
> Caused by: org.rocksdb.RocksDBException: `
> at org.rocksdb.RocksDB.put(Native Method)
> at org.rocksdb.RocksDB.put(RocksDB.java:488)
> at org.apache.kafka.streams.state.internals.RocksDBStore.putInternal(RocksDBStore.java:246)
> ... 27 common frames omitted
> {code}
> Also
> {code}
> 2017-03-12 20:46:58 [StreamThread-4] INFO o.a.k.s.p.i.StateDirectory - Deleting obsolete state directory 0_2 for task 0_2
> ...
> 2017-03-12 20:47:02 [StreamThread-2] ERROR o.a.k.s.p.i.StreamThread - stream-thread [StreamThread-2] Failed to commit StandbyTask 0_2 state:
> org.apache.kafka.streams.errors.ProcessorStateException: task [0_2] Failed to flush state store lifetimeScoreStore
> at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:325)
> at org.apache.kafka.streams.processor.internals.StandbyTask.commit(StandbyTask.java:94)
> at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:777)
> at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:767)
> at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:739)
> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:661)
> at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
> Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error while executing flush from store lifetimeScoreStore
> at org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:346)
> at org.apache.kafka.streams.state.internals.RocksDBStore.flush(RocksDBStore.java:337)
> at org.apache.kafka.streams.state.internals.WrappedStateStore$AbstractWrappedStateStore.flush(WrappedStateStore.java:80)
> at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$6.run(MeteredKeyValueStore.java:92)
> at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
> at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:186)
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:112)
> at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:323)
> ... 6 common frames omitted
> Caused by: org.rocksdb.RocksDBException: a
> at org.rocksdb.RocksDB.flush(Native Method)
> at org.rocksdb.RocksDB.flush(RocksDB.java:1642)
> {code}
> Operating System info
> Distributor ID: Debian
> Description: Debian GNU/Linux 8.7 (jessie)
> Release: 8.7
> Codename: jessie
> uname: 3.16.0-4-amd64
> FWIW - I don't see anything obvious and I can't reproduce it.