Hi,
Everything is understood and I will try out 0.10.2.0-rc0 shortly.

However, one thing is not clear:
"Firstly I'd recommend you have different state directory configs for each
application instance."

Well, I am running three separate instances of 4 threads each, on three
different machines.
So each machine has its own physical storage, though the path to the state
dir is the same on each, because that is the path where we have mounted
the data directory separately on each of the three machines.

So my streams state.dir setting is identical for all the instances, but
physically they are located in different places.
So it is not clear to me why I need a different config for each.
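
If separate state directories per instance are still recommended even in
this setup, the simplest thing I can think of is suffixing the shared path
with the machine's hostname, something like this (just a sketch of my own,
not something suggested in this thread):

import java.net.InetAddress;
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class StreamsProps {
    public static Properties baseProps() throws Exception {
        Properties props = new Properties();
        // Suffix the shared mount point with this machine's hostname so
        // each instance writes to a distinct state directory, even though
        // the mount path is identical on all three machines.
        String host = InetAddress.getLocalHost().getHostName();
        props.put(StreamsConfig.STATE_DIR_CONFIG,
                  "/data/advice/kafka-streams/" + host);
        return props;
    }
}

Would that be the kind of separation you mean?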

I will also test with CLEANUP_DELAY_MS_CONFIG set to 30 minutes.
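
That is, something like the following (a sketch; I believe the constant in
0.10.1 is actually StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, backed by
the state.cleanup.delay.ms property, so please correct me if I have the
name wrong):

// 30 minutes instead of the default 60 seconds, per your suggestion
props.put(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, 30 * 60 * 1000L);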

Also, one thing I wanted to know: if I make the number of partitions equal
to the total number of stream threads, which is 12, will that help so that
one thread always reads from a single partition and there is never a need
to rebalance?

However, I don't understand one thing: once a steady state has been
reached and all threads have picked up their partitions, why is there ever
a need for a future rebalance, unless a thread dies?

Also, it is not clear to me how often a rebalance is triggered and whether
there is a way we can control it.
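
From what I have read, rebalances are driven by the underlying consumer
group protocol, so I am assuming the knobs are the usual consumer configs,
which Streams passes through to its internal consumers. Is something like
the following the right way to make rebalances less likely? (Values are
examples only, and this is my guess, not something confirmed in this
thread.)

import org.apache.kafka.clients.consumer.ConsumerConfig;

// Consumer settings that, as I understand it, affect when the group
// coordinator decides a member is gone and triggers a rebalance:
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);    // heartbeat liveness window
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);        // cap work per poll()
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000); // max gap between poll() calls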

Thanks
Sachin



On Mon, Feb 6, 2017 at 10:10 PM, Damian Guy <damian....@gmail.com> wrote:

> Hi Sachin,
>
> Firstly I'd recommend you have different state directory configs for each
> application instance. I suspect you are potentially hitting an issue where
> the partition assignment has changed, the state directory locks get
> released, and a directory gets removed just before the lock is taken out by
> another thread or process. Though it is pretty hard to tell from the above
> logs. You might also want to set StreamsConfig.CLEANUP_DELAY_MS_CONFIG to a
> higher value than the default of 60 seconds. There is no reason why it
> couldn't be much higher, 30 minutes, maybe more, depending on how badly you
> need old task data cleaned up. Setting this value higher will reduce the
> likelihood of the exception occurring.
>
> 0.10.2 will be out shortly. It would be good if you can try that. You can
> find the current RC here: http://home.apache.org/~ewencp/kafka-0.10.2.0-rc0/
> You don't need to upgrade your kafka brokers, just the streams client.
>
> Thanks,
> Damian
>
> On Mon, 6 Feb 2017 at 10:53 Sachin Mittal <sjmit...@gmail.com> wrote:
>
> > Hi,
> > Yes, for the first: we have three machines with the same data directory
> > setting. So the state dir config is the same for each.
> >
> > If it helps, this is the sequence of logs just before the thread shut
> > down:
> >
> > ....
> >
> > stream-thread [StreamThread-3] Committing all tasks because the commit
> > interval 30000ms has elapsed
> > stream-thread [StreamThread-3] Committing task 0_35
> > stream-thread [StreamThread-3] Committing task 0_38
> > stream-thread [StreamThread-3] Committing task 0_27
> > stream-thread [StreamThread-3] Committing task 0_12
> >
> > stream-thread [StreamThread-3] Committing all tasks because the commit
> > interval 30000ms has elapsed
> > stream-thread [StreamThread-3] Committing task 0_35
> > stream-thread [StreamThread-3] Committing task 0_38
> > stream-thread [StreamThread-3] Committing task 0_27
> > stream-thread [StreamThread-3] Committing task 0_12
> >
> > stream-thread [StreamThread-3] Committing all tasks because the commit
> > interval 30000ms has elapsed
> > stream-thread [StreamThread-3] Committing task 0_35
> > stream-thread [StreamThread-3] Committing task 0_38
> > stream-thread [StreamThread-3] Failed to commit StreamTask 0_38 state:
> > org.apache.kafka.streams.errors.ProcessorStateException: task [0_38]
> > Failed to flush state store key-table
> > Caused by: org.rocksdb.RocksDBException: IO error:
> > /data/advice/kafka-streams/new-part-advice/0_38/key-table/key-table-201702052100/000009.sst:
> > No such file or directory
> > stream-thread [StreamThread-3] Shutting down
> > stream-thread [StreamThread-3] Committing consumer offsets of task 0_35
> > stream-thread [StreamThread-3] Committing consumer offsets of task 0_38
> > stream-thread [StreamThread-3] Committing consumer offsets of task 0_27
> > stream-thread [StreamThread-3] Committing consumer offsets of task 0_12
> > stream-thread [StreamThread-3] Closing a task 0_35
> > stream-thread [StreamThread-3] Closing a task 0_38
> > stream-thread [StreamThread-3] Closing a task 0_27
> > stream-thread [StreamThread-3] Closing a task 0_12
> > stream-thread [StreamThread-3] Flushing state stores of task 0_35
> > stream-thread [StreamThread-3] Flushing state stores of task 0_38
> > stream-thread [StreamThread-3] Failed while executing StreamTask 0_38
> > due to flush state
> > org.apache.kafka.streams.errors.ProcessorStateException: task [0_38]
> > Failed to flush state store key-table
> > Caused by: org.rocksdb.RocksDBException: IO error:
> > /data/advice/kafka-streams/new-part-advice/0_38/key-table/key-table-201702052100/000009.sst:
> > No such file or directory
> > stream-thread [StreamThread-3] Flushing state stores of task 0_27
> > stream-thread [StreamThread-3] Flushing state stores of task 0_12
> > stream-thread [StreamThread-3] Closing the state manager of task 0_35
> > stream-thread [StreamThread-3] Closing the state manager of task 0_38
> > stream-thread [StreamThread-3] Failed while executing StreamTask 0_38
> > due to close state manager:
> > org.apache.kafka.streams.errors.ProcessorStateException: task [0_38]
> > Failed to close state store key-table
> > Caused by: org.rocksdb.RocksDBException: IO error:
> > /data/advice/kafka-streams/new-part-advice/0_38/key-table/key-table-201702052100/000009.sst:
> > No such file or directory
> > stream-thread [StreamThread-3] Closing the state manager of task 0_27
> > stream-thread [StreamThread-3] Closing the state manager of task 0_12
> > Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
> > stream-thread [StreamThread-3] Removing all active tasks [[0_35, 0_38, 0_27, 0_12]]
> > stream-thread [StreamThread-3] Removing all standby tasks [[]]
> > stream-thread [StreamThread-3] Stream thread shutdown complete
> >
> > It was working for several iterations before that, and then suddenly
> > that dir/file was gone and it could not commit/flush/close the state.
> >
> >
> > For the second, what do you recommend? Is there something we can patch
> > from 0.10.2 into 0.10.1, certain commits or something?
> >
> > Or do we again need to upgrade the cluster to 0.10.2, or if we just
> > upgrade the streams client to 0.10.2 will it work fine?
> > Since 0.10.2 is not released yet, I suppose we would have to build the
> > snapshot version.
> >
> > Thanks
> > Sachin
> >
> >
> > On Mon, Feb 6, 2017 at 3:58 PM, Damian Guy <damian....@gmail.com> wrote:
> >
> > > Hi Sachin,
> > >
> > > The first exception - Is each instance of your streams app on a single
> > > machine running with the same state directory config?
> > > The second exception - I believe it is a bug in 0.10.1 that has been
> > > fixed in 0.10.2. There have been a number of issues fixed in this area.
> > >
> > > Thanks,
> > > Damian
> > >
> > > On Mon, 6 Feb 2017 at 05:43 Sachin Mittal <sjmit...@gmail.com> wrote:
> > >
> > > > Hello All,
> > > > We recently upgraded to kafka_2.10-0.10.1.1.
> > > >
> > > > We have a source topic with replication = 3 and partitions = 40.
> > > > We have a streams application running with NUM_STREAM_THREADS_CONFIG = 4
> > > > on three machines. So 12 threads in total.
> > > >
> > > > What we do is start the same streams application one by one on the
> > > > three machines.
> > > >
> > > > After some time, we noticed that the streams application on one of the
> > > > machines had just crashed. When we inspected the log, here is what we
> > > > found. There were 2 sets of errors like:
> > > >
> > > > stream-thread [StreamThread-3] Failed to commit StreamTask 0_38 state:
> > > > org.apache.kafka.streams.errors.ProcessorStateException: task [0_38] Failed to flush state store key-table
> > > >     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:331) ~[kafka-streams-0.10.1.1.jar:na]
> > > >     at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:267) ~[kafka-streams-0.10.1.1.jar:na]
> > > >     at org.apache.kafka.streams.processor.internals.StreamThread.commitOne(StreamThread.java:576) [kafka-streams-0.10.1.1.jar:na]
> > > >     at org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:562) [kafka-streams-0.10.1.1.jar:na]
> > > >     at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:538) [kafka-streams-0.10.1.1.jar:na]
> > > >     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:456) [kafka-streams-0.10.1.1.jar:na]
> > > >     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242) [kafka-streams-0.10.1.1.jar:na]
> > > > Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error while executing flush from store key-table-201702052100
> > > >     at org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:375) ~[kafka-streams-0.10.1.1.jar:na]
> > > >     at org.apache.kafka.streams.state.internals.RocksDBStore.flush(RocksDBStore.java:366) ~[kafka-streams-0.10.1.1.jar:na]
> > > >     at org.apache.kafka.streams.state.internals.RocksDBWindowStore.flush(RocksDBWindowStore.java:256) ~[kafka-streams-0.10.1.1.jar:na]
> > > >     at org.apache.kafka.streams.state.internals.MeteredWindowStore.flush(MeteredWindowStore.java:116) ~[kafka-streams-0.10.1.1.jar:na]
> > > >     at org.apache.kafka.streams.state.internals.CachingWindowStore.flush(CachingWindowStore.java:119) ~[kafka-streams-0.10.1.1.jar:na]
> > > >     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:329) ~[kafka-streams-0.10.1.1.jar:na]
> > > >     ... 6 common frames omitted
> > > > Caused by: org.rocksdb.RocksDBException: IO error: /data/kafka-streams/new-part/0_38/key-table/key-table-201702052100/000009.sst: No such file or directory
> > > >     at org.rocksdb.RocksDB.flush(Native Method) ~[rocksdbjni-4.9.0.jar:na]
> > > >     at org.rocksdb.RocksDB.flush(RocksDB.java:1360) ~[rocksdbjni-4.9.0.jar:na]
> > > >     at org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:373) ~[kafka-streams-0.10.1.1.jar:na]
> > > >     ... 11 common frames omitted
> > > >
> > > > When we queried the directory
> > > > /data/kafka-streams/new-part/0_38/key-table/key-table-201702052100/ we
> > > > could not find it.
> > > > So it looks like this directory was deleted earlier by some task, and
> > > > now some other task is trying to flush it.
> > > > What could be the possible reason for this?
> > > >
> > > > Another set of errors we see is this:
> > > > org.apache.kafka.streams.errors.StreamsException: stream-thread [StreamThread-1] Failed to rebalance
> > > >     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:410) ~[kafka-streams-0.10.1.1.jar:na]
> > > >     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242) ~[kafka-streams-0.10.1.1.jar:na]
> > > > Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error opening store key-table-201702052000 at location /data/kafka-streams/new-part/0_38/key-table/key-table-201702052000
> > > >     at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:190) ~[kafka-streams-0.10.1.1.jar:na]
> > > >     at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:159) ~[kafka-streams-0.10.1.1.jar:na]
> > > >     at org.apache.kafka.streams.state.internals.RocksDBWindowStore$Segment.openDB(RocksDBWindowStore.java:72) ~[kafka-streams-0.10.1.1.jar:na]
> > > >     at org.apache.kafka.streams.state.internals.RocksDBWindowStore.getOrCreateSegment(RocksDBWindowStore.java:388) ~[kafka-streams-0.10.1.1.jar:na]
> > > >     at org.apache.kafka.streams.state.internals.RocksDBWindowStore.putInternal(RocksDBWindowStore.java:319) ~[kafka-streams-0.10.1.1.jar:na]
> > > >     at org.apache.kafka.streams.state.internals.RocksDBWindowStore.access$000(RocksDBWindowStore.java:51) ~[kafka-streams-0.10.1.1.jar:na]
> > > >     at org.apache.kafka.streams.state.internals.RocksDBWindowStore$1.restore(RocksDBWindowStore.java:206) ~[kafka-streams-0.10.1.1.jar:na]
> > > >     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:235) ~[kafka-streams-0.10.1.1.jar:na]
> > > >     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:198) ~[kafka-streams-0.10.1.1.jar:na]
> > > >     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:123) ~[kafka-streams-0.10.1.1.jar:na]
> > > >     at org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:200) ~[kafka-streams-0.10.1.1.jar:na]
> > > >     at org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:66) ~[kafka-streams-0.10.1.1.jar:na]
> > > >     at org.apache.kafka.streams.state.internals.CachingWindowStore.init(CachingWindowStore.java:64) ~[kafka-streams-0.10.1.1.jar:na]
> > > >     at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:81) ~[kafka-streams-0.10.1.1.jar:na]
> > > >     at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:119) ~[kafka-streams-0.10.1.1.jar:na]
> > > >     at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:633) ~[kafka-streams-0.10.1.1.jar:na]
> > > >     at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:660) ~[kafka-streams-0.10.1.1.jar:na]
> > > >     at org.apache.kafka.streams.processor.internals.StreamThread.access$100(StreamThread.java:69) ~[kafka-streams-0.10.1.1.jar:na]
> > > >     at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:124) ~[kafka-streams-0.10.1.1.jar:na]
> > > >     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:228) ~[kafka-clients-0.10.1.1.jar:na]
> > > >     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:313) ~[kafka-clients-0.10.1.1.jar:na]
> > > >     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277) ~[kafka-clients-0.10.1.1.jar:na]
> > > >     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259) ~[kafka-clients-0.10.1.1.jar:na]
> > > >     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013) ~[kafka-clients-0.10.1.1.jar:na]
> > > >     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979) ~[kafka-clients-0.10.1.1.jar:na]
> > > >     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407) ~[kafka-streams-0.10.1.1.jar:na]
> > > >     ... 1 common frames omitted
> > > > Caused by: org.rocksdb.RocksDBException: IO error: lock /data/kafka-streams/new-part/0_38/key-table/key-table-201702052000/LOCK: No locks available
> > > >     at org.rocksdb.RocksDB.open(Native Method) ~[rocksdbjni-4.9.0.jar:na]
> > > >     at org.rocksdb.RocksDB.open(RocksDB.java:184) ~[rocksdbjni-4.9.0.jar:na]
> > > >     at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:183) ~[kafka-streams-0.10.1.1.jar:na]
> > > >     ... 26 common frames omitted
> > > >
> > > > And then the whole application shuts down. We have not understood this
> > > > part of the error, or whether this second error is somehow related to
> > > > the first one.
> > > >
> > > > Please let us know what could be the cause of these errors, or if they
> > > > have been fixed, and whether there is some way to fix this in the
> > > > current release.
> > > >
> > > > Thanks
> > > > Sachin
> > > >
> > >
> >
>
