First, about the metrics attributes: I now remember that there is indeed a
change, introduced in KIP-105:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-105%3A+Addition+of+Recording+Level+for+Sensors

We have added a recording-level hierarchy to the sensors; currently there
are only two levels: INFO and DEBUG. Along with that change, in Streams only
the thread-level sensors are recorded at the INFO level, while task-level /
processor-node-level / store-level metrics are all recorded at DEBUG.

You can enable DEBUG-level sensor recording by setting this in the config:

myConfigs.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG,
    Sensor.RecordingLevel.DEBUG.toString());
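
For completeness, here is a minimal sketch of where that line fits in a full
Streams config; the application id and bootstrap servers below are just
placeholders:

import java.util.Properties;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.streams.StreamsConfig;

Properties myConfigs = new Properties();
// Placeholders -- use your real app id / broker list:
myConfigs.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
myConfigs.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
// Record DEBUG-level sensors (task / processor-node / store level):
myConfigs.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG,
    Sensor.RecordingLevel.DEBUG.toString());
// ... then pass myConfigs to new KafkaStreams(...) as usual.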

---------------------------------

Next, about the session timeout / max poll interval: the former is usually
determined by your tolerance for soft failures or expected maintenance (long
GCs, rolling bounces for upgrades, etc.) that you do not want to trigger a
rebalance. For example, if you are planning a rolling bounce and you expect
each instance to be shut down and rebooted within 3 minutes, then in order
not to reassign partitions during that period you need to set the session
timeout to be larger than 3 minutes. The latter is usually determined by the
maximum time you may take to process a single record (including all of its
IO operations), and is usually set higher than the former config, since,
generally speaking, waiting on an IO operation is equivalent to a long GC.
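
As a purely illustrative sketch (the 5 / 10 minute values are placeholders --
derive the real numbers from your expected maintenance window and per-record
processing time), both can be set through the Streams properties, which are
passed on to the embedded consumer:

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

Properties myConfigs = new Properties();
// e.g. tolerate a ~5 minute rolling bounce without triggering a rebalance:
myConfigs.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 5 * 60 * 1000);
// e.g. allow up to 10 minutes between poll() calls (worst-case per-record
// processing time * max.poll.records should stay below this):
myConfigs.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 10 * 60 * 1000);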

For your case though, it seems the CommitFailedException is thrown because
the process slows down due to some memory pressure. In that case setting
these configs to large enough values may not help: if you do have a memory
leak (and since you mentioned this is not the case with the other deployment
setting, I'm not totally sure there is one), then eventually the process
will either become super slow or hang, and the exception will be thrown
anyway. So I think the first target is to investigate why there is
ever-increasing memory pressure and decreasing CPU utilization under your
deployment setting #2 (assuming it was not the case for setting #1).
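
If it turns out that RocksDB's off-heap usage is contributing, one knob you
could experiment with is a RocksDBConfigSetter to bound the per-store write
buffers and block cache. Below is a purely illustrative sketch -- the class
name and all of the numbers are made up and would need tuning for your
workload:

import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Options;

public class BoundedMemoryRocksDBConfig implements RocksDBConfigSetter {
    @Override
    public void setConfig(final String storeName, final Options options,
                          final Map<String, Object> configs) {
        // Illustrative limits: 16 MB write buffers, at most 2 of them,
        // and a 32 MB block cache per store instance.
        options.setWriteBufferSize(16 * 1024 * 1024L);
        options.setMaxWriteBufferNumber(2);
        final BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
        tableConfig.setBlockCacheSize(32 * 1024 * 1024L);
        options.setTableFormatConfig(tableConfig);
    }
}

// registered via:
// myConfigs.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG,
//     BoundedMemoryRocksDBConfig.class);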


Guozhang


On Fri, Feb 17, 2017 at 8:47 PM, Sachin Mittal <sjmit...@gmail.com> wrote:

> Hi,
> Regrading
> 0. "frequent rebalance and dying of threads": did you see any warn / error
> log entries or exceptions when threads die or rebalance is triggered?
>
> The exception we get is CommitFailedException, and then on partition revoke
> it also throws CommitFailedException and the stream thread is killed.
> In another mailing thread I have pointed out that since this exception is
> handled by the consumer coordinator, you should not kill the stream thread.
> If we can let the stream thread continue in runLoop in this case we would
> always have the right number of threads and the problem would be mitigated
> somewhat, because killing a thread again triggers a re-balance due to that
> thread which had just got new partitions assigned.
>
> Also note that we start the three applications one by one. We let one
> instance get all the partitions, then start the second, again wait till
> re-balance is done, and then start the third. Then we check and see that
> all threads have one partition assigned and the application is in steady
> state. It runs like this for a while and then we see memory utilization
> slowly increasing and also the cpu waiting time. The process slows
> down and it starts triggering re-balances due to CommitFailedException, and
> slowly all threads either die or the whole consumer cluster goes into a
> perpetual re-balance state. Then we have no option but to stop all and
> start again.
>
> I understand your suggestion, and we know that in threaded multi-machine
> mode we need to set higher values for session timeout and max poll
> interval.
> For session timeout we have set it to 100 sec; the default is 10 sec.
> We have kept max poll interval the same as the default in both cases, i.e. 5
> minutes.
>
> So how do we identify what would be good values for these 2 timeouts?
> What do you suggest we set them to?
>
> Next
> And about attribute values becoming 0 in 0.10.2.0: that is weird.
> I am using jmxterm, and let me post the output of
> stream-rocksdb-window-metrics in both cases.
>
> On 0.10.1.1:
> info -b
> kafka.streams:type=stream-rocksdb-window-metrics,client-
> id=test-new-1-StreamThread-1
> I get 23 attributes and when I try say
> $>get -s info -b
> kafka.streams:type=stream-rocksdb-window-metrics,client-
> id=test-new-1-StreamThread-1
> stream-table-put-avg-latency-ms
> #mbean =
> kafka.streams:type=stream-rocksdb-window-metrics,client-
> id=test-new-1-StreamThread-1:
> 1414732.5555555555
> $>get -s info -b
> kafka.streams:type=stream-rocksdb-window-metrics,client-
> id=test-new-1-StreamThread-1
> stream-table-fetch-avg-latency-ms
> #mbean =
> kafka.streams:type=stream-rocksdb-window-metrics,client-
> id=test-new-1-StreamThread-1:
> 158805.375
>
> All is fine here.
>
> Now on 0.10.2.0 (note the streams app is identical to the one running on
> 0.10.1.1, with just the app name and streams dir changed):
> info -b
> kafka.streams:type=stream-rocksdb-window-metrics,client-
> id=test-new-1.2-f510e082-d56c-4b2e-84b3-e996a9810831-StreamThread-1
>
> I get 35 attributes and when I try say
> $>get -s info -b
> kafka.streams:type=stream-rocksdb-window-metrics,client-
> id=test-new-1.2-f510e082-d56c-4b2e-84b3-e996a9810831-StreamThread-1
> stream-table-put-latency-avg
> #mbean =
> kafka.streams:type=stream-rocksdb-window-metrics,client-
> id=test-new-1.2-f510e082-d56c-4b2e-84b3-e996a9810831-StreamThread-1:
> 0.0
> $>get -s info -b
> kafka.streams:type=stream-rocksdb-window-metrics,client-
> id=test-new-1.2-f510e082-d56c-4b2e-84b3-e996a9810831-StreamThread-1
> stream-table-fetch-latency-avg
> #mbean =
> kafka.streams:type=stream-rocksdb-window-metrics,client-
> id=test-new-1.2-f510e082-d56c-4b2e-84b3-e996a9810831-StreamThread-1:
> 0.0
> $>get -s info -b
> kafka.streams:type=stream-rocksdb-window-metrics,client-
> id=test-new-1.2-f510e082-d56c-4b2e-84b3-e996a9810831-StreamThread-1
> stream-table-get-latency-avg
> #mbean =
> kafka.streams:type=stream-rocksdb-window-metrics,client-
> id=test-new-1.2-f510e082-d56c-4b2e-84b3-e996a9810831-StreamThread-1:
> 0.0
>
>
> So you can see there is clearly some issue with metric collection for
> rocksdb, and since we suspect that multiple instances of rocksdb are the
> issue in multi-threaded mode, we need these values to understand what's
> going on.
>
> Also, what's with a name like
> test-new-1.2-f510e082-d56c-4b2e-84b3-e996a9810831-StreamThread-1
> (f510e082-d56c-4b2e-84b3-e996a9810831)?
> 0.10.1.1 had a much simpler name like test-new-1-StreamThread-1 which we
> could easily add to our crontab for monitoring.
>
> Also please suggest how else we can monitor rocksdb.
>
> Thanks
> Sachin
>
>
>
> On Sat, Feb 18, 2017 at 3:36 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hi Sachin,
> >
> > 0. "frequent rebalance and dying of threads": did you see any warn /
> error
> > log entries or exceptions when threads die or rebalance is triggered?
> >
> > My guess is that you are likely hitting this issue:
> > https://issues.apache.org/jira/browse/KAFKA-3775
> >
> > The scenario is that, when you are starting multiple instance threads at
> > roughly the same time (in your case 3 * 4 = 12), the first rebalance may
> > have only one of them participating (this is due to a timing difference on
> > the coordinator side), and this thread will get all the tasks / state
> > stores and hence will take a very long time to complete the rebalance,
> > restoring all these state stores. And if you are unlucky, in the worst
> > case you will see N consecutive rebalances where each rebalance sees 1, 2,
> > 3, ... N consumer members joining the group. And the cost of each
> > rebalance is actually different: assuming you have M partitions and a
> > total of M threads, and the worst case of M rebalances happens, the cost
> > would be:
> >
> > M (1st rebalance, here a single thread needs to bootstrap all M state
> > stores) + M/2 (2nd rebalance with two instances, each gets M/2 state
> > stores) + ... + 1
> >
> > And what makes it worse is that, while the previous rebalance is taking
> > too long, the coordinator may treat these members as dead since they are
> > busy "bootstrapping" and have not heartbeated to it yet, so it kicks the
> > instance out of the group and triggers another rebalance, which from your
> > point of view looks like "endless rebalances".
> >
> > To validate, you can consider setting session.timeout.ms and
> > max.poll.interval.ms to a very large number, and see if, after several
> > rebalances (12 in the worst case for your scenario), it does stabilize.
> >
> >
> > We have a WIP improvement for this issue
> > (https://github.com/apache/kafka/pull/2560#pullrequestreview-22406233),
> > which unfortunately will not be included in the upcoming 0.10.2 release.
> >
> >
> > 1/2.
> >
> > You can find the metric names in kafka/docs, which will be pushed to the
> > web docs on the ops page
> > (http://kafka.apache.org/0102/documentation.html#monitoring) once 0.10.2
> > is out:
> >
> > https://github.com/apache/kafka/blob/trunk/docs/ops.html#L1319
> >
> > And about attribute values becoming 0 in 0.10.2.0: that is weird, as we
> > did not change the attribute names or bean names at all from 0.10.1 to
> > 0.10.2, only adding a few more at different granularities. So if the
> > metrics worked in 0.10.1, they should still work in 0.10.2. Maybe just
> > grep all the metric names and see what they are?
> >
> >
> >
> > Guozhang
> >
> >
> > On Fri, Feb 17, 2017 at 6:36 AM, Sachin Mittal <sjmit...@gmail.com>
> wrote:
> >
> > > Hi,
> > > I am now starting with rocksdb monitoring and comparing 2 identical
> > streams
> > > with 2 identical input topics.
> > >
> > > Case1
> > > Single partition topic with single thread streams apps running 0.10.1.1
> > > This case is pretty stable with hardly any cpu wait time noticed.
> > >
> > > Case 2
> > > 12-partition topic with 3 instances running on separate machines using
> > > 0.10.2.0-rc0 and each app having 4 threads, so the ideal state is 1
> > > partition per thread.
> > > This case is very unstable with frequent re-balances and dying threads.
> > > CPU wait time is very high and many times greater than 50%.
> > >
> > > The Kafka cluster is the same and is version 0.10.1.1.
> > >
> > > I had a few questions
> > > 1. What attributes of the rocksdb-window metrics should I usually
> > > monitor and what would be their ideal values?
> > >
> > >
> > > 2. Also I tried accessing some attributes like put, fetch, get and flush
> > > in both the 0.10.1.1 instance and 0.10.2.0.
> > > In 0.10.1.1 I got some values which I understand are in nanoseconds.
> > > In 0.10.2.0 the values of all these attributes I queried were 0.
> > >
> > > What would this mean?
> > >
> > > Thanks
> > > Sachin
> > >
> > >
> > > On Fri, Feb 10, 2017 at 12:33 PM, Guozhang Wang <wangg...@gmail.com>
> > > wrote:
> > >
> > > > Sachin,
> > > >
> > > > Thanks for sharing your observations, they are very helpful.
> > > >
> > > > Regarding monitoring, there are indeed metrics inside the Streams
> > > > library to meter state store operations; for rocksdb they record the
> > > > average latency and call rate for put / get / delete / all / range and
> > > > flush / restore. You can find the corresponding metric names in the
> > > > monitoring section of the web docs once 0.10.2 is out:
> > > >
> > > > https://kafka.apache.org/documentation/#monitoring
> > > >
> > > > Or you can just grep all metric names and see which ones are of
> > interests
> > > > to you.
> > > >
> > > >
> > > > Regarding the observations, I'm particularly interested in the fact
> > > > that "memory utilization keeps increasing". But since rocksdb uses
> > > > both on-heap and off-heap memory it is hard to tell if there is really
> > > > a memory leak. Could you share your usage of the state stores, e.g.
> > > > are they used for aggregations / joins / something else, and did you
> > > > override any of the rocksdb config settings?
> > > >
> > > >
> > > > Guozhang
> > > >
> > > >
> > > > On Thu, Feb 9, 2017 at 8:58 PM, Sachin Mittal <sjmit...@gmail.com>
> > > wrote:
> > > >
> > > > > Hi,
> > > > > We recently upgraded to 0.10.2.0-rc0; the rocksdb issue seems to be
> > > > > resolved, however we are just not able to get the streams going
> > > > > under our current scenario.
> > > > > The issue seems to be still related to rocksdb.
> > > > > Let me explain the situation:
> > > > >
> > > > > We have 40 partitions and 12 threads distributed across three
> > > > > machines (each having 4).
> > > > > Roughly 15 partitions get assigned to each machine. Thus we have 15
> > > > > state stores on each machine.
> > > > >
> > > > > What we see is that streams frequently gets rebalanced due to
> > > > > CommitFailedException, and as you have said this should not happen
> > > > > once the application reaches a steady state.
> > > > > We are running an instance on a 4-core linux box (virtual machine).
> > > > > What we observe is that there is a lot of waiting time for each
> > > > > core, consistently greater than 50%. What we suspect is that the
> > > > > application is spending a lot of time on disk I/O, so the CPU keeps
> > > > > waiting. Also we observe that the rate of consumption from the
> > > > > source topic falls over time. Also, over time we see that CPU
> > > > > utilization falls and memory utilization increases.
> > > > >
> > > > > What we then did was use 12 partitions for the same topic and 12
> > > > > threads, so each thread processes a single partition and 4 state
> > > > > stores get created per machine (note the streams application code is
> > > > > identical to the above configuration).
> > > > > Here what we have observed so far is a stable configuration. After a
> > > > > steady state is reached there is no rebalance, and the same thread
> > > > > continues processing from the same partition.
> > > > > Also we see that the per-core wait time is fairly stable around 10%.
> > > > >
> > > > > Finally we have another configuration where the same streams
> > > > > application runs single-threaded, reading from an identical
> > > > > single-partition topic. Here also the application never fails. This
> > > > > is the most consistent configuration, running for months now. The
> > > > > per-core wait time is usually 0 - 3%.
> > > > >
> > > > > So over these experiments we feel that it is rocksdb that may be
> > > > > causing the state of the streams application to deteriorate over
> > > > > time. Are there any rocksdb metrics/logs we can collect, watch, and
> > > > > infer anything from?
> > > > > I doubt there is any memory leak (as some have reported) because the
> > > > > single-thread application seems to run forever.
> > > > > However, is there any tuning that anyone can recommend which will
> > > > > make rocksdb more optimized?
> > > > >
> > > > > Has anyone else faced similar issues where, with many local rocksdb
> > > > > state stores on a machine, the application does not perform as well
> > > > > as with a single state store?
> > > > >
> > > > > Thanks
> > > > > Sachin
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > On Tue, Feb 7, 2017 at 3:21 PM, Damian Guy <damian....@gmail.com>
> > > wrote:
> > > > >
> > > > > > Hi Sachin,
> > > > > >
> > > > > > Sorry, I misunderstood what you had said. You are running 3
> > > > > > instances, one per machine? I thought you said you were running 3
> > > > > > instances on each machine.
> > > > > >
> > > > > > Regarding partitions: you are better off having more partitions,
> > > > > > as this affects the maximum degree of parallelism you can achieve
> > > > > > in the app. If you only have 12 partitions then you can only have
> > > > > > at most 12 threads in total. If you have 40 partitions then you
> > > > > > can have up to 40 threads, and so on. It won't help with
> > > > > > rebalancing anyway.
> > > > > >
> > > > > > You are correct, once all the instances are up and running then
> > > > > > rebalances generally shouldn't happen unless there is a failure of
> > > > > > some kind. To maintain membership of the consumer group, the
> > > > > > consumer used by streams needs to poll at least every
> > > > > > max.poll.interval.ms - the default for this is 5 minutes. If, for
> > > > > > any reason, the processing of records returned from a call to poll
> > > > > > takes longer than max.poll.interval.ms then the consumer will be
> > > > > > kicked out of the group and a rebalance will happen. If you are
> > > > > > seeing rebalances occurring after everything has been up and
> > > > > > running and reached a steady state, you might want to increase the
> > > > > > value of max.poll.interval.ms. You can also adjust
> > > > > > max.poll.records to decrease the maximum number of records
> > > > > > retrieved by a single call to poll - this will help to reduce the
> > > > > > processing time.
> > > > > >
> > > > > > Thanks,
> > > > > > Damian
> > > > > >
> > > > > > On Tue, 7 Feb 2017 at 07:24 Sachin Mittal <sjmit...@gmail.com>
> > > wrote:
> > > > > >
> > > > > > > Hi,
> > > > > > > Everything is understood and I will try out 0.10.2.0-rc0
> shortly.
> > > > > > >
> > > > > > > However one thing is not clear:
> > > > > > > Firstly i'd recommend you have different state directory
> configs
> > > for
> > > > > each
> > > > > > > application instance.
> > > > > > >
> > > > > > > Well, I am running three separate instances of 4 threads each on
> > > > > > > three different machines.
> > > > > > > So each machine has its own physical structure, though the path
> > > > > > > to the state dir is the same for each, because that is the
> > > > > > > relative path where we have mounted the data directory
> > > > > > > separately for each of these three machines.
> > > > > > >
> > > > > > > So my streams state.dir setting is identical for all the
> > > > > > > instances, but physically they are located at different
> > > > > > > locations. So it is not clear why I need a different config for
> > > > > > > each.
> > > > > > >
> > > > > > > I will also test with CLEANUP_DELAY_MS_CONFIG to be 30 minutes.
> > > > > > >
> > > > > > > Also, one thing I wanted to know: if I make the number of
> > > > > > > partitions equal to the total number of streams threads, which
> > > > > > > is 12, will that help so that one thread is always reading from
> > > > > > > a single partition and there is never a need to re-balance?
> > > > > > >
> > > > > > > I however don't understand one thing: once a steady state has
> > > > > > > been reached and all threads have picked up their partitions,
> > > > > > > why is there ever a need for a future re-balance, unless a
> > > > > > > thread dies?
> > > > > > >
> > > > > > > It is also not clear how often a re-balance is triggered and
> > > > > > > whether there is a way we can control it.
> > > > > > >
> > > > > > > Thanks
> > > > > > > Sachin
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > On Mon, Feb 6, 2017 at 10:10 PM, Damian Guy <
> > damian....@gmail.com>
> > > > > > wrote:
> > > > > > >
> > > > > > > > Hi Sachin,
> > > > > > > >
> > > > > > > > Firstly I'd recommend you have different state directory
> > > > > > > > configs for each application instance. I suspect you are
> > > > > > > > potentially hitting an issue where the partition assignment
> > > > > > > > has changed, the state directory locks get released, and a
> > > > > > > > directory gets removed just before the lock is taken out by
> > > > > > > > another thread or process. Though it is pretty hard to tell
> > > > > > > > from the above logs. You might also want to set
> > > > > > > > StreamsConfig.CLEANUP_DELAY_MS_CONFIG to a higher value than
> > > > > > > > the default of 60 seconds. There is no reason why it couldn't
> > > > > > > > be much higher, 30 minutes, maybe more, depending on how badly
> > > > > > > > you need old task data cleaned up. Setting this value higher
> > > > > > > > will reduce the likelihood of the exception occurring.
> > > > > > > >
> > > > > > > > 0.10.2 will be out shortly. It would be good if you can try
> > > > > > > > that. You can find the current RC here:
> > > > > > > > http://home.apache.org/~ewencp/kafka-0.10.2.0-rc0/
> > > > > > > > You don't need to upgrade your kafka brokers, just the streams
> > > > > > > > client.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Damian
> > > > > > > >
> > > > > > > > On Mon, 6 Feb 2017 at 10:53 Sachin Mittal <
> sjmit...@gmail.com>
> > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi,
> > > > > > > > > Yes, on the first point, we have three machines with the
> > > > > > > > > same data directory setting.
> > > > > > > > > So the state dir config is the same for each.
> > > > > > > > >
> > > > > > > > > If it helps, this is the sequence of logs just before the
> > > > > > > > > thread shut down
> > > > > > > > >
> > > > > > > > > ....
> > > > > > > > >
> > > > > > > > > stream-thread [StreamThread-3] Committing all tasks because
> > the
> > > > > > commit
> > > > > > > > > interval 30000ms has elapsed
> > > > > > > > > stream-thread [StreamThread-3] Committing task 0_35
> > > > > > > > > stream-thread [StreamThread-3] Committing task 0_38
> > > > > > > > > stream-thread [StreamThread-3] Committing task 0_27
> > > > > > > > > stream-thread [StreamThread-3] Committing task 0_12
> > > > > > > > >
> > > > > > > > > stream-thread [StreamThread-3] Committing all tasks because
> > the
> > > > > > commit
> > > > > > > > > interval 30000ms has elapsed
> > > > > > > > > stream-thread [StreamThread-3] Committing task 0_35
> > > > > > > > > stream-thread [StreamThread-3] Committing task 0_38
> > > > > > > > > stream-thread [StreamThread-3] Committing task 0_27
> > > > > > > > > stream-thread [StreamThread-3] Committing task 0_12
> > > > > > > > >
> > > > > > > > > stream-thread [StreamThread-3] Committing all tasks because
> > the
> > > > > > commit
> > > > > > > > > interval 30000ms has elapsed
> > > > > > > > > stream-thread [StreamThread-3] Committing task 0_35
> > > > > > > > > stream-thread [StreamThread-3] Committing task 0_38
> > > > > > > > > stream-thread [StreamThread-3] Failed to commit StreamTask
> > 0_38
> > > > > > state:
> > > > > > > > > org.apache.kafka.streams.errors.ProcessorStateException:
> > task
> > > > > [0_38]
> > > > > > > > Failed
> > > > > > > > > to flush state store key-table
> > > > > > > > > Caused by: org.rocksdb.RocksDBException: IO error:
> > > > > > > > >
> > > > > > > > > /data/advice/kafka-streams/new-part-advice/0_38/key-
> > > > > > > > table/key-table-201702052100/000009.sst:
> > > > > > > > > No such file or directory
> > > > > > > > > stream-thread [StreamThread-3] Shutting down
> > > > > > > > > stream-thread [StreamThread-3] Committing consumer offsets
> of
> > > > task
> > > > > > 0_35
> > > > > > > > > stream-thread [StreamThread-3] Committing consumer offsets
> of
> > > > task
> > > > > > 0_38
> > > > > > > > > stream-thread [StreamThread-3] Committing consumer offsets
> of
> > > > task
> > > > > > 0_27
> > > > > > > > > stream-thread [StreamThread-3] Committing consumer offsets
> of
> > > > task
> > > > > > 0_12
> > > > > > > > > stream-thread [StreamThread-3] Closing a task 0_35
> > > > > > > > > stream-thread [StreamThread-3] Closing a task 0_38
> > > > > > > > > stream-thread [StreamThread-3] Closing a task 0_27
> > > > > > > > > stream-thread [StreamThread-3] Closing a task 0_12
> > > > > > > > > stream-thread [StreamThread-3] Flushing state stores of
> task
> > > 0_35
> > > > > > > > > stream-thread [StreamThread-3] Flushing state stores of
> task
> > > 0_38
> > > > > > > > > stream-thread [StreamThread-3] Failed while executing
> > > StreamTask
> > > > > 0_38
> > > > > > > > duet
> > > > > > > > > to flush state
> > > > > > > > > org.apache.kafka.streams.errors.ProcessorStateException:
> > task
> > > > > [0_38]
> > > > > > > > Failed
> > > > > > > > > to flush state store key-table
> > > > > > > > > Caused by: org.rocksdb.RocksDBException: IO error:
> > > > > > > > >
> > > > > > > > > /data/advice/kafka-streams/new-part-advice/0_38/key-
> > > > > > > > table/key-table-201702052100/000009.sst:
> > > > > > > > > No such file or directory
> > > > > > > > > stream-thread [StreamThread-3] Flushing state stores of
> task
> > > 0_27
> > > > > > > > > stream-thread [StreamThread-3] Flushing state stores of
> task
> > > 0_12
> > > > > > > > > stream-thread [StreamThread-3] Closing the state manager of
> > > task
> > > > > 0_35
> > > > > > > > > stream-thread [StreamThread-3] Closing the state manager of
> > > task
> > > > > 0_38
> > > > > > > > > stream-thread [StreamThread-3] Failed while executing
> > > StreamTask
> > > > > 0_38
> > > > > > > > duet
> > > > > > > > > to close state manager:
> > > > > > > > > org.apache.kafka.streams.errors.ProcessorStateException:
> > task
> > > > > [0_38]
> > > > > > > > Failed
> > > > > > > > > to close state store key-table
> > > > > > > > > Caused by: org.rocksdb.RocksDBException: IO error:
> > > > > > > > >
> > > > > > > > > /data/advice/kafka-streams/new-part-advice/0_38/key-
> > > > > > > > table/key-table-201702052100/000009.sst:
> > > > > > > > > No such file or directory
> > > > > > > > > stream-thread [StreamThread-3] Closing the state manager of
> > > task
> > > > > 0_27
> > > > > > > > > stream-thread [StreamThread-3] Closing the state manager of
> > > task
> > > > > 0_12
> > > > > > > > > Closing the Kafka producer with timeoutMillis =
> > > > 9223372036854775807
> > > > > > ms.
> > > > > > > > > stream-thread [StreamThread-3] Removing all active tasks
> > > [[0_35,
> > > > > > 0_38,
> > > > > > > > > 0_27, 0_12]]
> > > > > > > > > stream-thread [StreamThread-3] Removing all standby tasks
> > [[]]
> > > > > > > > > stream-thread [StreamThread-3] Stream thread shutdown
> > complete
> > > > > > > > >
> > > > > > > > > It was working for iterations before that and then suddenly
> > > that
> > > > > > > dir/file
> > > > > > > > > was gone and it could not commit/flush/close the state.
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > For the second, what do you recommend? Is there something
> > > > > > > > > we can patch from 0.10.2 into 0.10.1?
> > > > > > > > > Certain commits or something?
> > > > > > > > >
> > > > > > > > > Or do we again need to upgrade the cluster to 0.10.2, or if
> > > > > > > > > we just upgrade the streams client to 0.10.2 will it work
> > > > > > > > > fine?
> > > > > > > > > Since 0.10.2 is not released yet I suppose we would have to
> > > > > > > > > build the snapshot version.
> > > > > > > > >
> > > > > > > > > Thanks
> > > > > > > > > Sachin
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Mon, Feb 6, 2017 at 3:58 PM, Damian Guy <
> > > damian....@gmail.com
> > > > >
> > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi Sachin,
> > > > > > > > > >
> > > > > > > > > > The first exception - is each instance of your streams app
> > > > > > > > > > on a single machine running with the same state directory
> > > > > > > > > > config?
> > > > > > > > > > The second exception - I believe this is a bug in 0.10.1
> > > > > > > > > > that has been fixed in 0.10.2. There have been a number of
> > > > > > > > > > issues fixed in this area.
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > > Damian
> > > > > > > > > >
> > > > > > > > > > On Mon, 6 Feb 2017 at 05:43 Sachin Mittal <
> > > sjmit...@gmail.com>
> > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Hello All,
> > > > > > > > > > > We recently upgraded to  kafka_2.10-0.10.1.1.
> > > > > > > > > > >
> > > > > > > > > > > We have a source topic with replication = 3 and
> > partition =
> > > > 40.
> > > > > > > > > > > We have a streams application run with
> > > > > NUM_STREAM_THREADS_CONFIG
> > > > > > =
> > > > > > > 4
> > > > > > > > > and
> > > > > > > > > > on
> > > > > > > > > > > three machines. So 12 threads in total.
> > > > > > > > > > >
> > > > > > > > > > > What we do is start the same streams application one by
> > > > > > > > > > > one on three machines.
> > > > > > > > > > >
> > > > > > > > > > > After some time what we noticed was that the streams
> > > > > > > > > > > application on one of the machines just crashed. When we
> > > > > > > > > > > inspected the log, here is what we found.
> > > > > > > > > > > There were 2 sets of errors like:
> > > > > > > > > > >
> > > > > > > > > > > stream-thread [StreamThread-3] Failed to commit
> > StreamTask
> > > > 0_38
> > > > > > > > state:
> > > > > > > > > > > org.apache.kafka.streams.errors.
> ProcessorStateException:
> > > > task
> > > > > > > [0_38]
> > > > > > > > > > Failed
> > > > > > > > > > > to flush state store key-table
> > > > > > > > > > >     at
> > > > > > > > > > >
> > > > > > > > > > > org.apache.kafka.streams.processor.internals.
> > > > > > > > > > ProcessorStateManager.flush(ProcessorStateManager.java:
> > 331)
> > > > > > > > > > > ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > > > > >     at
> > > > > > > > > > >
> > > > > > > > > > > org.apache.kafka.streams.processor.internals.
> > > > > > > > > > StreamTask.commit(StreamTask.java:267)
> > > > > > > > > > > ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > > > > >     at
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > org.apache.kafka.streams.processor.internals.StreamThread.co
> > > mmitOne(
> > > > > > > > > > StreamThread.java:576)
> > > > > > > > > > > [kafka-streams-0.10.1.1.jar:na]
> > > > > > > > > > >     at
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > org.apache.kafka.streams.processor.internals.StreamThread.co
> > > mmitAll(
> > > > > > > > > > StreamThread.java:562)
> > > > > > > > > > > [kafka-streams-0.10.1.1.jar:na]
> > > > > > > > > > >     at
> > > > > > > > > > >
> > > > > > > > > > > org.apache.kafka.streams.processor.internals.
> > > > > > > > StreamThread.maybeCommit(
> > > > > > > > > > StreamThread.java:538)
> > > > > > > > > > > [kafka-streams-0.10.1.1.jar:na]
> > > > > > > > > > >     at
> > > > > > > > > > >
> > > > > > > > > > > org.apache.kafka.streams.processor.internals.
> > > > > > StreamThread.runLoop(
> > > > > > > > > > StreamThread.java:456)
> > > > > > > > > > > [kafka-streams-0.10.1.1.jar:na]
> > > > > > > > > > >     at
> > > > > > > > > > >
> > > > > > > > > > > org.apache.kafka.streams.processor.internals.
> > > > > > > > > > StreamThread.run(StreamThread.java:242)
> > > > > > > > > > > [kafka-streams-0.10.1.1.jar:na]
> > > > > > > > > > > Caused by: org.apache.kafka.streams.errors.
> > > > > > ProcessorStateException:
> > > > > > > > > > Error
> > > > > > > > > > > while executing flush from store key-table-201702052100
> > > > > > > > > > >     at
> > > > > > > > > > >
> > > > > > > > > > > org.apache.kafka.streams.state.internals.RocksDBStore.
> > > > > > > > > > flushInternal(RocksDBStore.java:375)
> > > > > > > > > > > ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > > > > >     at
> > > > > > > > > > >
> > > > > > > > > > > org.apache.kafka.streams.state.internals.RocksDBStore.
> > > > > > > > > > flush(RocksDBStore.java:366)
> > > > > > > > > > > ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > > > > >     at
> > > > > > > > > > >
> > > > > > > > > > > org.apache.kafka.streams.state.internals.
> > > > > > RocksDBWindowStore.flush(
> > > > > > > > > > RocksDBWindowStore.java:256)
> > > > > > > > > > > ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > > > > >     at
> > > > > > > > > > >
> > > > > > > > > > > org.apache.kafka.streams.state.internals.
> > > > > > MeteredWindowStore.flush(
> > > > > > > > > > MeteredWindowStore.java:116)
> > > > > > > > > > > ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > > > > >     at
> > > > > > > > > > >
> > > > > > > > > > > org.apache.kafka.streams.state.internals.
> > > > > > CachingWindowStore.flush(
> > > > > > > > > > CachingWindowStore.java:119)
> > > > > > > > > > > ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > > > > >     at
> > > > > > > > > > >
> > > > > > > > > > > org.apache.kafka.streams.processor.internals.
> > > > > > > > > > ProcessorStateManager.flush(ProcessorStateManager.java:
> > 329)
> > > > > > > > > > > ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > > > > >     ... 6 common frames omitted
> > > > > > > > > > > Caused by: org.rocksdb.RocksDBException: IO error:
> > > > > > > > > > >
> > > > > > > > > > > /data/kafka-streams/new-part/0_38/key-table/key-table-
> > > > > > > > > > 201702052100/000009.sst:
> > > > > > > > > > > No such file or directory
> > > > > > > > > > >     at org.rocksdb.RocksDB.flush(Native Method)
> > > > > > > > > > ~[rocksdbjni-4.9.0.jar:na]
> > > > > > > > > > >     at org.rocksdb.RocksDB.flush(RocksDB.java:1360)
> > > > > > > > > > > ~[rocksdbjni-4.9.0.jar:na]
> > > > > > > > > > >     at
> > > > > > > > > > >
> > > > > > > > > > > org.apache.kafka.streams.state.internals.RocksDBStore.
> > > > > > > > > > flushInternal(RocksDBStore.java:373)
> > > > > > > > > > > ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > > > > >     ... 11 common frames omitted
> > > > > > > > > > >
> > > > > > > > > > > When we queried the directory
> > > > > > > > > > > /data/kafka-streams/new-part/0_38/key-table/key-table-201702052100/
> > > > > > > > > > > we could not find it.
> > > > > > > > > > > So it looks like this directory was earlier deleted by
> > > > > > > > > > > some task and now some other task is trying to flush it.
> > > > > > > > > > > What could be the possible reason for this?
> > > > > > > > > > >
> > > > > > > > > > > Another set of error we see is this:
> > > > > > > > > > > org.apache.kafka.streams.errors.StreamsException:
> > > > > stream-thread
> > > > > > > > > > > [StreamThread-1] Failed to rebalance
> > > > > > > > > > >     at
> > > > > > > > > > >
> > > > > > > > > > > org.apache.kafka.streams.processor.internals.
> > > > > > StreamThread.runLoop(
> > > > > > > > > > StreamThread.java:410)
> > > > > > > > > > > ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > > > > >     at
> > > > > > > > > > >
> > > > > > > > > > > org.apache.kafka.streams.processor.internals.
> > > > > > > > > > StreamThread.run(StreamThread.java:242)
> > > > > > > > > > > ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > > > > > Caused by: org.apache.kafka.streams.errors.
> > > > > > ProcessorStateException:
> > > > > > > > > > Error
> > > > > > > > > > > opening store key-table-201702052000 at location
> > > > > > > > > > > /data/kafka-streams/new-part/0_38/key-table/key-table-
> > > > > > 201702052000
> > > > > > > > > > >     at
> > > > > > > > > > >
> > > > > > > > > > > org.apache.kafka.streams.state.internals.RocksDBStore.
> > > > > > > > > > openDB(RocksDBStore.java:190)
> > > > > > > > > > > ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > > > > >     at
> > > > > > > > > > >
> > > > > > > > > > > org.apache.kafka.streams.state.internals.RocksDBStore.
> > > > > > > > > > openDB(RocksDBStore.java:159)
> > > > > > > > > > > ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > > > > >     at
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > org.apache.kafka.streams.state.internals.RocksDBWindowStore$
> > > Segment.
> > > > > > > > > > openDB(RocksDBWindowStore.java:72)
> > > > > > > > > > > ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > > > > >     at
> > > > > > > > > > >
> > > > > > > > > > > org.apache.kafka.streams.state
> > > .internals.RocksDBWindowStore.
> > > > > > > > > > getOrCreateSegment(RocksDBWindowStore.java:388)
> > > > > > > > > > > ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > > > > >     at
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > >
> > > > > > > org.apache.kafka.streams.state.internals.RocksDBWindowStore.
> > > > > putInternal(
> > > > > > > > > > RocksDBWindowStore.java:319)
> > > > > > > > > > > ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > > > > >     at
> > > > > > > > > > >
> > > > > > > > > > > org.apache.kafka.streams.state.internals.
> > > > > > > > RocksDBWindowStore.access$000(
> > > > > > > > > > RocksDBWindowStore.java:51)
> > > > > > > > > > > ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > > > > >     at
> > > > > > > > > > >
> > > > > > > > > > > org.apache.kafka.streams.state.internals.
> > > > > > > > RocksDBWindowStore$1.restore(
> > > > > > > > > > RocksDBWindowStore.java:206)
> > > > > > > > > > > ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > > > > >     at
> > > > > > > > > > >
> > > > > > > > > > > org.apache.kafka.streams.processor.internals.
> > > > > > ProcessorStateManager.
> > > > > > > > > > restoreActiveState(ProcessorStateManager.java:235)
> > > > > > > > > > > ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > > > > >     at
> > > > > > > > > > >
> > > > > > > > > > > org.apache.kafka.streams.processor.internals.
> > > > > > ProcessorStateManager.
> > > > > > > > > > register(ProcessorStateManager.java:198)
> > > > > > > > > > > ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > > > > >     at
> > > > > > > > > > >
> > > > > > > > > > > org.apache.kafka.streams.processor.internals.
> > > > > > > > > > ProcessorContextImpl.register(
> > ProcessorContextImpl.java:123)
> > > > > > > > > > > ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > > > > >     at
> > > > > > > > > > >
> > > > > > > > > > > org.apache.kafka.streams.state.internals.
> > > > > > RocksDBWindowStore.init(
> > > > > > > > > > RocksDBWindowStore.java:200)
> > > > > > > > > > > ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > > > > >     at
> > > > > > > > > > >
> > > > > > > > > > > org.apache.kafka.streams.state.internals.
> > > > > > MeteredWindowStore.init(
> > > > > > > > > > MeteredWindowStore.java:66)
> > > > > > > > > > > ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > > > > >     at
> > > > > > > > > > >
> > > > > > > > > > > org.apache.kafka.streams.state.internals.
> > > > > > CachingWindowStore.init(
> > > > > > > > > > CachingWindowStore.java:64)
> > > > > > > > > > > ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > > > > >     at
> > > > > > > > > > >
> > > > > > > > > > > org.apache.kafka.streams.processor.internals.
> > AbstractTask.
> > > > > > > > > > initializeStateStores(AbstractTask.java:81)
> > > > > > > > > > > ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > > > > >     at
> > > > > > > > > > >
> > > > > > > > > > > org.apache.kafka.streams.processor.internals.
> > > > > > > > > > StreamTask.<init>(StreamTask.java:119)
> > > > > > > > > > > ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > > > > >     at
> > > > > > > > > > >
> > > > > > > > > > > org.apache.kafka.streams.processor.internals.
> > > > > > > > > > StreamThread.createStreamTask(StreamThread.java:633)
> > > > > > > > > > > ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > > > > >     at
> > > > > > > > > > >
> > > > > > > > > > > org.apache.kafka.streams.processor.internals.
> > > > > > > > > > StreamThread.addStreamTasks(StreamThread.java:660)
> > > > > > > > > > > ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > > > > >     at
> > > > > > > > > > >
> > > > > > > > > > > org.apache.kafka.streams.processor.internals.
> > > > > > > > StreamThread.access$100(
> > > > > > > > > > StreamThread.java:69)
> > > > > > > > > > > ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > > > > >     at
> > > > > > > > > > >
> > > > > > > > > > > org.apache.kafka.streams.proce
> > > ssor.internals.StreamThread$1.
> > > > > > > > > > onPartitionsAssigned(StreamThread.java:124)
> > > > > > > > > > > ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > > > > >     at
> > > > > > > > > > >
> > > > > > > > > > > org.apache.kafka.clients.consumer.internals.
> > > > > ConsumerCoordinator.
> > > > > > > > > > onJoinComplete(ConsumerCoordinator.java:228)
> > > > > > > > > > > ~[kafka-clients-0.10.1.1.jar:na]
> > > > > > > > > > >     at
> > > > > > > > > > >
> > > > > > > > > > > org.apache.kafka.clients.consumer.internals.
> > > > > AbstractCoordinator.
> > > > > > > > > > joinGroupIfNeeded(AbstractCoordinator.java:313)
> > > > > > > > > > > ~[kafka-clients-0.10.1.1.jar:na]
> > > > > > > > > > >     at
> > > > > > > > > > >
> > > > > > > > > > > org.apache.kafka.clients.consumer.internals.
> > > > > AbstractCoordinator.
> > > > > > > > > > ensureActiveGroup(AbstractCoordinator.java:277)
> > > > > > > > > > > ~[kafka-clients-0.10.1.1.jar:na]
> > > > > > > > > > >     at
> > > > > > > > > > >
> > > > > > > > > > > org.apache.kafka.clients.consumer.internals.
> > > > > > > > ConsumerCoordinator.poll(
> > > > > > > > > > ConsumerCoordinator.java:259)
> > > > > > > > > > > ~[kafka-clients-0.10.1.1.jar:na]
> > > > > > > > > > >     at
> > > > > > > > > > >
> > > > > > > > > > > org.apache.kafka.clients.consumer.KafkaConsumer.
> > > > > > > > > > pollOnce(KafkaConsumer.java:1013)
> > > > > > > > > > > ~[kafka-clients-0.10.1.1.jar:na]
> > > > > > > > > > >     at
> > > > > > > > > > >
> > > > > > > > > > > org.apache.kafka.clients.consumer.KafkaConsumer.poll(
> > > > > > > > > > KafkaConsumer.java:979)
> > > > > > > > > > > ~[kafka-clients-0.10.1.1.jar:na]
> > > > > > > > > > >     at
> > > > > > > > > > >
> > > > > > > > > > > org.apache.kafka.streams.processor.internals.
> > > > > > StreamThread.runLoop(
> > > > > > > > > > StreamThread.java:407)
> > > > > > > > > > > ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > > > > >     ... 1 common frames omitted
> > > > > > > > > > > Caused by: org.rocksdb.RocksDBException: IO error: lock
> > > > > > > > > > >
> > > > > > > > >
> > > > > > > /data/kafka-streams/new-part/0_38/key-table/key-table-
> > > > > 201702052000/LOCK:
> > > > > > > > > > No
> > > > > > > > > > > locks available
> > > > > > > > > > >     at org.rocksdb.RocksDB.open(Native Method)
> > > > > > > > > > ~[rocksdbjni-4.9.0.jar:na]
> > > > > > > > > > >     at org.rocksdb.RocksDB.open(RocksDB.java:184)
> > > > > > > > > > > ~[rocksdbjni-4.9.0.jar:na]
> > > > > > > > > > >     at
> > > > > > > > > > >
> > > > > > > > > > > org.apache.kafka.streams.state.internals.RocksDBStore.
> > > > > > > > > > openDB(RocksDBStore.java:183)
> > > > > > > > > > > ~[kafka-streams-0.10.1.1.jar:na]
> > > > > > > > > > >     ... 26 common frames omitted
> > > > > > > > > > >
> > > > > > > > > > > And then the whole application shuts down. We have not
> > > > > > > > > > > understood this part of the error, nor whether this
> > > > > > > > > > > second error is somehow related to the first error.
> > > > > > > > > > >
> > > > > > > > > > > Please let us know what could be the cause of these
> > > > > > > > > > > errors, or if they have been fixed, and if there is some
> > > > > > > > > > > way to fix this in the current release.
> > > > > > > > > > >
> > > > > > > > > > > Thanks
> > > > > > > > > > > Sachin
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang
