High system.io.await on Kafka brokers?

2019-01-22 Thread wenxing zheng
Dear all,

We have a Kafka cluster with 5 nodes, and from the Datadog metrics we found
that the latency for sending to Kafka regularly exceeded 200ms, accompanied
by a peak in system.io.await.

Please advise what the problem might be, and any hints?
[image: image.png]

Kind regards, Wenxing


Re: High system.io.await on Kafka brokers?

2019-01-22 Thread Sam Pegler
Sounds like you're reaching the limits of what your disks will do, either on
reads or writes.  Debug it as you would any other disk-based app;
https://haydenjames.io/linux-server-performance-disk-io-slowing-application/
might help.



On Tue, 22 Jan 2019 at 09:19, wenxing zheng  wrote:

> Dear all,
>
> We have a Kafka cluster with 5 nodes, and from the Datadog metrics we found
> that the latency for sending to Kafka regularly exceeded 200ms, accompanied
> by a peak in system.io.await.
>
> Please advise what the problem might be, and any hints?
> [image: image.png]
>
> Kind regards, Wenxing
>


Open files clogging and KafkaStreams

2019-01-22 Thread Niklas Lönn
Hi Kafka Devs & Users,

We recently had an issue where we processed a lot of old data and crashed
our brokers due to too many memory-mapped files.
It seems to me that the way Kafka / Kafka Streams manages resources is a bit
suboptimal (keeping all files open all the time; maybe something should
manage this more on demand?).

In the issue I described, the repartition topic was produced very fast but
not consumed, causing a lot of segments and files to be open at the same
time.

I have worked around the issue by making sure I have more threads than
partitions, to force tasks to subscribe to internal topics only, but this
seems a bit hacky, and maybe there should be some guidance in the
documentation if this is considered part of the design.
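
For reference, a minimal sketch of that workaround (the property is the
standard Streams setting; the thread count of 12 and the id/bootstrap values
are just placeholders for a topology with at most 10 partitions per topic):

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");           // placeholder
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");   // placeholder
    // With more stream threads than partitions, each task runs on its own
    // thread, so a thread that owns a repartition-topic task subscribes to
    // that internal topic only.
    props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 12);
    // "builder" is assumed to be the StreamsBuilder defining the topology.
    KafkaStreams streams = new KafkaStreams(builder.build(), props);
    streams.start();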

After quite some testing and reverse engineering of the code, it seems that
the root of this imbalance lies in how the broker multiplexes the consumed
topic-partitions.

I have attached a slide that I will present to my team to explain the issue
in a bit more detail; it might be good to check it out to understand the
context.

Any thoughts about my findings and concerns?

Kind regards
Niklas


Re: NullPointerException in KafkaStreams during startup

2019-01-22 Thread Guozhang Wang
Hi Johan,

Your observation is correct. The root cause is that your two instances are
being upgraded in sequential order: say your old topology is tp1, and your
new topology with the new stream / topic is tp2. When you upgrade, say,
instance1, instance1 already knows about tp2 while the other instance2
still thinks the topology is tp1. If instance1 contains the leader of the
consumer group, then it will do the task assignment based on tp2 and send it
via join-group responses, but instance2, upon receiving the tasks based on
tp2, would not be able to "interpret" them since it only knows tp1.

For such cases, if your change is only adding a new topic / stream, you
probably do not need to reset your application, but you still need to bring
down your app (all instances), swap in the new code on all your instances,
and then re-start them.


Guozhang


On Mon, Jan 21, 2019 at 4:22 PM Matthias J. Sax 
wrote:

> That is expected... It's not possible to change the subscription during
> a rolling restart. You need to stop all instances and afterwards start
> new instances with the new subscription.
>
> I did not look into the details of your change, but you might also need
> to reset your application before starting new instances, because
> changing the subscription might be a "breaking" change:
>
> https://docs.confluent.io/current/streams/developer-guide/app-reset-tool.html
>
>
> -Matthias
>
>
> On 1/21/19 2:49 PM, Johan Horvius wrote:
> > Hello!
> >
> > I'm having trouble when deploying a new version of a service: during the
> > re-balancing step the topology doesn't match what the KafkaStreams
> > library assumes, and there's an NPE while creating tasks.
> >
> > Background info:
> > I'm running a Spring Boot service which utilizes KafkaStreams, currently
> > subscribed to two topics that have 10 partitions each. The service is
> > running in 2 instances for increased reliability and load balancing.
> > In the next version of the service I've added another stream listening
> > to a different topic. The service is deployed with a rolling strategy
> > where first 2 instances of the new version are added and then the old
> > version's 2 instances are shut down.
> >
> > When trying to deploy my new version the partitions are withdrawn and
> > re-assigned and during the task creation the NPE happens and
> > KafkaStreams goes into a failed state.
> >
> > Kafka is backed by 3 brokers in a cluster.
> >
> > I've tried to re-create the scenario in a simpler setting but been
> > unable to do so. The re-balancing works fine when I try to run it
> > locally with dummy test topics.
> >
> > I'm attaching the log from the service.
> >
> > While trying to figure out what was wrong, the only conclusion I could
> > come up with was that KafkaStreams got confused: it built an original
> > topology, then during re-balance got tasks in another order, and did not
> > re-build the internal topology before trying to create tasks, so the
> > KafkaStreams node groups associated with a task key such as 3_3 did not
> > match up with the expected consumer/producer combination.
> >
> > Hopefully you can shed some lights on what could be wrong.
> >
> > Regards
> > Johan Horvius
> >
> >
>
>

-- 
-- Guozhang


Re: Open files clogging and KafkaStreams

2019-01-22 Thread Guozhang Wang
Hello Niklas,

If you can monitor your repartition topic's consumer lag, and it is
increasing consistently, it means your downstream processor simply cannot
keep up with the throughput of the upstream processor. Usually that means
your downstream operators are heavier (e.g. aggregations, joins, which are
all stateful) than your upstream ones (e.g. simply shuffling the data to
repartition topics), and since task assignment only considers a task as the
smallest unit of work and does not differentiate "heavy" and "light" tasks,
such an imbalance of task assignment may happen. At the moment, to resolve
this you should add more resources to make sure the heavy tasks get enough
computational resources assigned (more threads, for example).

If your observed consumer lag plateaus after increasing to some point, it
means your consumer can actually keep up, with some constant lag; if you hit
your open-file limits before seeing this, you either need to increase your
open-file limits, OR you can simply increase the segment size to reduce the
number of files, by using the "StreamsConfig.TOPIC_PREFIX" prefix to set the
value of TopicConfig.SEGMENT_BYTES_CONFIG.
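
For example, a rough sketch of overriding the segment size for the internal
topics through the Streams config (the 512 MB value is just an assumption;
tune it to your data volume):

    Properties props = new Properties();
    // StreamsConfig.topicPrefix() prepends "topic.", so this setting is
    // applied to the internal (repartition / changelog) topics that Streams
    // creates.
    props.put(
        StreamsConfig.topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG),
        512 * 1024 * 1024);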


Guozhang


On Tue, Jan 22, 2019 at 4:38 AM Niklas Lönn  wrote:

> Hi Kafka Devs & Users,
>
> We recently had an issue where we processed a lot of old data and crashed
> our brokers due to too many memory-mapped files.
> It seems to me that the way Kafka / Kafka Streams manages resources is a
> bit suboptimal (keeping all files open all the time; maybe something should
> manage this more on demand?).
>
> In the issue I described, the repartition topic was produced very fast but
> not consumed, causing a lot of segments and files to be open at the same
> time.
>
> I have worked around the issue by making sure I have more threads than
> partitions, to force tasks to subscribe to internal topics only, but this
> seems a bit hacky, and maybe there should be some guidance in the
> documentation if this is considered part of the design.
>
> After quite some testing and reverse engineering of the code, it seems that
> the root of this imbalance lies in how the broker multiplexes the consumed
> topic-partitions.
>
> I have attached a slide that I will present to my team to explain the issue
> in a bit more detail; it might be good to check it out to understand the
> context.
>
> Any thoughts about my findings and concerns?
>
> Kind regards
> Niklas
>


-- 
-- Guozhang


Re: KTable.suppress(Suppressed.untilWindowCloses) does not suppress some non-final results when the kafka streams process is restarted

2019-01-22 Thread John Roesler
Hi Peter,

Just to follow up on the actual bug, can you confirm whether:
* when you say "restart", do you mean orderly shutdown and restart, or
crash and restart?
* have you tried this with EOS enabled? I can imagine some ways that there
could be duplicates, but they should be impossible with EOS enabled.
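
For reference, a minimal sketch of what I mean by enabling EOS (the app id
and bootstrap values are placeholders):

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "suppress-test");    // placeholder
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");   // placeholder
    // Exactly-once processing; with this on, duplicate emissions after a
    // restart should not be possible.
    props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);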

Thanks for your help,
-John

On Mon, Jan 14, 2019 at 1:20 PM John Roesler  wrote:

> Hi Peter,
>
> I see your train of thought, but the actual implementation of the
> window store is structured differently from your mental model.
> Unlike Key/Value stores, we know that the records in a window
> store will "expire" on a regular schedule, and also that every single
> record will eventually expire. With this in mind, we have implemented
> an optimization to avoid a lot of compaction overhead in RocksDB, as
> well as saving on range scans.
>
> Instead of storing everything in one database, we open several
> databases and bucket windows into them. Then, when windows
> expire, we just ignore the records (i.e., the API makes them unreachable,
> but we don't actually delete them). Once all the windows in a database
> are expired, we just close and delete the whole database. Then, we open
> a new one for new windows. If you look in the code, these databases are
> called "segments".
>
> Thus, I don't think that you should attempt to use the built-in window
> stores
> as you described. Instead, it should be straightforward to implement your
> own StateStore with a layout that's more favorable to your desired
> behavior.
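>
> As a rough illustration (using a built-in key-value store with a compacted
> changelog rather than a fully custom StateStore; the store name and serdes
> are placeholders):
>
>     StoreBuilder<KeyValueStore<String, Long>> storeBuilder =
>         Stores.keyValueStoreBuilder(
>             Stores.persistentKeyValueStore("my-buffer"),   // placeholder name
>             Serdes.String(), Serdes.Long())
>         .withLoggingEnabled(
>             Map.of(TopicConfig.CLEANUP_POLICY_CONFIG,
>                    TopicConfig.CLEANUP_POLICY_COMPACT));
>     // Register the store so a Transformer/Processor can use it; "builder"
>     // is assumed to be the StreamsBuilder.
>     builder.addStateStore(storeBuilder);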
>
> You should also be able to set up the changelog the way you need.
> Explicitly removed entries would also get removed from the log, if it's a
> compacted log.
>
> Actually, what you're describing is *very* similar to the implementation
> for suppress. I might actually suggest that you just copy the suppression
> implementation and adapt it to your needs, or at the very least, study
> how it works. In doing so, you might actually discover the cause of the
> bug yourself!
>
> I hope this helps, and thanks for your help,
> -John
>
>
> On Sat, Jan 12, 2019 at 5:45 AM Peter Levart 
> wrote:
>
>> Hi John,
>>
>> Thank you very much for explaining how WindowStore works. I have some
>> more questions...
>>
>> On 1/10/19 5:33 PM, John Roesler wrote:
>> > Hi Peter,
>> >
>> > Regarding retention, I was not referring to log retention, but to the
>> > window store retention.
>> > Since a new window is created every second (for example), there are in
>> > principle an unbounded
>> > number of windows (the longer the application runs, the more windows
>> there
>> > are, with no end).
>> > However, we obviously can't store an infinite amount of data, so the
>> window
>> > definition includes
>> > a retention period. By default, this is 24 hours. After the retention
>> > period elapses, all of the data
>> > for the window is purged to make room for new windows.
>>
>> Right. Would the following work for example:
>>
>> - configure retention of WindowStore to be "infinite"
>> - explicitly remove records from the store when windows are flushed out
>> - configure WindowStore log topic for compacting
>>
>> Something like the following:
>>
>>      Stores
>>          .windowStoreBuilder(
>>              Stores.persistentWindowStore(
>>                  storeName,
>>                  // retentionPeriod: effectively "infinite" (~1000 years);
>>                  // Duration.of(1000L, ChronoUnit.YEARS) would throw, since
>>                  // Duration.of() does not accept estimated units like YEARS
>>                  Duration.ofDays(365_000L),
>>                  Duration.ofSeconds(10),  // windowSize
>>                  false                    // retainDuplicates
>>              ),
>>              keySerde, valSerde
>>          )
>>          .withCachingEnabled()
>>          .withLoggingEnabled(
>>              Map.of(
>>                  TopicConfig.CLEANUP_POLICY_CONFIG,
>>                  TopicConfig.CLEANUP_POLICY_COMPACT
>>              )
>>          );
>>
>> In the above scenario, would:
>>
>> - the on-disk WindowStore be kept bounded (there could be some very old
>> entries in it, but the majority will be new, depending on the activity of
>> particular input keys)
>> - the log topic be kept bounded (explicitly removed entries would be
>> removed from the compacted log too)
>>
>> I'm moving away from DSL partly because I have some problems with
>> suppression (which I hope we'll be able to fix) and partly because the
>> DSL can't give me the complicated semantics that I need for the
>> application at hand. I tried to capture what I need in a custom
>> Transformer here:
>>
>> https://gist.github.com/plevart/d3f70bee7346f72161ef633aa60dc94f
>>
>> Your knowledge of how WindowStore works would greatly help me decide if
>> this is a workable idea.
>>
>> >
>> > So what I meant was that if you buffer some key "A" in window (Monday
>> > 09:00:00) and then get
>> > no further activity for A for over 24 hours, then when you do get that
>> next
>> > event for A, say at
>> > (Tuesday 11:00:00), you'd do the scan but find nothing, since your
>> buffered
>> > state would already
>> > have been purged from the store.

Kafka Consumer Not Assigned Partitions

2019-01-22 Thread chinchu chinchu
Hello,
I have subscribed to a Kafka topic as below. I need to run some logic only
after the consumer has been assigned a partition. However,
consumer.assignment() comes back as an empty set no matter how long I wait.
If I do not have the while loop and just do a consumer.poll(), I do get the
records from the topic. Can anyone tell me why this is happening?

consumer.subscribe(topics);

Set<TopicPartition> assigned = Collections.emptySet();
boolean isAssigned = true;
while (isAssigned) {
    assigned = consumer.assignment();
    if (!assigned.isEmpty()) {
        isAssigned = false;
    }
}
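
For reference, a minimal sketch of the rebalance-listener variant (assuming
a String-keyed/valued consumer; assignment only happens inside poll(), so
the listener is invoked from there):

    consumer.subscribe(topics, new ConsumerRebalanceListener() {
        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            // Called inside poll() once this consumer has been handed its partitions.
            System.out.println("Assigned: " + partitions);
        }

        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            // Called before a rebalance takes partitions away.
        }
    });
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        // process records ...
    }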

Thanks,
Chinchu


Broker continuously expand and shrinks to itself

2019-01-22 Thread Ashish Karalkar
Hi All,

We just upgraded from 0.10.x to 1.1 and enabled rack awareness on an existing
cluster which has about 20 nodes in 4 racks. After this we see that a few
brokers go through a continuous cycle of expanding and shrinking the ISR to
themselves; it is also causing high latency for serving metadata requests.
What is the impact of enabling rack awareness on an existing cluster,
assuming the replication factor is 3 and the existing replicas may or may not
be in different racks when rack awareness was enabled, after which a rolling
bounce was done?
The symptoms we are having are replica lag and slow metadata requests. Also,
in the brokers' logs we continuously see disconnections from the broker it is
trying to expand to.
Thanks for helping...
--A
  

Max Limit of partitions in topic

2019-01-22 Thread marimuthu eee
Hi,

I have one doubt: what is the maximum limit of partitions in one topic of a
Kafka cluster? Please help me.


Re: Max Limit of partitions in topic

2019-01-22 Thread Rahul Singh
There is no hard per-topic limit on the number of partitions in Kafka. It is
generally good for the number of partitions to be at least equal to the
number of consumers in a consumer group. The consumer fetches a batch of
messages per partition, so the more partitions a consumer consumes, the more
memory it needs.
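
As a rough illustration of the memory point (the property and default are
the standard consumer config; the group and bootstrap values are
placeholders):

    Properties props = new Properties();
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");              // placeholder
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");  // placeholder
    // Up to this many bytes may be buffered per assigned partition on each
    // fetch, so worst-case fetch memory grows roughly with the number of
    // partitions the consumer owns.
    props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1048576); // 1 MiB (default)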

On Wed, Jan 23, 2019 at 12:25 PM marimuthu eee 
wrote:

> Hi,
>
> I have one doubt: what is the maximum limit of partitions in one topic of a
> Kafka cluster? Please help me.
>