Hi,
Yes, on the first one: we have three machines with the same data directory
setting, so the state dir config is the same for each instance.
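
For reference, the relevant settings look roughly like this on each
instance (a sketch only; the app id and state dir are reconstructed from
the paths in the logs below, and the thread count is from my earlier mail):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
// Identical on all three machines; together these two settings produce
// the task directories seen in the errors, e.g.
// /data/advice/kafka-streams/new-part-advice/0_38/key-table/...
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "new-part-advice");
props.put(StreamsConfig.STATE_DIR_CONFIG, "/data/advice/kafka-streams");
// 4 threads per instance, 3 instances = 12 threads for the 40 partitions.
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);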

If it helps, this is the sequence of logs just before the thread shut down:

....

stream-thread [StreamThread-3] Committing all tasks because the commit interval 30000ms has elapsed
stream-thread [StreamThread-3] Committing task 0_35
stream-thread [StreamThread-3] Committing task 0_38
stream-thread [StreamThread-3] Committing task 0_27
stream-thread [StreamThread-3] Committing task 0_12

stream-thread [StreamThread-3] Committing all tasks because the commit interval 30000ms has elapsed
stream-thread [StreamThread-3] Committing task 0_35
stream-thread [StreamThread-3] Committing task 0_38
stream-thread [StreamThread-3] Committing task 0_27
stream-thread [StreamThread-3] Committing task 0_12

stream-thread [StreamThread-3] Committing all tasks because the commit interval 30000ms has elapsed
stream-thread [StreamThread-3] Committing task 0_35
stream-thread [StreamThread-3] Committing task 0_38
stream-thread [StreamThread-3] Failed to commit StreamTask 0_38 state:
org.apache.kafka.streams.errors.ProcessorStateException: task [0_38] Failed to flush state store key-table
Caused by: org.rocksdb.RocksDBException: IO error: /data/advice/kafka-streams/new-part-advice/0_38/key-table/key-table-201702052100/000009.sst: No such file or directory
stream-thread [StreamThread-3] Shutting down
stream-thread [StreamThread-3] Committing consumer offsets of task 0_35
stream-thread [StreamThread-3] Committing consumer offsets of task 0_38
stream-thread [StreamThread-3] Committing consumer offsets of task 0_27
stream-thread [StreamThread-3] Committing consumer offsets of task 0_12
stream-thread [StreamThread-3] Closing a task 0_35
stream-thread [StreamThread-3] Closing a task 0_38
stream-thread [StreamThread-3] Closing a task 0_27
stream-thread [StreamThread-3] Closing a task 0_12
stream-thread [StreamThread-3] Flushing state stores of task 0_35
stream-thread [StreamThread-3] Flushing state stores of task 0_38
stream-thread [StreamThread-3] Failed while executing StreamTask 0_38 due to flush state:
org.apache.kafka.streams.errors.ProcessorStateException: task [0_38] Failed to flush state store key-table
Caused by: org.rocksdb.RocksDBException: IO error: /data/advice/kafka-streams/new-part-advice/0_38/key-table/key-table-201702052100/000009.sst: No such file or directory
stream-thread [StreamThread-3] Flushing state stores of task 0_27
stream-thread [StreamThread-3] Flushing state stores of task 0_12
stream-thread [StreamThread-3] Closing the state manager of task 0_35
stream-thread [StreamThread-3] Closing the state manager of task 0_38
stream-thread [StreamThread-3] Failed while executing StreamTask 0_38 due to close state manager:
org.apache.kafka.streams.errors.ProcessorStateException: task [0_38] Failed to close state store key-table
Caused by: org.rocksdb.RocksDBException: IO error: /data/advice/kafka-streams/new-part-advice/0_38/key-table/key-table-201702052100/000009.sst: No such file or directory
stream-thread [StreamThread-3] Closing the state manager of task 0_27
stream-thread [StreamThread-3] Closing the state manager of task 0_12
Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
stream-thread [StreamThread-3] Removing all active tasks [[0_35, 0_38, 0_27, 0_12]]
stream-thread [StreamThread-3] Removing all standby tasks [[]]
stream-thread [StreamThread-3] Stream thread shutdown complete

It had been working for many iterations before that, and then suddenly that
dir/file was gone and the task could no longer commit/flush/close its state.
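
(A side note in case it helps: the 30000ms cadence in the log above is the
streams commit interval, which we have left at its default. Setting it
explicitly would look like this, reusing the props object from the sketch
above; 30000 is the default value:)

// commit.interval.ms controls how often the thread commits all tasks,
// which is the "Committing all tasks" cadence visible in the log.
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30000);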


For the second one, what do you recommend? Is there something we can patch
from 0.10.2 into 0.10.1, i.e. cherry-pick certain commits?

Or do we need to upgrade the cluster to 0.10.2 again, or will it work fine
if we just upgrade the streams client to 0.10.2?
Since 0.10.2 is not released yet, I suppose we would have to build the
snapshot version.

Thanks
Sachin


On Mon, Feb 6, 2017 at 3:58 PM, Damian Guy <damian....@gmail.com> wrote:

> Hi Sachin,
>
> The first exception - Is each instance of your streams app on a single
> machine running with the same state directory config?
> The second exception - I believe it is a bug in 0.10.1 that has been fixed
> in 0.10.2. There have been a number of issues fixed in this area.
>
> Thanks,
> Damian
>
> On Mon, 6 Feb 2017 at 05:43 Sachin Mittal <sjmit...@gmail.com> wrote:
>
> > Hello All,
> > We recently upgraded to kafka_2.10-0.10.1.1.
> >
> > We have a source topic with replication = 3 and partition = 40.
> > We have a streams application run with NUM_STREAM_THREADS_CONFIG = 4 on
> > three machines, so 12 threads in total.
> >
> > What we do is start the same streams application one by one on three
> > machines.
> >
> > After some time what we noticed was that the streams application on one
> > of the machines just crashed. When we inspected the log, here is what we
> > found. There were 2 sets of errors like:
> >
> > stream-thread [StreamThread-3] Failed to commit StreamTask 0_38 state:
> > org.apache.kafka.streams.errors.ProcessorStateException: task [0_38] Failed to flush state store key-table
> >     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:331) ~[kafka-streams-0.10.1.1.jar:na]
> >     at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:267) ~[kafka-streams-0.10.1.1.jar:na]
> >     at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:576) [kafka-streams-0.10.1.1.jar:na]
> >     at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:562) [kafka-streams-0.10.1.1.jar:na]
> >     at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:538) [kafka-streams-0.10.1.1.jar:na]
> >     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:456) [kafka-streams-0.10.1.1.jar:na]
> >     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242) [kafka-streams-0.10.1.1.jar:na]
> > Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error while executing flush from store key-table-201702052100
> >     at org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:375) ~[kafka-streams-0.10.1.1.jar:na]
> >     at org.apache.kafka.streams.state.internals.RocksDBStore.flush(RocksDBStore.java:366) ~[kafka-streams-0.10.1.1.jar:na]
> >     at org.apache.kafka.streams.state.internals.RocksDBWindowStore.flush(RocksDBWindowStore.java:256) ~[kafka-streams-0.10.1.1.jar:na]
> >     at org.apache.kafka.streams.state.internals.MeteredWindowStore.flush(MeteredWindowStore.java:116) ~[kafka-streams-0.10.1.1.jar:na]
> >     at org.apache.kafka.streams.state.internals.CachingWindowStore.flush(CachingWindowStore.java:119) ~[kafka-streams-0.10.1.1.jar:na]
> >     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:329) ~[kafka-streams-0.10.1.1.jar:na]
> >     ... 6 common frames omitted
> > Caused by: org.rocksdb.RocksDBException: IO error: /data/kafka-streams/new-part/0_38/key-table/key-table-201702052100/000009.sst: No such file or directory
> >     at org.rocksdb.RocksDB.flush(Native Method) ~[rocksdbjni-4.9.0.jar:na]
> >     at org.rocksdb.RocksDB.flush(RocksDB.java:1360) ~[rocksdbjni-4.9.0.jar:na]
> >     at org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:373) ~[kafka-streams-0.10.1.1.jar:na]
> >     ... 11 common frames omitted
> >
> > When we queried the directory
> > /data/kafka-streams/new-part/0_38/key-table/key-table-201702052100/ we
> > could not find it.
> > So it looks like this directory was earlier deleted by some task, and now
> > some other task is trying to flush it.
> > What could be the possible reason for this?
> >
> > The other set of errors we see is this:
> > org.apache.kafka.streams.errors.StreamsException: stream-thread [StreamThread-1] Failed to rebalance
> >     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:410) ~[kafka-streams-0.10.1.1.jar:na]
> >     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242) ~[kafka-streams-0.10.1.1.jar:na]
> > Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error opening store key-table-201702052000 at location /data/kafka-streams/new-part/0_38/key-table/key-table-201702052000
> >     at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:190) ~[kafka-streams-0.10.1.1.jar:na]
> >     at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:159) ~[kafka-streams-0.10.1.1.jar:na]
> >     at org.apache.kafka.streams.state.internals.RocksDBWindowStore$Segment.openDB(RocksDBWindowStore.java:72) ~[kafka-streams-0.10.1.1.jar:na]
> >     at org.apache.kafka.streams.state.internals.RocksDBWindowStore.getOrCreateSegment(RocksDBWindowStore.java:388) ~[kafka-streams-0.10.1.1.jar:na]
> >     at org.apache.kafka.streams.state.internals.RocksDBWindowStore.putInternal(RocksDBWindowStore.java:319) ~[kafka-streams-0.10.1.1.jar:na]
> >     at org.apache.kafka.streams.state.internals.RocksDBWindowStore.access$000(RocksDBWindowStore.java:51) ~[kafka-streams-0.10.1.1.jar:na]
> >     at org.apache.kafka.streams.state.internals.RocksDBWindowStore$1.restore(RocksDBWindowStore.java:206) ~[kafka-streams-0.10.1.1.jar:na]
> >     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:235) ~[kafka-streams-0.10.1.1.jar:na]
> >     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:198) ~[kafka-streams-0.10.1.1.jar:na]
> >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:123) ~[kafka-streams-0.10.1.1.jar:na]
> >     at org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:200) ~[kafka-streams-0.10.1.1.jar:na]
> >     at org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:66) ~[kafka-streams-0.10.1.1.jar:na]
> >     at org.apache.kafka.streams.state.internals.CachingWindowStore.init(CachingWindowStore.java:64) ~[kafka-streams-0.10.1.1.jar:na]
> >     at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:81) ~[kafka-streams-0.10.1.1.jar:na]
> >     at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:119) ~[kafka-streams-0.10.1.1.jar:na]
> >     at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:633) ~[kafka-streams-0.10.1.1.jar:na]
> >     at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:660) ~[kafka-streams-0.10.1.1.jar:na]
> >     at org.apache.kafka.streams.processor.internals.StreamThread.access$100(StreamThread.java:69) ~[kafka-streams-0.10.1.1.jar:na]
> >     at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:124) ~[kafka-streams-0.10.1.1.jar:na]
> >     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:228) ~[kafka-clients-0.10.1.1.jar:na]
> >     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:313) ~[kafka-clients-0.10.1.1.jar:na]
> >     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277) ~[kafka-clients-0.10.1.1.jar:na]
> >     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259) ~[kafka-clients-0.10.1.1.jar:na]
> >     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013) ~[kafka-clients-0.10.1.1.jar:na]
> >     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979) ~[kafka-clients-0.10.1.1.jar:na]
> >     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407) ~[kafka-streams-0.10.1.1.jar:na]
> >     ... 1 common frames omitted
> > Caused by: org.rocksdb.RocksDBException: IO error: lock /data/kafka-streams/new-part/0_38/key-table/key-table-201702052000/LOCK: No locks available
> >     at org.rocksdb.RocksDB.open(Native Method) ~[rocksdbjni-4.9.0.jar:na]
> >     at org.rocksdb.RocksDB.open(RocksDB.java:184) ~[rocksdbjni-4.9.0.jar:na]
> >     at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:183) ~[kafka-streams-0.10.1.1.jar:na]
> >     ... 26 common frames omitted
> >
> > And then the whole application shuts down. We have not understood this
> > part of the error, or whether this second error is somehow related to the
> > first one.
> >
> > Please let us know what could be the cause of these errors, whether they
> > have been fixed, and if there is some way to fix this in the current
> > release.
> >
> > Thanks
> > Sachin
> >
>
