Re: Suppress DSL operator in stream api - 2.4.0

2019-09-25 Thread Thameem Ansari
Tried your suggestions, but I am unable to get suppress to emit anything. I can see the
SUPPRESS_STORES are created on the Kafka nodes, but nothing gets output.
It looks like the grace period and window closing are not honored for some reason,
and I can see a lot of people having difficulty getting suppress to work.
My window size is one minute, and I tried with and without a grace period. The event
times are in the past since I am feeding test data, but even if I post the data with
progressively increasing event times, in order, nothing happens.
Any help is appreciated.

Thanks. 
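
For reference, a minimal, self-contained sketch (not from this thread) that reproduces the
behavior being discussed, using the TopologyTestDriver that ships with 2.4: suppress() only
emits once stream time, driven by record timestamps, passes window end plus grace. The topic
names, the count() aggregation, and the timestamps below are illustrative assumptions.

import java.time.Duration;
import java.time.Instant;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;

public class SuppressProbe {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("events", Consumed.with(Serdes.String(), Serdes.String()))
               .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
               .windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(Duration.ofMinutes(1)))
               .count()
               .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
               // Re-key to a plain string so the output is easy to read in the test.
               .toStream((windowedKey, count) -> windowedKey.key() + "@" + windowedKey.window().start())
               .to("event-counts", Produced.with(Serdes.String(), Serdes.Long()));

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "suppress-probe");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            TestInputTopic<String, String> in =
                    driver.createInputTopic("events", new StringSerializer(), new StringSerializer());
            TestOutputTopic<String, Long> out =
                    driver.createOutputTopic("event-counts",
                            Serdes.String().deserializer(), Serdes.Long().deserializer());

            Instant t0 = Instant.parse("2019-09-25T00:00:00Z");
            in.pipeInput("k1", "v1", t0);
            // Window [t0, t0+1m) is not closed yet: end (1m) + grace (1m) has not passed.
            System.out.println("before advancing: " + out.readKeyValuesToList());  // []
            // A later record advances stream time past window end + grace.
            in.pipeInput("k1", "v2", t0.plus(Duration.ofMinutes(3)));
            System.out.println("after advancing:  " + out.readKeyValuesToList());  // final count for the first window
        }
    }
}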

> On Sep 11, 2019, at 10:50 PM, Alessandro Tagliapietra 
>  wrote:
> 
> Have you tried deleting the suppress changelog topic to see if you get
> something after deleting it?
> 
> By per topic and not per key, I mean that if you send, for example, an event
> with a timestamp equal to today's date with key 1, that closes today's
> window, and data in the past with key 2 won't go through.
> 
> On Wed, Sep 11, 2019, 8:45 PM Thameem Ansari  wrote:
> 
>> I tried with different timestamps in the near past but nothing coming out.
>> I went thru the article from Confluent about using the suppress but I don’t
>> see many people are successful with that.
>> 
>> What do you mean by “timestamp is per topic and not per key”. Can you
>> please elaborate?
>> 
>> 
>> 
>> 
>>> On Sep 11, 2019, at 10:13 PM, Alessandro Tagliapietra <
>> tagliapietra.alessan...@gmail.com> wrote:
>>> 
>>> Did you ever push any data with a greater timestamp than the current one
>>> you're producing?
>>> One thing took me a while to find out is that the suppress timestamp is
>> per
>>> topic and not per key
>>> 
>>> --
>>> Alessandro Tagliapietra
>>> 
>>> 
>>> On Wed, Sep 11, 2019 at 8:06 PM Thameem Ansari 
>> wrote:
>>> 
>>>> Yes I am able to see the output when I remove suppress.
>>>> 
>>>> 
>>>>> On Sep 11, 2019, at 9:58 PM, Matthias J. Sax 
>>>> wrote:
>>>>> 
>>>>> Hard to say. Do you see output if you remove `suppress()` from your
>>>>> topology?
>>>>> 
>>>>> -Matthias
>>>>> 
>>>>> 
>>>>> On 9/11/19 6:19 PM, Thameem Ansari wrote:
>>>>>> I am using a producer simulator to simulate the events in the past and
>>>> I can see my time advances and the topology is based on the event time.
>> But
>>>> even if I run the producer for few hours nothing get emitted. Is there
>>>> anyway to debug this issue?
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>>> On Sep 11, 2019, at 6:13 PM, Matthias J. Sax 
>>>> wrote:
>>>>>>> 
>>>>>>> Note that `suppress()` is event time based, and does not emit any
>> data
>>>>>>> if event time does not advance.
>>>>>>> 
>>>>>>> A common misunderstanding is that people stop sending data and expect
>>>>>>> to see a result after some time, but that is not how it works. If you
>>>>>>> stop sending data, event time cannot advance, and thus suppress() will
>>>>>>> never emit anything downstream.
>>>>>>> 
>>>>>>> Also see this blog post about `suppress`:
>>>>>>> 
>>>> 
>> https://www.confluent.io/blog/kafka-streams-take-on-watermarks-and-triggers
>>>>>>> 
>>>>>>> 
>>>>>>> -Matthias
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> On 9/10/19 9:52 PM, Thameem Ansari wrote:
>>>>>>>> In my streaming topology, I am using the suppress dsl operator. As
>>>> per the documentation, it is supposed to output the final results after
>> the
>>>> window closes. But I noticed it's not emitting anything at all. Here is
>> the
>>>> pseudo code of my topology.
>>>>>>>> 
>>>>>>>> .filter((key, value) -> ...)
>>>>>>>> .flatMap((key, value) -> {
>>>>>>>> ...
>>>>>>>> })
>>>>>>>> .groupByKey(Grouped.with(Serdes.String(), ...))
>>>>>>>> 
>>>> 
>> .windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(Duration.ofMinutes(1)))
>>>>>>>> .aggregate(
>>>>>>>>   ...
>>>>>>>> 
>>>> 
>> ).suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));
>>>> 
>>>>>>>> Anything wrong here??
>>>>>>>> 
>>>>>>>> Thanks
>>>>>>>> Thameem
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>>> 
>> 
>> 



Re: log.retention.bytes not working

2019-09-18 Thread Thameem Ansari
To reliably delete the logs, you have to do the following.

In Kafka server.properties, set the following properties (adjust the values to your needs):
log.segment.bytes=10485760
log.retention.check.interval.ms=12
log.retention.ms=60

I believe the documentation should be clear enough to explain the priority of 
these parameters. 

Thanks. 
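
For per-topic retention (rather than the broker-wide settings above), here is a hedged sketch
using the Java AdminClient (requires brokers/clients on 2.3 or newer); the topic name, the
bootstrap address, and the concrete values are assumptions, not recommendations.

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.TopicConfig;

public class SetTopicRetention {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic");
            List<AlterConfigOp> ops = Arrays.asList(
                    // Cap each partition at roughly 1 GB ...
                    new AlterConfigOp(new ConfigEntry(TopicConfig.RETENTION_BYTES_CONFIG, "1073741824"),
                                      AlterConfigOp.OpType.SET),
                    // ... and at 24 hours of data ...
                    new AlterConfigOp(new ConfigEntry(TopicConfig.RETENTION_MS_CONFIG, "86400000"),
                                      AlterConfigOp.OpType.SET),
                    // ... with small segments so closed segments become deletable sooner.
                    new AlterConfigOp(new ConfigEntry(TopicConfig.SEGMENT_BYTES_CONFIG, "10485760"),
                                      AlterConfigOp.OpType.SET));
            admin.incrementalAlterConfigs(Collections.singletonMap(topic, ops)).all().get();
        }
    }
}

The point either way is that topic-level retention.* settings override the broker-level
log.retention.* defaults; kafka-configs.sh can be used for the same thing.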

> On Sep 18, 2019, at 10:42 AM, Vinay Kumar  wrote:
> 
> Yes, log.cleanup.policy is set to delete.
> 
> On Wed, Sep 18, 2019, 15:12 M. Manna  wrote:
> 
>> And is your log.cleanup.policy set to delete ?
>> 
>> On Wed, 18 Sep 2019 at 06:19, Vinay Kumar  wrote:
>> 
>>> I don't see log.retention.bytes only is not working. Even after the
>>> specified size in log.retention.bytes reached, the topic partition with
>>> segments grows much beyond.
>>> 
>>> On Wed, Sep 18, 2019, 1:31 AM M. Manna  wrote:
>>> 
 Hi,
 
 This is a very old post. Kafka has come far.
 
 Are you saying that log.retention.bytes (broker level - all topics)
>>> and/or
 retention.bytes (topic level - individual topic) isn’t working for you
>> ?
 
 Thanks,
 
 On Tue, 17 Sep 2019 at 20:02, Vinay Kumar 
>> wrote:
 
> Hi,
> I have the same problem mentioned here by Neha. Attached the comment.
> 
> 
> 
 
>>> 
>> https://grokbase.com/t/kafka/users/138hje002v/questtion-about-log-retention-bytes
> 
> Can someone please provide how effectively to impose the retention
>> for
 all
> topics.
> 
 
>>> 
>> 



Re: Suppress DSL operator in stream api - 2.4.0

2019-09-11 Thread Thameem Ansari
I tried with different timestamps in the near past, but nothing is coming out. I went
through the Confluent article about using suppress, but I don't see many people being
successful with it.

What do you mean by "the timestamp is per topic and not per key"? Can you please
elaborate?
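
One practical consequence, sketched below under assumptions (the topic name, broker address,
and payloads are made up): since the timestamp suppress tracks is not per key, you can nudge
open windows toward closing by producing one newer-timestamped record to every partition of
the input topic, so no partition lags behind. In a real pipeline you would filter these
marker records out before aggregating.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.StringSerializer;

public class AdvanceStreamTime {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        String topic = "events";                       // assumed input topic name
        long newerTimestamp = System.currentTimeMillis();

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Send one record with an explicit, newer timestamp to every partition,
            // so every task sees its event time advance past the open windows.
            for (PartitionInfo p : producer.partitionsFor(topic)) {
                producer.send(new ProducerRecord<>(topic, p.partition(), newerTimestamp,
                        "heartbeat", "advance-stream-time"));
            }
            producer.flush();
        }
    }
}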




> On Sep 11, 2019, at 10:13 PM, Alessandro Tagliapietra 
>  wrote:
> 
> Did you ever push any data with a greater timestamp than the current one
> you're producing?
> One thing took me a while to find out is that the suppress timestamp is per
> topic and not per key
> 
> --
> Alessandro Tagliapietra
> 
> 
> On Wed, Sep 11, 2019 at 8:06 PM Thameem Ansari  wrote:
> 
>> Yes I am able to see the output when I remove suppress.
>> 
>> 
>>> On Sep 11, 2019, at 9:58 PM, Matthias J. Sax 
>> wrote:
>>> 
>>> Hard to say. Do you see output if you remove `suppress()` from your
>>> topology?
>>> 
>>> -Matthias
>>> 
>>> 
>>> On 9/11/19 6:19 PM, Thameem Ansari wrote:
>>>> I am using a producer simulator to simulate the events in the past and
>> I can see my time advances and the topology is based on the event time. But
>> even if I run the producer for few hours nothing get emitted. Is there
>> anyway to debug this issue?
>>>> 
>>>> 
>>>> 
>>>>> On Sep 11, 2019, at 6:13 PM, Matthias J. Sax 
>> wrote:
>>>>> 
>>>>> Note that `suppress()` is event time based, and does not emit any data
>>>>> if event time does not advance.
>>>>> 
>>>>> A common misunderstanding is that people stop sending data and expect
>>>>> to see a result after some time, but that is not how it works. If you
>>>>> stop sending data, event time cannot advance, and thus suppress() will
>>>>> never emit anything downstream.
>>>>> 
>>>>> Also see this blog post about `suppress`:
>>>>> 
>> https://www.confluent.io/blog/kafka-streams-take-on-watermarks-and-triggers
>>>>> 
>>>>> 
>>>>> -Matthias
>>>>> 
>>>>> 
>>>>> 
>>>>> On 9/10/19 9:52 PM, Thameem Ansari wrote:
>>>>>> In my streaming topology, I am using the suppress dsl operator. As
>> per the documentation, it is supposed to output the final results after the
>> window closes. But I noticed it's not emitting anything at all. Here is the
>> pseudo code of my topology.
>>>>>> 
>>>>>> .filter((key, value) -> ...)
>>>>>> .flatMap((key, value) -> {
>>>>>>  ...
>>>>>> })
>>>>>> .groupByKey(Grouped.with(Serdes.String(), ...))
>>>>>> 
>> .windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(Duration.ofMinutes(1)))
>>>>>> .aggregate(
>>>>>>...
>>>>>> 
>> ).suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));
>> 
>>>>>> Anything wrong here??
>>>>>> 
>>>>>> Thanks
>>>>>> Thameem
>>>>>> 
>>>>> 
>>>> 
>>>> 
>>> 
>> 
>> 



Re: Suppress DSL operator in stream api - 2.4.0

2019-09-11 Thread Thameem Ansari
Yes I am able to see the output when I remove suppress. 


> On Sep 11, 2019, at 9:58 PM, Matthias J. Sax  wrote:
> 
> Hard to say. Do you see output if you remove `suppress()` from your
> topology?
> 
> -Matthias
> 
> 
> On 9/11/19 6:19 PM, Thameem Ansari wrote:
>> I am using a producer simulator to simulate the events in the past and I can 
>> see my time advances and the topology is based on the event time. But even 
>> if I run the producer for few hours nothing get emitted. Is there anyway to 
>> debug this issue?
>> 
>> 
>> 
>>> On Sep 11, 2019, at 6:13 PM, Matthias J. Sax  wrote:
>>> 
>>> Note that `suppress()` is event time based, and does not emit any data
>>> if event time does not advance.
>>> 
>>> A common misunderstanding is that people stop sending data and expect
>>> to see a result after some time, but that is not how it works. If you
>>> stop sending data, event time cannot advance, and thus suppress() will
>>> never emit anything downstream.
>>> 
>>> Also see this blog post about `suppress`:
>>> https://www.confluent.io/blog/kafka-streams-take-on-watermarks-and-triggers
>>> 
>>> 
>>> -Matthias
>>> 
>>> 
>>> 
>>> On 9/10/19 9:52 PM, Thameem Ansari wrote:
>>>> In my streaming topology, I am using the suppress dsl operator. As per the 
>>>> documentation, it is supposed to output the final results after the window 
>>>> closes. But I noticed it's not emitting anything at all. Here is the 
>>>> pseudo code of my topology. 
>>>> 
>>>> .filter((key, value) -> ...)
>>>> .flatMap((key, value) -> {
>>>>   ...
>>>> })
>>>> .groupByKey(Grouped.with(Serdes.String(), ...))
>>>> .windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(Duration.ofMinutes(1)))
>>>> .aggregate(
>>>> ...
>>>> ).suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));
>>>>  
>>>> Anything wrong here??
>>>> 
>>>> Thanks
>>>> Thameem
>>>> 
>>> 
>> 
>> 
> 



Re: Suppress DSL operator in stream api - 2.4.0

2019-09-11 Thread Thameem Ansari
I am using a producer simulator to generate events in the past, and I can see the time
advancing; the topology is based on event time. But even if I run the producer for a few
hours, nothing gets emitted. Is there any way to debug this issue?
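
One way to debug this (a sketch, not from the thread) is to log the event time Streams
actually assigns to each record just before the windowed aggregation, for example with a
pass-through transformValues; the String key/value types here are an assumption.

import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.processor.ProcessorContext;

public final class TimestampLogging {

    // Wraps a stream with a pass-through transformer that prints the partition,
    // offset, key, and the record timestamp (event time) Streams has assigned.
    public static KStream<String, String> logTimestamps(KStream<String, String> stream) {
        return stream.transformValues(() -> new ValueTransformerWithKey<String, String, String>() {
            private ProcessorContext context;

            @Override
            public void init(ProcessorContext context) {
                this.context = context;
            }

            @Override
            public String transform(String key, String value) {
                System.out.printf("partition=%d offset=%d key=%s event-time=%d%n",
                        context.partition(), context.offset(), key, context.timestamp());
                return value;   // pass the value through unchanged
            }

            @Override
            public void close() { }
        });
    }
}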



> On Sep 11, 2019, at 6:13 PM, Matthias J. Sax  wrote:
> 
> Note that `suppress()` is event time based, and does not emit any data
> if event time does not advance.
> 
> A common misunderstanding is that people stop sending data and expect
> to see a result after some time, but that is not how it works. If you
> stop sending data, event time cannot advance, and thus suppress() will
> never emit anything downstream.
> 
> Also see this blog post about `suppress`:
> https://www.confluent.io/blog/kafka-streams-take-on-watermarks-and-triggers
> 
> 
> -Matthias
> 
> 
> 
> On 9/10/19 9:52 PM, Thameem Ansari wrote:
>> In my streaming topology, I am using the suppress dsl operator. As per the 
>> documentation, it is supposed to output the final results after the window 
>> closes. But I noticed it's not emitting anything at all. Here is the pseudo 
>> code of my topology. 
>> 
>> .filter((key, value) -> ...)
>> .flatMap((key, value) -> {
>>...
>> })
>> .groupByKey(Grouped.with(Serdes.String(), ...))
>> .windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(Duration.ofMinutes(1)))
>> .aggregate(
>>  ...
>> ).suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));
>>  
>> Anything wrong here??
>> 
>> Thanks
>> Thameem
>> 
> 



Suppress DSL operator in stream api - 2.4.0

2019-09-10 Thread Thameem Ansari
In my streaming topology, I am using the suppress DSL operator. As per the
documentation, it is supposed to output the final results after the window closes,
but I noticed it's not emitting anything at all. Here is the pseudocode of my topology.

.filter((key, value) -> ...)
.flatMap((key, value) -> {
    ...
})
.groupByKey(Grouped.with(Serdes.String(), ...))
.windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(Duration.ofMinutes(1)))
.aggregate(
    ...
).suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));
Anything wrong here??

Thanks
Thameem
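
For context, a filled-in, hedged version of the pseudocode above: the filter/flatMap/aggregate
bodies are placeholders (the real logic is elided in the post), a simple count stands in for
the aggregate, and the topic names are made up. It only shows where the serdes and a final
toStream()/to() would go.

import java.time.Duration;
import java.util.Collections;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;

public class WindowedCountTopology {

    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();

        builder.stream("events", Consumed.with(Serdes.String(), Serdes.String()))
               .filter((key, value) -> value != null)                       // placeholder predicate
               .flatMap((key, value) ->                                     // placeholder re-keying
                       Collections.singletonList(KeyValue.pair(key, value)))
               .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
               .windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(Duration.ofMinutes(1)))
               .aggregate(
                       () -> 0L,                                            // placeholder initializer
                       (key, value, aggregate) -> aggregate + 1L,           // placeholder aggregator
                       Materialized.with(Serdes.String(), Serdes.Long()))
               // Final results only, once window end + grace has passed in stream time.
               .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
               .toStream((windowedKey, total) ->
                       windowedKey.key() + "@" + windowedKey.window().start())
               .to("event-counts-per-minute", Produced.with(Serdes.String(), Serdes.Long()));

        return builder.build();
    }
}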

Re: Kafka streams (2.1.1) - org.rocksdb.RocksDBException:Too many open files

2019-07-12 Thread Thameem Ansari
Actually, in our topology 394 different counters get set, and with 8 partitions that
totals 3152 tasks.
Btw, just to give you an update: I was able to run the streams application as 2 separate
instances in 2 VMs and it is working fine. As I suspected in my previous email, RocksDB
wasn't able to work properly with that many tasks in a single streaming application
instance, or we are missing some configuration to handle it with a single instance.
After a few more tests next week I can share my conclusions.


Sent from my iPhone
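
For reference, scaling out this way needs no topology change: each instance runs with the
same application.id and the tasks get split between them, while num.stream.threads controls
parallelism inside one instance. A sketch (application id, broker list, state directory, and
thread count are assumptions):

import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;

public final class StreamsLauncher {

    // Start one instance; running the same jar on a second VM with the same
    // application.id makes the two instances split the tasks between them.
    public static KafkaStreams start(Topology topology) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "rawlog-processor");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9092,broker3:9092");
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);            // threads per instance
        props.put(StreamsConfig.STATE_DIR_CONFIG, "/data/kafka-streams"); // local state per instance

        KafkaStreams streams = new KafkaStreams(topology, props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        return streams;
    }
}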

> On Jul 12, 2019, at 9:10 PM, Bill Bejeck  wrote:
> 
> Hi Thameem,
> 
>> Our topology has 3100 tasks with 3 brokers and 8 partitions.
> 
> I have a question that is somewhat orthogonal to your original item.  I may
> be missing something, but are you saying your streams application has 3,100
> tasks from 8 input partitions? If that is the case, do you mind sharing
> your topology? It seems like you have several repartition operations and
> you may be able to reduce the number of repartition operations depending on
> what you are doing.
> 
> Thanks,
> Bill
> 
>> On Thu, Jul 11, 2019 at 11:24 PM Thameem Ansari  wrote:
>> 
>> Open files in the system was set to 10Million and users limit is 1million.
>> When the process was active i was closely watching the open files and it
>> was around 400K so its well within the set limit. Rocksdb open files we
>> tried setting 100, 50, -1 but the results are same.
>> I am using Rocksdb config setter and not options (not sure what it is)
>> file.
>> 
>> Quick question: Our topology has 3100 tasks with 3 brokers and 8
>> partitions. But we are using only one streaming application instance with
>> single thread. Does this cause any issue? Today i am going to try with 2 or
>> 3 instances and see whether we can get rid of the issue.
>> 
>> Thanks
>> Thameem
>> 
>> 
>>> On Jul 12, 2019, at 1:57 AM, Sophie Blee-Goldman 
>> wrote:
>>> 
>>> I believe the "resource temporarily unavailable" actually is related to
>> the
>>> open files, most likely you are hitting the total file descriptor limit.
>>> Sorry if you mentioned this and I missed it, but what was the
>>> max.open.files in your RocksDBConfigSetter when you ran this? Actually,
>>> could you just include your full RocksDBConfigSetter implementation?
>>> 
>>> Not sure why Rocks would be spawning so many child processes...it does
>> use
>>> background threads for compaction but this sounds like something totally
>>> different. I also notice a read "This is a RocksDB options file" -- are
>> you
>>> trying to use an options file vs setting the options with the config
>> setter
>>> (or otherwise using Rocksdb outside of Streams?)
>>> 
>>> Have you tried cleaning up the state between runs? Are you using
>>> interactive queries?
>>> 
>>> Regarding the .checkpoint file not found thing -- this is an annoying but
>>> pretty much harmless bug we only recently figured out. There's an open PR
>>> for it but for now you can just ignore the warning. See KAFKA-5998
>>> <https://issues.apache.org/jira/browse/KAFKA-5998> for a long history (but
>>> scroll to the bottom for the actual explanation)
>>> 
>>> 
>>> On Thu, Jul 4, 2019 at 11:02 AM Thameem Ansari  wrote:
>>> 
>>>> i have few more details to share from today’s testing.
>>>> 
>>>> Attached strace to the process and noticed that there are thousands of
>>>> short lived child processes have been created by the stream application.
>>>> Not sure whether rocksdb is playing any role here. Noticed more than
>> 73000
>>>> child processes were created and this is after i increased the default
>>>> linux system limit from 32767 to 4million per PID. Appreciate if someone
>>>> answer the following questions.
>>>> 1. Why rocksdb creates these many processes? Each process is trying to
>>>> read the contents of the file and getting EAGAIN (Resource temporarily
>>>> unavailable)
>>>> 2. Noticed very high number of .checkpoint files missing and hence “No
>>>> such file or directory” message
>>>> 3. The child processes were created using “clone” system call. Not sure
>>>> whether there is any significance of this but wanna mention. Moreover
>> these
>>>> child processes were keep on increasing when the load is gettin

Re: Kafka streams (2.1.1) - org.rocksdb.RocksDBException:Too many open files

2019-07-11 Thread Thameem Ansari
The system-wide open-files limit was set to 10 million and the user limit to 1 million.
When the process was active I was closely watching the open files and it was around
400K, so it's well within the set limit. For RocksDB open files we tried setting 100,
50, and -1, but the results are the same.
I am using a RocksDB config setter, not an options file (not sure what that is).

Quick question: our topology has 3100 tasks with 3 brokers and 8 partitions, but we are
using only one streaming application instance with a single thread. Does this cause any
issue? Today I am going to try with 2 or 3 instances and see whether we can get rid of
the issue.

Thanks
Thameem
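
For reference, a hedged sketch of such a RocksDB config setter, combining the max.open.files
cap with the block-cache settings suggested elsewhere in this thread; the concrete numbers are
illustrative and would need to be sized against the ~3100 stores and the OS limit.

import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Options;

public class BoundedOpenFilesConfigSetter implements RocksDBConfigSetter {

    @Override
    public void setConfig(String storeName, Options options, Map<String, Object> configs) {
        // Reuse the table config Streams already attached instead of replacing it.
        BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();
        tableConfig.setBlockCacheSize(100 * 1024 * 1024L);  // 100 MB block cache
        tableConfig.setBlockSize(8 * 1024L);                // 8 KB blocks
        options.setTableFormatConfig(tableConfig);

        // Cap the files each RocksDB instance keeps open; with thousands of
        // stores even a modest per-instance value adds up quickly.
        options.setMaxOpenFiles(100);
        options.setMaxWriteBufferNumber(2);
    }
}

It is registered via props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG,
BoundedOpenFilesConfigSetter.class);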


> On Jul 12, 2019, at 1:57 AM, Sophie Blee-Goldman  wrote:
> 
> I believe the "resource temporarily unavailable" actually is related to the
> open files, most likely you are hitting the total file descriptor limit.
> Sorry if you mentioned this and I missed it, but what was the
> max.open.files in your RocksDBConfigSetter when you ran this? Actually,
> could you just include your full RocksDBConfigSetter implementation?
> 
> Not sure why Rocks would be spawning so many child processes...it does use
> background threads for compaction but this sounds like something totally
> different. I also notice a read "This is a RocksDB options file" -- are you
> trying to use an options file vs setting the options with the config setter
> (or otherwise using Rocksdb outside of Streams?)
> 
> Have you tried cleaning up the state between runs? Are you using
> interactive queries?
> 
> Regarding the .checkpoint file not found thing -- this is an annoying but
> pretty much harmless bug we only recently figured out. There's an open PR
> for it but for now you can just ignore the warning. See KAFKA-5998
> <https://issues.apache.org/jira/browse/KAFKA-5998> for a long history (but
> scroll to the bottom for the actual explanation)
> 
> 
> On Thu, Jul 4, 2019 at 11:02 AM Thameem Ansari  wrote:
> 
>> i have few more details to share from today’s testing.
>> 
>> Attached strace to the process and noticed that there are thousands of
>> short lived child processes have been created by the stream application.
>> Not sure whether rocksdb is playing any role here. Noticed more than 73000
>> child processes were created and this is after i increased the default
>> linux system limit from 32767 to 4million per PID. Appreciate if someone
>> answer the following questions.
>> 1. Why rocksdb creates these many processes? Each process is trying to
>> read the contents of the file and getting EAGAIN (Resource temporarily
>> unavailable)
>> 2. Noticed very high number of .checkpoint files missing and hence “No
>> such file or directory” message
>> 3. The child processes were created using “clone” system call. Not sure
>> whether there is any significance of this but wanna mention. Moreover these
>> child processes were keep on increasing when the load is getting applied
>> continuously. Not sure under what condition it will hit the ceiling.
>> 4. Noticed very high VIRT memory usage (around 31G) but RES memory was
>> only 1.9G. Isn’t every file open/close consume memory? But it is not going
>> up even though the number of processes is increased.
>> 
>> Thanks
>> Thameem
>> 
>>> On Jul 4, 2019, at 9:28 AM, Thameem Ansari  wrote:
>>> 
>>> Sorry for reposting the previous message as the images didn’t come
>> thru.. pasting as text.
>>> 
>>> I have changed both system and user limits.
>>> 
>>> To completely isolate the problem, I have tried the application in
>> Centos 7 environment. Set the ulimit to 1million and system limits to
>> 10million open files. Now 3 kafka nodes are running in separate servers
>> each and streaming application is running in a dedicated VM. Now the
>> application is not explicitly throwing “too many open files” error but
>> automatically aborted with the message
>>> 
>>> terminate called after throwing an instance of 'std::system_error'
>>>  what():  Resource temporarily unavailable
>>> ./bin/start.sh: line 42:  1100 Aborted
>>> 
>>> Here is the last few lines from strace output which shows the aborted
>> message.
>>> 
>>> 25721 14:58:35
>> open("/home/devops/data/kafka-streams/RawLog_Processor/393_7/MAIL.INCOMING_FILTER_BY_USER_MAIL_INCOMING_DOMAIN_TRAFFIC_DETAIL.DETECTED_CONTENT_FILTER_RECIPIENT_DOMAIN/MAIL.INCOMING_FILTER_BY_USER_MAIL_INCOMING_DOMAIN_TRAFFIC_DETAIL.DETECTED_CONTENT_FILTER_RECIPIENT_DOMAIN.155520/MANIFEST-07&qu

Re: Group Coordinator stuck on PrepareRebalance state.

2019-07-11 Thread Thameem Ansari
In addition to the session timeout, try increasing the request timeout as well. We had a
similar issue and resolved it by increasing the timeouts. As per my understanding, if you
have a complex topology it will take some time for the Kafka brokers to create the tasks
and assign them to consumers. In the meantime, if any consumer tries to join the group,
it will time out due to the lower timeout values. Increasing the timeouts gives the
brokers enough time to assign the tasks to consumers properly.

request.timeout.ms=6

Thanks
Thameem
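
A hedged sketch of how such timeouts are passed to the embedded consumer of a Streams
application; the values shown are illustrative, not a recommendation.

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public final class TimeoutConfigs {

    public static Properties build() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");

        // Give the group more headroom while tasks are being created and assigned.
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), 30000);
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG), 3000);
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), 60000);
        return props;
    }
}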


> On Jul 11, 2019, at 7:56 PM, Aravind Dongara  
> wrote:
> 
> Hi Boyang,
> 
> Thanks for the quick response.
> We are on version 2.2.0.
> 
> We are using the following properties on KStreams/consumer:
> session.timeout.ms=15000
> heartbeat.interval.ms=3000
> 
> I was wondering if a member might leak if it satisfies “shouldKeepAlive” 
> condition in "onExpireHeartbeat" and the consumer restarts or goes down right 
> after that, before next heartbeat is sent.
> 
> Thanks
> Aravind
> 
> 
>> On Jul 10, 2019, at 10:40 PM, Boyang Chen  wrote:
>> 
>> Hey Aravind,
>> 
>> If your client/broker just upgraded to 2.3, Jason has filed a blocker for
>> 2.3: https://issues.apache.org/jira/browse/KAFKA-8653
>> 
>> and a fix is on its way: https://github.com/apache/kafka/pull/7072/files
>> 
>> Let me know if you are actually on a different version.
>> 
>> Boyang
>> 
>> On Wed, Jul 10, 2019 at 7:43 PM Aravind Dongara 
>> wrote:
>> 
>>> Our kafka streams application is stuck and continuously emits
>>> "(Re-)joining group” log message every 5 minutes without making any
>>> progress.
>>> 
>>> Kafka-consumer-groups cmd line tool with “—members” option shows lots of
>>> stale members, in addition to expected member-ids shown on log msgs on
>>> kafka-streams app and broker logs that were failing to join).
>>> For some reason these old members didn’t get evicted from members list.
>>> Looks like this is preventing the GroupCordinator from reaching
>>> “CompletingRebalance” state.
>>> 
>>> Restart of Kafka streams app didn’t help either, it just replaced the
>>> newer member-ids; but the old stale member-ids are still present in the
>>> members-list.
>>> 
>>> Is there any way to resolve this without restarting the broker hosting the
>>> GroupCoordinator for this group.
>>> 
>>> Thanks
>>> Aravind
> 



Re: Kafka streams (2.1.1) - org.rocksdb.RocksDBException:Too many open files

2019-07-04 Thread Thameem Ansari
I have a few more details to share from today's testing.

I attached strace to the process and noticed that thousands of short-lived child
processes have been created by the stream application. I am not sure whether RocksDB is
playing any role here. I noticed more than 73000 child processes were created, and this
is after I increased the default Linux system limit from 32767 to 4 million per PID.
I'd appreciate it if someone could answer the following questions.
1. Why does RocksDB create this many processes? Each process is trying to read the
contents of a file and getting EAGAIN (Resource temporarily unavailable).
2. I noticed a very high number of .checkpoint files missing, and hence the "No such
file or directory" messages.
3. The child processes were created using the "clone" system call. Not sure whether
there is any significance to this, but I wanted to mention it. Moreover, these child
processes keep increasing while load is applied continuously; I'm not sure under what
condition they will hit the ceiling.
4. I noticed very high VIRT memory usage (around 31G), but RES memory was only 1.9G.
Doesn't every file open/close consume memory? Yet memory is not going up even though
the number of processes has increased.

Thanks
Thameem 

> On Jul 4, 2019, at 9:28 AM, Thameem Ansari  wrote:
> 
> Sorry for reposting the previous message as the images didn’t come thru.. 
> pasting as text. 
> 
> I have changed both system and user limits. 
> 
> To completely isolate the problem, I have tried the application in Centos 7 
> environment. Set the ulimit to 1million and system limits to 10million open 
> files. Now 3 kafka nodes are running in separate servers each and streaming 
> application is running in a dedicated VM. Now the application is not 
> explicitly throwing “too many open files” error but automatically aborted 
> with the message 
> 
> terminate called after throwing an instance of 'std::system_error'
>   what():  Resource temporarily unavailable
> ./bin/start.sh: line 42:  1100 Aborted 
> 
> Here is the last few lines from strace output which shows the aborted 
> message. 
> 
> 25721 14:58:35 
> open("/home/devops/data/kafka-streams/RawLog_Processor/393_7/MAIL.INCOMING_FILTER_BY_USER_MAIL_INCOMING_DOMAIN_TRAFFIC_DETAIL.DETECTED_CONTENT_FILTER_RECIPIENT_DOMAIN/MAIL.INCOMING_FILTER_BY_USER_MAIL_INCOMING_DOMAIN_TRAFFIC_DETAIL.DETECTED_CONTENT_FILTER_RECIPIENT_DOMAIN.155520/MANIFEST-07",
>  O_RDONLY|O_CLOEXEC) = 12505
> 25721 14:58:35 
> open("/sys/devices/virtual/block/dm-2/queue/logical_block_size", O_RDONLY) = 
> 12506
> 25721 14:58:35 read(12506, "512\n", 4096) = 4
> 25721 14:58:35 close(12506) = 0
> 25721 14:58:35 write(12502, "s.advise_random_on_open: 0\n2019/"..., 4096) = 
> 4096
> 25721 14:58:35 write(12502, "ions.comparator: leveldb.Bytewis"..., 4096) = 
> 4096
> 25721 14:58:35 read(12505, 
> "V\371\270\370\34\0\1\1\32leveldb.BytewiseCompara"..., 32768) = 192
> 25721 14:58:35 read(12505, "", 28672)   = 0
> 25721 14:58:35 close(12505) = 0
> 17701 14:58:35 
> open("/home/devops/data/kafka-streams/RawLog_Processor/393_7/MAIL.INCOMING_FILTER_BY_USER_MAIL_INCOMING_DOMAIN_TRAFFIC_DETAIL.DETECTED_CONTENT_FILTER_RECIPIENT_DOMAIN/MAIL.INCOMING_FILTER_BY_USER_MAIL_INCOMING_DOMAIN_TRAFFIC_DETAIL.DETECTED_CONTENT_FILTER_RECIPIENT_DOMAIN.155520/06.sst",
>  O_RDONLY|O_CLOEXEC) = 12505
> 17702 14:58:35 +++ exited with 0 +++
> 25721 14:58:35 write(2, "terminate called after throwing "..., 48) = 48
> 25721 14:58:35 write(2, "std::system_error", 17) = 17
> 25721 14:58:35 write(2, "'\n", 2)   = 2
> 25721 14:58:35 write(2, "  what():  ", 11) = 11
> 25721 14:58:35 write(2, "Resource temporarily unavailable", 32) = 32
> 25721 14:58:35 write(2, "\n", 1)= 1
> 17701 14:58:35 
> open("/sys/devices/virtual/block/dm-2/queue/logical_block_size", O_RDONLY) = 
> 12506
> 25721 14:58:35 write(12502, "ction_dynamic_level_bytes: 0\n201"..., 3672) = 
> 3672
> 25721 14:58:35 --- SIGABRT {si_signo=SIGABRT, si_code=SI_TKILL, si_pid=25697, 
> si_uid=1000} ---
> 17701 14:58:35 read(12506,  
> 17701 14:58:36 +++ killed by SIGABRT +++
> 17700 14:58:36 +++ killed by SIGABRT +++
> 17699 14:58:36 +++ killed by SIGABRT +++
> 
> 
> As I can see from the open files they are way lower (45201) than the max 
> limit and hence can we rule out the “open files” is the root cause? 
> 
> I have also noticed there is lot of EAGAIN (count 3334) and associated 
> “Resource temporarily unavailable” messages as well
> 
> 
> 
> 25732 14:49:23 open("/sys/fs/cgroup/memory/memory.limit_in_bytes", O_RDONLY) 
> = 59
> 25732 14:49:23 

Re: Kafka streams (2.1.1) - org.rocksdb.RocksDBException:Too many open files

2019-07-03 Thread Thameem Ansari
25732 14:49:23 close(59)= 0
25709 14:49:23 open("/sys/fs/cgroup/memory/memory.limit_in_bytes", O_RDONLY) = 
59
25709 14:49:23 read(59, "9223372036854775807\n", 4096) = 20
25709 14:49:23 close(59)= 0
25721 14:49:23 read(54, "\0\1dx", 4)= 4
25721 14:49:23 read(54, 
"\0\0\0\v\0\0\0\0\0\0\0\1dj\0\0\0\0\1\212\0\201RawLog_Pro"..., 91256) = 91256
25721 14:49:23 read(54, 0x7f7044003b80, 4) = -1 EAGAIN (Resource temporarily 
unavailable)
25732 14:49:23 open("/sys/fs/cgroup/memory/memory.limit_in_bytes", O_RDONLY) = 
59
25732 14:49:23 read(59, "9223372036854775807\n", 4096) = 20
25721 14:49:23 write(35, "2019-07-03 14:49:23.258 [rawLP1-"..., 219 
25732 14:49:23 close(59)= 0
25721 14:49:23 <... write resumed> )= 219
25721 14:49:23 write(1, "2019-07-03 14:49:23.258 [rawLP1-"..., 219) = 219
25721 14:49:23 write(50, "\0", 1)   = 1
25721 14:49:23 read(49, "\0", 16)   = 1
25708 14:49:23 open("/sys/fs/cgroup/memory/memory.limit_in_bytes", O_RDONLY) = 
59
25709 14:49:23 open("/sys/fs/cgroup/memory/memory.limit_in_bytes", O_RDONLY) = 
60
25708 14:49:23 read(59, "9223372036854775807\n", 4096) = 20
25708 14:49:23 close(59)= 0
25709 14:49:23 read(60, "9223372036854775807\n", 4096) = 20
25709 14:49:23 close(60)= 0
25709 14:49:23 open("/sys/fs/cgroup/memory/memory.limit_in_bytes", O_RDONLY) = 
59
25709 14:49:23 read(59, "9223372036854775807\n", 4096) = 20
25709 14:49:23 close(59)= 0
25708 14:49:23 open("/sys/fs/cgroup/memory/memory.limit_in_bytes", O_RDONLY) = 
59
25708 14:49:23 read(59, "9223372036854775807\n", 4096) = 20
25708 14:49:23 close(59)= 0
25709 14:49:23 open("/sys/fs/cgroup/memory/memory.limit_in_bytes", O_RDONLY) = 
59
25709 14:49:23 read(59, "9223372036854775807\n", 4096) = 20
25709 14:49:23 close(59)= 0
25732 14:49:23 open("/sys/fs/cgroup/memory/memory.limit_in_bytes", O_RDONLY) = 
59
25732 14:49:23 read(59, "9223372036854775807\n", 4096) = 20
25732 14:49:23 close(59)= 0
25709 14:49:23 open("/sys/fs/cgroup/memory/memory.limit_in_bytes", O_RDONLY) = 
59
25709 14:49:23 read(59, "9223372036854775807\n", 4096) = 20

But I am unable to come to any conclusion from these findings. Any insight is
appreciated.

Thanks
Thameem

> On Jul 4, 2019, at 9:18 AM, Thameem Ansari  wrote:
> 
> 
> 
> 
> But unable to come to any conclusion from these findings. Any insight is 
> appreciated. 
> 
> Thanks
> Thameem



Re: Kafka streams (2.1.1) - org.rocksdb.RocksDBException:Too many open files

2019-07-03 Thread Thameem Ansari
Tried setting the open files to 100 and 50, but the results are the same. I checked the
total open files while the streaming application was busy running: just before getting
the "too many open files" message it was around 41756, which is the same as what we got
when it was set to -1.

VisualVM shows no abnormality with the threads, memory, or heap.

Thanks
Thameem
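
To sanity-check the open-files-per-instance math, one option (a sketch; the on-disk layout can
vary by version and store type) is to count the store directories under the application's
state directory, since each store directory corresponds to roughly one RocksDB instance.

import java.io.File;

public class CountStoreDirs {
    public static void main(String[] args) {
        // Layout: <state.dir>/<application.id>/<task>/<store>; pass the application
        // directory as the first argument (the default path here is an assumption).
        File appStateDir = new File(args.length > 0 ? args[0] : "/tmp/kafka-streams/my-app");
        int storeDirs = 0;
        File[] taskDirs = appStateDir.listFiles(File::isDirectory);
        if (taskDirs != null) {
            for (File taskDir : taskDirs) {
                File[] stores = taskDir.listFiles(File::isDirectory);
                if (stores != null) {
                    storeDirs += stores.length;
                }
            }
        }
        System.out.println("state store directories (~ RocksDB instances): " + storeDirs);
    }
}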

> On Jul 3, 2019, at 11:50 AM, Sophie Blee-Goldman  wrote:
> 
> How sure are you that the open file count never goes beyond 50K? Are those
> numbers just from a snapshot after it crashed? It's possible rocks is
> creating a large number of files just for a short period of time (maybe
> while compacting) that causes the open file count to spike and go back down.
> 
> For things to try, you should set the rocks config max.open.files to
> something less than infinity...if you're OS limit is 1 million and you have
> (rounding up) ~5k rocks instances, set this to 1 million / 5k = 200. If you
> set a lower limit and still hit this error, we can go from there
> 
> On Tue, Jul 2, 2019 at 11:10 PM emailtokir...@gmail.com <
> emailtokir...@gmail.com> wrote:
> 
>> 
>> 
>> On 2019/07/03 05:46:45, Sophie Blee-Goldman  wrote:
>>> It sounds like rocksdb *is* honoring your configs -- the max.open.files
>>> config is an internal restriction that tells rocksdb how many open files
>> it
>>> is allowed to have, so if that's set to -1 (infinite) it won't ever try
>> to
>>> limit its open files and you may hit the OS limit.
>>> 
>>> Think of it this way: if you have 100 rocksdb instances and a OS limit of
>>> 500, you should set max.open.files to 5  to avoid hitting this limit
>>> (assuming there are no other open files on the system, in reality you'd
>>> want some extra room there)
>>> 
>>> On Tue, Jul 2, 2019 at 7:53 PM emailtokir...@gmail.com <
>>> emailtokir...@gmail.com> wrote:
>>> 
 
 
 On 2019/06/28 23:29:16, John Roesler  wrote:
> Hey all,
> 
> If you want to figure it out theoretically, if you print out the
> topology description, you'll have some number of state stores listed
> in there. The number of Rocks instances should just be
> (#global_state_stores +
> sum(#partitions_of_topic_per_local_state_store)) . The number of
> stream threads isn't relevant here.
> 
> You can also figure it out empirically: the first level of
> subdirectories in the state dir are Tasks, and then within that, the
> next level is Stores. You should see the store directory names match
> up with the stores listed in the topology description. The number of
> Store directories is exactly the number of RocksDB instances you
>> have.
> 
> There are also metrics corresponding to each of the state stores, so
> you can compute it from what you find in the metrics.
> 
> Hope that helps,
> -john
> 
> On Thu, Jun 27, 2019 at 6:46 AM Patrik Kleindl 
 wrote:
>> 
>> Hi Kiran
>> Without much research my guess would be "num_stream_threads *
>> (#global_state_stores +
 sum(#partitions_of_topic_per_local_state_store))"
>> So 10 stores (regardless if explicitly defined or implicitely
>> because
 of
>> some stateful operation) with 10 partitions each should result in
>> 100
>> Rocksdb instances if you are running at the default of
 num_stream_threads=1.
>> 
>> As I wrote before, start with 100.
>> If the error persists, half the number, if not, double it ;-)
>> Repeat as
>> needed.
>> 
>> If you reach the single-digit-range and the error still shows up,
>> start
>> searching for any iterators over a store you might not have closed.
>> 
>> br, Patrik
>> 
>> On Thu, 27 Jun 2019 at 13:11, emailtokir...@gmail.com <
>> emailtokir...@gmail.com> wrote:
>> 
>>> 
>>> 
>>> On 2019/06/27 09:02:39, Patrik Kleindl 
>> wrote:
 Hello Kiran
 
 First, the value for maxOpenFiles is per RocksDB instance, and
>> the
 number
 of those can get high if you have a lot of topic partitions
>> etc.
 Check the directory (state dir) to see how many there are.
 Start with a low value (100) and see if that has some effect.
 
 Second, because I just found out, you should use
 BlockBasedTableConfig tableConfig = (BlockBasedTableConfig)
 options.tableFormatConfig();
tableConfig.setBlockCacheSize(100*1024*1024L);
tableConfig.setBlockSize(8*1024L);
 instead of creating a new object to prevent accidently messing
>> up
 references.
 
 Hope that helps
 best regards
 Patrik
 
 On Thu, 27 Jun 2019 at 10:46, emailtokir...@gmail.com <
 emailtokir...@gmail.com> wrote:
 
> 
> 
> On 2019/06/26 21:58:02, Patrik Kleindl 
 wrote:
>> Hi Kiran
>> You can use the RocksDBConfigSetter and pass
>

Re: Kafka streams (2.1.1) - org.rocksdb.RocksDBException:Too many open files

2019-07-02 Thread Thameem Ansari
As I mentioned, I tried setting the OS limit to 600K and 1 million in the shell and
started the application from that same shell, but the problem still exists. I tried
rebooting the laptop and the results are the same. So, I need a way to find out what
exactly is causing this issue when we hit close to the 42K system limit.

Thanks
Thameem


> On Jul 3, 2019, at 11:16 AM, Sophie Blee-Goldman  wrote:
> 
> It sounds like rocksdb *is* honoring your configs -- the max.open.files
> config is an internal restriction that tells rocksdb how many open files it
> is allowed to have, so if that's set to -1 (infinite) it won't ever try to
> limit its open files and you may hit the OS limit.
> 
> Think of it this way: if you have 100 rocksdb instances and a OS limit of
> 500, you should set max.open.files to 5  to avoid hitting this limit
> (assuming there are no other open files on the system, in reality you'd
> want some extra room there)
> 
> On Tue, Jul 2, 2019 at 7:53 PM emailtokir...@gmail.com wrote:
> 
>> 
>> 
>> On 2019/06/28 23:29:16, John Roesler > > wrote:
>>> Hey all,
>>> 
>>> If you want to figure it out theoretically, if you print out the
>>> topology description, you'll have some number of state stores listed
>>> in there. The number of Rocks instances should just be
>>> (#global_state_stores +
>>> sum(#partitions_of_topic_per_local_state_store)) . The number of
>>> stream threads isn't relevant here.
>>> 
>>> You can also figure it out empirically: the first level of
>>> subdirectories in the state dir are Tasks, and then within that, the
>>> next level is Stores. You should see the store directory names match
>>> up with the stores listed in the topology description. The number of
>>> Store directories is exactly the number of RocksDB instances you have.
>>> 
>>> There are also metrics corresponding to each of the state stores, so
>>> you can compute it from what you find in the metrics.
>>> 
>>> Hope that helps,
>>> -john
>>> 
>>> On Thu, Jun 27, 2019 at 6:46 AM Patrik Kleindl >> >
>> wrote:
 
 Hi Kiran
 Without much research my guess would be "num_stream_threads *
 (#global_state_stores +
>> sum(#partitions_of_topic_per_local_state_store))"
 So 10 stores (regardless if explicitly defined or implicitely because
>> of
 some stateful operation) with 10 partitions each should result in 100
 Rocksdb instances if you are running at the default of
>> num_stream_threads=1.
 
 As I wrote before, start with 100.
 If the error persists, half the number, if not, double it ;-) Repeat as
 needed.
 
 If you reach the single-digit-range and the error still shows up, start
 searching for any iterators over a store you might not have closed.
 
 br, Patrik
 
 On Thu, 27 Jun 2019 at 13:11, emailtokir...@gmail.com 
  <
 emailtokir...@gmail.com > wrote:
 
> 
> 
> On 2019/06/27 09:02:39, Patrik Kleindl  > wrote:
>> Hello Kiran
>> 
>> First, the value for maxOpenFiles is per RocksDB instance, and the
>> number
>> of those can get high if you have a lot of topic partitions etc.
>> Check the directory (state dir) to see how many there are.
>> Start with a low value (100) and see if that has some effect.
>> 
>> Second, because I just found out, you should use
>> BlockBasedTableConfig tableConfig = (BlockBasedTableConfig)
>> options.tableFormatConfig();
>>tableConfig.setBlockCacheSize(100*1024*1024L);
>>tableConfig.setBlockSize(8*1024L);
>> instead of creating a new object to prevent accidently messing up
>> references.
>> 
>> Hope that helps
>> best regards
>> Patrik
>> 
>> On Thu, 27 Jun 2019 at 10:46, emailtokir...@gmail.com 
>>  <
>> emailtokir...@gmail.com > wrote:
>> 
>>> 
>>> 
>>> On 2019/06/26 21:58:02, Patrik Kleindl >> >
>> wrote:
 Hi Kiran
 You can use the RocksDBConfigSetter and pass
 
 options.setMaxOpenFiles(100);
 
 to all RocksDBs for the Streams application which limits how
>> many are
 kept open at the same time.
 
 best regards
 
 Patrik
 
 
 On Wed, 26 Jun 2019 at 16:14, emailtokir...@gmail.com 
  <
 emailtokir...@gmail.com > wrote:
 
> Hi,
> 
> We are using Kafka streams DSL APIs for doing some counter
> aggregations
> (running on OpenJDK 11.0.2). Our topology has some 400 sub
> topologies
>>> & we
>>>

Re: Kafka streams (2.1.1) - org.rocksdb.RocksDBException:Too many open files

2019-07-02 Thread Thameem Ansari
In many places it is mentioned that closing the iterator fixes the issue, but that
applies only to the Processor API. In the DSL there is no iterator explicitly available;
we are using wrapper methods like aggregate, map, groupBy, etc.

Here is a snapshot of the issue, with the exact statistics observed from a recent run on
a Mac with Mojave.

- The number of state directories is 3154, and hence there will be 3154 RocksDB
instances.
- The OS open-files limit was set to 1 million; here is the breakdown of open files:
Total open files on the system: 41645
Open files from the stream application: 16545
Open files related to state directories: 16025
So, if we do the math, 16025/3154 ~ 5 files per instance.
- The following parameters were used, but the problem still exists:
cache.index.and.filter.blocks=false
block.cache.size=100MB
block.size=8MB
max.write.buffer.number=2
table.cache.numshardbits=8
max.open.files=-1
compaction.readahead.size=256MB
skip.stats.update_on_db_open=true
write.buffer.size=32MB
- The topic has 8 partitions and the streaming application is running as a SINGLE
instance with a SINGLE thread.
- Noticed that these RocksDB properties have been consumed by the app but are not
working as expected (or as defined in the documentation).
- Observed no reported issues related to memory.

Thanks
Thameem


On 2019/07/03 02:53:20, e...@gmail.com wrote:
> 
> 
> On 2019/06/28 23:29:16, John Roesler  wrote: > 
> > Hey all,> 
> > > 
> > If you want to figure it out theoretically, if you print out the> 
> > topology description, you'll have some number of state stores listed> 
> > in there. The number of Rocks instances should just be> 
> > (#global_state_stores +> 
> > sum(#partitions_of_topic_per_local_state_store)) . The number of> 
> > stream threads isn't relevant here.> 
> > > 
> > You can also figure it out empirically: the first level of> 
> > subdirectories in the state dir are Tasks, and then within that, the> 
> > next level is Stores. You should see the store directory names match> 
> > up with the stores listed in the topology description. The number of> 
> > Store directories is exactly the number of RocksDB instances you have.> 
> > > 
> > There are also metrics corresponding to each of the state stores, so> 
> > you can compute it from what you find in the metrics.> 
> > > 
> > Hope that helps,> 
> > -john> 
> > > 
> > On Thu, Jun 27, 2019 at 6:46 AM Patrik Kleindl  wrote:> 
> > >> 
> > > Hi Kiran> 
> > > Without much research my guess would be "num_stream_threads *> 
> > > (#global_state_stores + 
> > > sum(#partitions_of_topic_per_local_state_store))"> 
> > > So 10 stores (regardless if explicitly defined or implicitely because of> 
> > > some stateful operation) with 10 partitions each should result in 100> 
> > > Rocksdb instances if you are running at the default of 
> > > num_stream_threads=1.> 
> > >> 
> > > As I wrote before, start with 100.> 
> > > If the error persists, half the number, if not, double it ;-) Repeat as> 
> > > needed.> 
> > >> 
> > > If you reach the single-digit-range and the error still shows up, start> 
> > > searching for any iterators over a store you might not have closed.> 
> > >> 
> > > br, Patrik> 
> > >> 
> > > On Thu, 27 Jun 2019 at 13:11, emailtokir...@gmail.com <> 
> > > emailtokir...@gmail.com> wrote:> 
> > >> 
> > > >> 
> > > >> 
> > > > On 2019/06/27 09:02:39, Patrik Kleindl  wrote:> 
> > > > > Hello Kiran> 
> > > > >> 
> > > > > First, the value for maxOpenFiles is per RocksDB instance, and the 
> > > > > number> 
> > > > > of those can get high if you have a lot of topic partitions etc.> 
> > > > > Check the directory (state dir) to see how many there are.> 
> > > > > Start with a low value (100) and see if that has some effect.> 
> > > > >> 
> > > > > Second, because I just found out, you should use> 
> > > > > BlockBasedTableConfig tableConfig = (BlockBasedTableConfig)> 
> > > > > options.tableFormatConfig();> 
> > > > > tableConfig.setBlockCacheSize(100*1024*1024L);> 
> > > > > tableConfig.setBlockSize(8*1024L);> 
> > > > > instead of creating a new object to prevent accidently messing up> 
> > > > > references.> 
> > > > >> 
> > > > > Hope that helps> 
> > > > > best regards> 
> > > > > Patrik> 
> > > > >> 
> > > > > On Thu, 27 Jun 2019 at 10:46, emailtokir...@gmail.com <> 
> > > > > emailtokir...@gmail.com> wrote:> 
> > > > >> 
> > > > > >> 
> > > > > >> 
> > > > > > On 2019/06/26 21:58:02, Patrik Kleindl  wrote:> 
> > > > > > > Hi Kiran> 
> > > > > > > You can use the RocksDBConfigSetter and pass> 
> > > > > > >> 
> > > > > > > options.setMaxOpenFiles(100);> 
> > > > > > >> 
> > > > > > > to all RocksDBs for the Streams application which limits how many 
> > > > > > > are> 
> > > > > > > kept open at the same time.> 
> > > > > > >> 
> > > > > > > best regards> 
> > > > > > >> 
> > > > > > > Patrik> 
> > > > > > >