Hi Ian,

As discussed in KAFKA-3775, proper memory management is better than throttling 
and we've made some steps towards that in 0.10.1 and 0.10.2 (reduce the memory 
RocksDb uses, provide a global memory limit for buffers within streams).

The scenario you mention is possible, and needs to be addressed in the context 
of fault tolerance for your app. If an instance fails you still have the option 
of detecting that and starting a new instance. Alternatively you can use 
standby tasks (let me know if it's not clear how to use them). It might be 
better to start from the beginning with a number of standby tasks, say over 3 
instances.

Thanks
Eno


> On 27 Feb 2017, at 17:17, Ian Duffy <i...@ianduffy.ie> wrote:
> 
>> Yes, the partitions reflect those of the input topic. You could try to
> create the topic manually before streams start, however, that might not be
> an ideal operational way of doing things (it's best if streams continues to
> do these things automatically). I'd suggest the scaling out approach first.
> 
> One last question for you. If I scale out adding more instances I
> understand that I won't have as much stores on my disk as they are going to
> be assigned to the other instances and evenly distributed. However, in the
> event, I scale down or some of those instances goes into a bad state and
> stops processing the remaining healthy instances will be assigned to their
> topics. Is it possible to throttle the amount of topics a stream subscribes
> to?
> 
> I see https://issues.apache.org/jira/browse/KAFKA-3775 which seems to be
> along these lines.
> 
> Thanks again,
> Ian.
> 
> On 27 February 2017 at 15:41, Eno Thereska <eno.there...@gmail.com> wrote:
> 
>> Hi Ian,
>> 
>> Yes, the partitions reflect those of the input topic. You could try to
>> create the topic manually before streams start, however, that might not be
>> an ideal operational way of doing things (it's best if streams continues to
>> do these things automatically). I'd suggest the scaling out approach first.
>> 
>> I think the info you already have would be sufficient in creating the JIRA
>> for now (i.e., feel free to copy paste your email)
>> 
>> Thanks
>> Eno
>> 
>>> On 27 Feb 2017, at 14:53, Ian Duffy <i...@ianduffy.ie> wrote:
>>> 
>>> Hi Eno,
>>> 
>>> Thanks for the fast response.
>>> 
>>>> It looks like you have a lot of partitions for the count store.
>>> 
>>> I believe this isn't configurable? They were auto created by the stream.
>>> I'm assuming its mirrored based of the amount off partitions our input
>>> topic has.
>>> 
>>>> The locking part was supposed to have been fixed in 0.10.2, however, it
>>> seems like there are still cases when it happens. Could you confirm that
>>> you are using the latest release 0.10.2 (that just came out last week)?
>>> 
>>> We are indeed using this client. We've been using it since the RC1 vote.
>>> We've seen an improvement since the 0.10.1.1 client.
>>> Any specifics I can include within the JIRA ticket to help with getting
>> to
>>> the bottom of it? I can't seem to pinpoint a specific trigger for them.
>>> Sometimes the application can run for hours/days before hitting them.
>>> 
>>> On 27 February 2017 at 14:17, Eno Thereska <eno.there...@gmail.com>
>> wrote:
>>> 
>>>> Hi Ian,
>>>> 
>>>> It looks like you have a lot of partitions for the count store. Each
>>>> RocksDb database uses off heap memory (around 60-70MB in 0.10.2) which
>> will
>>>> add up if you have these many stores in one instance. One solution
>> would be
>>>> to scale out your streams application by using another Kafka Streams
>>>> instance on another machine. That would explain the running out of
>> memory
>>>> part.
>>>> 
>>>> The locking part was supposed to have been fixed in 0.10.2, however it
>>>> seems like there are still cases when it happens. Could you confirm that
>>>> you are using the latest release 0.10.2 (that just came out last week)?
>>>> Just double-checking before re-opening the JIRA. As a work around, try
>>>> setting num.stream.threads to 1 and run instead multiple
>>>> instances/processes on the same machine.
>>>> 
>>>> Thanks
>>>> Eno
>>>> 
>>>>> On 27 Feb 2017, at 14:04, Ian Duffy <i...@ianduffy.ie> wrote:
>>>>> 
>>>>> Hi All,
>>>>> 
>>>>> I'm using Kafka Client 10.2 with Kafka Streams.
>>>>> 
>>>>> I'm performing a groupByKey on a stream and seeing large files appear
>>>>> within my state directory. Is this expected?
>>>>> 
>>>>> 90M 1_0/rocksdb/content-count-store
>>>>> 82M 1_1/rocksdb/content-count-store
>>>>> 102M 1_10/rocksdb/content-count-store
>>>>> 86M 1_11/rocksdb/content-count-store
>>>>> 87M 1_12/rocksdb/content-count-store
>>>>> 85M 1_13/rocksdb/content-count-store
>>>>> 93M 1_14/rocksdb/content-count-store
>>>>> 87M 1_15/rocksdb/content-count-store
>>>>> 92M 1_16/rocksdb/content-count-store
>>>>> 97M 1_17/rocksdb/content-count-store
>>>>> 91M 1_18/rocksdb/content-count-store
>>>>> 94M 1_19/rocksdb/content-count-store
>>>>> 89M 1_2/rocksdb/content-count-store
>>>>> 88M 1_20/rocksdb/content-count-store
>>>>> 92M 1_21/rocksdb/content-count-store
>>>>> 83M 1_22/rocksdb/content-count-store
>>>>> 82M 1_23/rocksdb/content-count-store
>>>>> 83M 1_24/rocksdb/content-count-store
>>>>> 89M 1_25/rocksdb/content-count-store
>>>>> 91M 1_26/rocksdb/content-count-store
>>>>> 84M 1_27/rocksdb/content-count-store
>>>>> 87M 1_28/rocksdb/content-count-store
>>>>> 93M 1_29/rocksdb/content-count-store
>>>>> 88M 1_3/rocksdb/content-count-store
>>>>> 77M 1_30/rocksdb/content-count-store
>>>>> 101M 1_31/rocksdb/content-count-store
>>>>> 73M 1_32/rocksdb/content-count-store
>>>>> 89M 1_33/rocksdb/content-count-store
>>>>> 89M 1_34/rocksdb/content-count-store
>>>>> 82M 1_35/rocksdb/content-count-store
>>>>> 88M 1_36/rocksdb/content-count-store
>>>>> 82M 1_37/rocksdb/content-count-store
>>>>> 83M 1_38/rocksdb/content-count-store
>>>>> 92M 1_39/rocksdb/content-count-store
>>>>> 99M 1_4/rocksdb/content-count-store
>>>>> 88M 1_40/rocksdb/content-count-store
>>>>> 89M 1_41/rocksdb/content-count-store
>>>>> 84M 1_42/rocksdb/content-count-store
>>>>> 88M 1_43/rocksdb/content-count-store
>>>>> 91M 1_44/rocksdb/content-count-store
>>>>> 90M 1_45/rocksdb/content-count-store
>>>>> 81M 1_46/rocksdb/content-count-store
>>>>> 89M 1_47/rocksdb/content-count-store
>>>>> 81M 1_48/rocksdb/content-count-store
>>>>> 81M 1_49/rocksdb/content-count-store
>>>>> 82M 1_5/rocksdb/content-count-store
>>>>> 88M 1_50/rocksdb/content-count-store
>>>>> 88M 1_51/rocksdb/content-count-store
>>>>> 75M 1_52/rocksdb/content-count-store
>>>>> 85M 1_53/rocksdb/content-count-store
>>>>> 72M 1_54/rocksdb/content-count-store
>>>>> 89M 1_55/rocksdb/content-count-store
>>>>> 86M 1_56/rocksdb/content-count-store
>>>>> 87M 1_57/rocksdb/content-count-store
>>>>> 87M 1_58/rocksdb/content-count-store
>>>>> 94M 1_59/rocksdb/content-count-store
>>>>> 83M 1_6/rocksdb/content-count-store
>>>>> 88M 1_60/rocksdb/content-count-store
>>>>> 87M 1_61/rocksdb/content-count-store
>>>>> 102M 1_62/rocksdb/content-count-store
>>>>> 86M 1_63/rocksdb/content-count-store
>>>>> 85M 1_64/rocksdb/content-count-store
>>>>> 91M 1_65/rocksdb/content-count-store
>>>>> 86M 1_66/rocksdb/content-count-store
>>>>> 82M 1_67/rocksdb/content-count-store
>>>>> 85M 1_68/rocksdb/content-count-store
>>>>> 85M 1_69/rocksdb/content-count-store
>>>>> 87M 1_7/rocksdb/content-count-store
>>>>> 83M 1_70/rocksdb/content-count-store
>>>>> 84M 1_71/rocksdb/content-count-store
>>>>> 89M 1_72/rocksdb/content-count-store
>>>>> 82M 1_73/rocksdb/content-count-store
>>>>> 84M 1_74/rocksdb/content-count-store
>>>>> 86M 1_75/rocksdb/content-count-store
>>>>> 92M 1_76/rocksdb/content-count-store
>>>>> 85M 1_77/rocksdb/content-count-store
>>>>> 92M 1_78/rocksdb/content-count-store
>>>>> 84M 1_79/rocksdb/content-count-store
>>>>> 109M 1_8/rocksdb/content-count-store
>>>>> 99M 1_80/rocksdb/content-count-store
>>>>> 88M 1_81/rocksdb/content-count-store
>>>>> 103M 1_82/rocksdb/content-count-store
>>>>> 95M 1_83/rocksdb/content-count-store
>>>>> 89M 1_84/rocksdb/content-count-store
>>>>> 93M 1_85/rocksdb/content-count-store
>>>>> 84M 1_86/rocksdb/content-count-store
>>>>> 89M 1_87/rocksdb/content-count-store
>>>>> 95M 1_88/rocksdb/content-count-store
>>>>> 87M 1_89/rocksdb/content-count-store
>>>>> 89M 1_9/rocksdb/content-count-store
>>>>> 87M 1_90/rocksdb/content-count-store
>>>>> 100M 1_91/rocksdb/content-count-store
>>>>> 93M 1_92/rocksdb/content-count-store
>>>>> 93M 1_93/rocksdb/content-count-store
>>>>> 88M 1_94/rocksdb/content-count-store
>>>>> 88M 1_95/rocksdb/content-count-store
>>>>> 
>>>>> When the application starts up it goes through the joining process, it
>>>>> takes a couple of minutes and pulls down this
>>>>> 8gb of data into the state directory. After that, the application
>> starts
>>>>> processing, sometimes it will give an exception
>>>>> like org.apache.kafka.streams.errors.LockException: task [1_57] Failed
>>>> to
>>>>> lock the state directory: /data/kafka-streams/stream-
>> processors-id/1_57
>>>>> 
>>>>> Other times it runs successfully and eventually throws a garbage
>>>> collection
>>>>> error:
>>>>> 
>>>>> 2017-02-27 11:57:55,894 - [ERROR] - [StreamThread-3]
>>>>> i.z.a.s.processors.RunProcessors - Unexpected Exception caught in
>> thread
>>>>> [StreamThread-3]:
>>>>> java.lang.OutOfMemoryError: GC overhead limit exceeded
>>>>> #011at java.util.Arrays.copyOf(Arrays.java:3332)
>>>>> #011at
>>>>> java.lang.AbstractStringBuilder.ensureCapacityInternal(
>>>> AbstractStringBuilder.java:124)
>>>>> 
>>>>> #011at
>>>>> java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448)
>>>>> #011at java.lang.StringBuilder.append(StringBuilder.java:136)
>>>>> #011at
>>>>> org.apache.kafka.common.metrics.JmxReporter.
>>>> getMBeanName(JmxReporter.java:132)
>>>>> 
>>>>> #011at
>>>>> org.apache.kafka.common.metrics.JmxReporter.
>> removeAttribute(JmxReporter.
>>>> java:96)
>>>>> 
>>>>> #011at
>>>>> org.apache.kafka.common.metrics.JmxReporter.metricRemoval(JmxReporter.
>>>> java:84)
>>>>> 
>>>>> #011at
>>>>> org.apache.kafka.common.metrics.Metrics.removeMetric(Metrics.java:417)
>>>>> #011at
>>>>> org.apache.kafka.common.metrics.Metrics.removeSensor(Metrics.java:367)
>>>>> #011at
>>>>> org.apache.kafka.common.metrics.Metrics.removeSensor(Metrics.java:375)
>>>>> #011at
>>>>> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.
>>>> removeSensor(StreamsMetricsImpl.java:205)
>>>>> 
>>>>> #011at
>>>>> org.apache.kafka.streams.processor.internals.
>> ProcessorNode$NodeMetrics.
>>>> removeAllSensors(ProcessorNode.java:199)
>>>>> 
>>>>> #011at
>>>>> org.apache.kafka.streams.processor.internals.ProcessorNode.close(
>>>> ProcessorNode.java:123)
>>>>> 
>>>>> #011at
>>>>> org.apache.kafka.streams.processor.internals.StreamTask.closeTopology(
>>>> StreamTask.java:355)
>>>>> 
>>>>> #011at
>>>>> org.apache.kafka.streams.processor.internals.
>>>> StreamTask.close(StreamTask.java:376)
>>>>> 
>>>>> #011at
>>>>> org.apache.kafka.streams.processor.internals.StreamThread$5.apply(
>>>> StreamThread.java:1046)
>>>>> 
>>>>> #011at
>>>>> org.apache.kafka.streams.processor.internals.StreamThread.
>>>> performOnAllTasks(StreamThread.java:503)
>>>>> 
>>>>> #011at
>>>>> org.apache.kafka.streams.processor.internals.
>> StreamThread.closeAllTasks(
>>>> StreamThread.java:1042)
>>>>> 
>>>>> #011at
>>>>> org.apache.kafka.streams.processor.internals.StreamThread.
>>>> shutdownTasksAndState(StreamThread.java:451)
>>>>> 
>>>>> #011at
>>>>> org.apache.kafka.streams.processor.internals.StreamThread.shutdown(
>>>> StreamThread.java:398)
>>>>> 
>>>>> #011at
>>>>> org.apache.kafka.streams.processor.internals.
>>>> StreamThread.run(StreamThread.java:379)
>>>>> 
>>>>> 
>>>>> I've set the following Java Options to try avoid this:
>>>>> 
>>>>> "-Xms1024m"
>>>>> , "-Xmx6144m"
>>>>> , "-XX:+UseConcMarkSweepGC"
>>>>> , "-XX:+DisableExplicitGC"
>>>>> , "-XX:+CMSClassUnloadingEnabled"
>>>>> 
>>>>> The streams are running with the following properties
>>>>> 
>>>>> streams {
>>>>> client.id = "stream-processors-id"
>>>>> application.id = "stream-processors-id"
>>>>> bootstrap.servers = "localhost:9092"
>>>>> replication.factor = "3"
>>>>> num.stream.threads = "4"
>>>>> state.dir = "/tmp/kafka-streams"
>>>>> buffered.records.per.partition = "100"
>>>>> poll.ms = "100"
>>>>> cache.max.bytes.buffering = 10485760
>>>>> commit.interval.ms = "30000"
>>>>> 
>>>>> consumer {
>>>>>  group.id = "stream-processors-id"
>>>>>  auto.offset.reset = "earliest"
>>>>>  max.poll.records = "100"
>>>>>  session.timeout.ms = "90000"
>>>>>  heartbeat.interval.ms = "40000"
>>>>>  commit.interval.ms = "30000"
>>>>>  max.poll.interval.ms = "300000"
>>>>>  max.partition.fetch.bytes = "524288"
>>>>> }
>>>>> }
>>>>> 
>>>>> Thanks,
>>>>> Ian.
>>>> 
>>>> 
>> 
>> 

Reply via email to