kafka-streams: interaction between max.poll.records and window expiration ?

2020-12-20 Thread Mathieu D
Hello there,

One of our input topics does not have much traffic.
Divided across the number of partitions, and given the default 'max.poll.records'
setting (1000, if I understand the docs correctly), it could happen that, within
a single fetch of 1000 records, the event timestamps between the first and last
record in the "batch" span more than some of the windows in my topology.

Could this have any impact on window expiration?
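
For reference, below is a minimal sketch of how that setting can be pinned from
the Streams configuration; the application id, bootstrap address, and the value
of 100 are only illustrative:

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class StreamsPollConfig {
    public static Properties buildConfig() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");            // illustrative
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative
        // Cap how many records a single poll() may return to the Streams consumers.
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 100);
        return props;
    }
}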

Thanks
Mathieu


Re: Punctuate NPE

2020-12-20 Thread Navneeth Krishnan
Thanks John & Blake. Will try to recreate the issue and see what's going on.
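
While recreating it, a minimal sketch of a processor with the same kind of
schedule call as in the report below may help for comparison (the store name,
types, and the flush logic are hypothetical); it guards against null store
values before forwarding, since a null dereference inside a punctuator surfaces
as exactly this kind of wrapped StreamsException:

import java.time.Duration;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

public class WindowProcessor extends AbstractProcessor<String, String> {

    private final Duration scheduleInterval = Duration.ofSeconds(10); // matches the 10s cadence
    private KeyValueStore<String, String> store;

    @Override
    @SuppressWarnings("unchecked")
    public void init(final ProcessorContext context) {
        super.init(context);
        // "window-store" is a hypothetical name; it must match the store attached
        // to this processor in the topology.
        store = (KeyValueStore<String, String>) context.getStateStore("window-store");
        context.schedule(scheduleInterval, PunctuationType.STREAM_TIME, this::flush);
    }

    @Override
    public void process(final String key, final String value) {
        store.put(key, value);
    }

    private void flush(final long timestamp) {
        // Iterate the store and forward only non-null values; try-with-resources
        // closes the iterator even if forwarding throws.
        try (KeyValueIterator<String, String> it = store.all()) {
            while (it.hasNext()) {
                final KeyValue<String, String> entry = it.next();
                if (entry.value != null) {
                    context().forward(entry.key, entry.value);
                }
            }
        }
    }
}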

Regards,
Navneeth

On Thu, Dec 17, 2020 at 6:27 PM Blake Miller  wrote:

> Navneeth,
>
> You may need this JVM option:
>
>  -XX:-OmitStackTraceInFastThrow
>
> as some stack-trace information can be optimized away by default by some
> JVMs, leaving us blind to the real issue.
>
> HTH
>
> On Wed, Dec 16, 2020 at 4:15 AM John Roesler  wrote:
>
> > Hi Navneeth,
> >
> > I'm sorry for the trouble.
> >
> > Which version of Streams are you using? Also, this doesn't
> > look like the full stacktrace, since we can't see the NPE
> > itself. Can you share the whole thing?
> >
> > Thanks,
> > -John
> >
> >
> > On Tue, 2020-12-15 at 00:30 -0800, Navneeth Krishnan wrote:
> > > Hi All,
> > >
> > > I have a scheduled function that runs every 10 seconds and in some
> cases
> > I
> > > see this NPE. Not sure how to debug this issue. Any pointers would
> really
> > > help. Thanks
> > >
> > > context.schedule(this.scheduleInterval, PunctuationType.STREAM_TIME,
> > > this::flush);
> > >
> > >
> > > 2020-12-15 07:40:14.214
> > > [userapp-c2db617d-ed40-4c9a-a3b3-e9942c19d28a-StreamThread-4] ERROR
> > > Pipeline -
> > > org.apache.kafka.streams.errors.StreamsException: stream-thread
> > > [userapp-c2db617d-ed40-4c9a-a3b3-e9942c19d28a-StreamThread-4] task [12_16]
> > > Exception caught while punctuating processor 'Window_processor'
> > > at org.apache.kafka.streams.processor.internals.StreamTask.punctuate(StreamTask.java:760) ~[kafka-streams-2.6.0.jar!/:?]
> > > at org.apache.kafka.streams.processor.internals.PunctuationQueue.mayPunctuate(PunctuationQueue.java:54) ~[kafka-streams-2.6.0.jar!/:?]
> > > at org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuateStreamTime(StreamTask.java:941) ~[kafka-streams-2.6.0.jar!/:?]
> > > at org.apache.kafka.streams.processor.internals.TaskManager.punctuate(TaskManager.java:1066) ~[kafka-streams-2.6.0.jar!/:?]
> > > at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:707) ~[kafka-streams-2.6.0.jar!/:?]
> > > at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551) ~[kafka-streams-2.6.0.jar!/:?]
> > > at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510) ~[kafka-streams-2.6.0.jar!/:?]
> > >
> > > Thanks,
> > > Navneeth
> >
> >
> >
>


In Memory State Store

2020-12-20 Thread Navneeth Krishnan
Hi All,

I have a question about the inMemoryKeyValue store. I was under the
assumption that in-memory stores would not serialize the objects, but when I
looked into the implementation I see that InMemoryKeyValueStore uses a
NavigableMap of bytes, which indicates that user objects have to be
serialized before they are stored.

Am I missing something? Wouldn't this cause extra serialization overhead for
something that is only stored in memory?

In my case I have a punctuator which reads all the entries in the state
store and forwards the data. When there are around 10k entries it takes
about 400ms to complete, and I am trying to optimize this. I use a kryo
serde and the objects are fairly large (approx. 500 bytes).
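
For reference, a minimal sketch of how such a store is typically declared (the
store name and serdes are illustrative); even the in-memory variant takes
explicit Serdes, because the underlying supplier is a bytes store:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

public class InMemoryStoreExample {
    // Keys and values pass through these Serdes on every put/get,
    // even though the entries never leave the JVM heap.
    public static StoreBuilder<KeyValueStore<String, String>> inMemoryStore() {
        return Stores.keyValueStoreBuilder(
                Stores.inMemoryKeyValueStore("my-store"),
                Serdes.String(),
                Serdes.String());
    }
}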

Regards,
Navneeth


Re: Kafka Scaling Ideas

2020-12-20 Thread Yana K
Thank you so much Marina and Haruka.

Marina's response:
- When you say "if you are sure there is no room for perf optimization of
the processing itself" - do you mean code-level optimizations? Can you
please explain?
- On the second topic you say "I'd say at least 40" - is this based on 12
million records / hour?
- "if you can change the incoming topic" - I don't think it is possible :(
- "you could artificially achieve the same by adding one more step
(service) in your pipeline" - this is the next thing (see the sketch below) -
but I want to be sure this will help, given we'd have to maintain one more
layer

Haruka's response:
- "One possible solution is creating an intermediate topic" - I already did
it
- I'll look at Decaton - thx

Are there any thoughts on auto commit vs manual commit - whether it can
improve performance while consuming?

Yana
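
On the extra-step idea above, here is a minimal sketch of such a forwarding
service (topic names, group id, serdes, and the broker address are
placeholders); it only reads from the 7-partition source and writes the records
unchanged into a wider topic:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class RepartitionForwarder {
    public static void main(String[] args) {
        Properties cProps = new Properties();
        cProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        cProps.put(ConsumerConfig.GROUP_ID_CONFIG, "forwarder");
        cProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        cProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        Properties pProps = new Properties();
        pProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        pProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        pProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(cProps);
             KafkaProducer<String, String> producer = new KafkaProducer<>(pProps)) {
            consumer.subscribe(Collections.singletonList("topic1")); // 7-partition source
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> r : records) {
                    // Push the payload unchanged into the wider topic; the record key
                    // decides how it spreads across the (say) 40 partitions.
                    producer.send(new ProducerRecord<>("topic2", r.key(), r.value()));
                }
            }
        }
    }
}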



On Sat, Dec 19, 2020 at 7:01 PM Haruki Okada  wrote:

> Hi.
>
> Yeah, Spring-Kafka does processing messages sequentially, so the consumer
> throughput would be capped by database latency per single process.
> One possible solution is creating an intermediate topic (or altering source
> topic) with much more partitions as Marina suggested.
>
> I'd like to suggest another solution, that is multi-threaded processing per
> single partition.
> Decaton (https://github.com/line/decaton) is a library to achieve it.
>
> Also confluent has published a blog post about parallel-consumer (
>
> https://www.confluent.io/blog/introducing-confluent-parallel-message-processing-client/
> )
> for that purpose, but it seems it's still in the BETA stage.
>
> 2020年12月20日(日) 11:41 Marina Popova :
>
> > The way I see it - you can only do a few things - if you are sure there
> is
> > no room for perf optimization of the processing itself :
> > 1. speed up your processing per consumer thread: which you already tried
> > by splitting your logic into a 2-step pipeline instead of 1-step, and
> > delegating the work of writing to a DB to the second step ( make sure
> your
> > second intermediate Kafka topic is created with much more partitions to
> be
> > able to parallelize your work much higher - I'd say at least 40)
> > 2. if you can change the incoming topic - I would create it with many
> more
> > partitions as well - say at least 40 or so - to parallelize your first
> step
> > service processing more
> > 3. and if you can't increase partitions for the original topic ) - you
> > could artificially achieve the same by adding one more step (service) in
> > your pipeline that would just read data from the original 7-partition
> > topic1 and just push it unchanged into a new topic2 with , say 40
> > partitions - and then have your other services pick up from this topic2
> >
> >
> > good luck,
> > Marina
> >
> > Sent with ProtonMail Secure Email.
> >
> > ‐‐‐ Original Message ‐‐‐
> > On Saturday, December 19, 2020 6:46 PM, Yana K 
> > wrote:
> >
> > > Hi
> > >
> > > I am new to the Kafka world and running into this scale problem. I
> > thought
> > > of reaching out to the community if someone can help.
> > > So the problem is I am trying to consume from a Kafka topic that can
> > have a
> > > peak of 12 million messages/hour. That topic is not under my control -
> it
> > > has 7 partitions and sending json payload.
> > > I have written a consumer (I've used Java and Spring-Kafka lib) that
> will
> > > read that data, filter it and then load it into a database. I ran into
> a
> > > huge consumer lag that would take 10-12hours to catch up. I have 7
> > > instances of my application running to match the 7 partitions and I am
> > > using auto commit. Then I thought of splitting the write logic to a
> > > separate layer. So now my architecture has a component that reads and
> > > filters and produces the data to an internal topic (I've done 7
> > partitions
> > > but as you see it's under my control). Then a consumer picks up data
> from
> > > that topic and writes it to the database. It's better but still it
> takes
> > > 3-5hours for the consumer lag to catch up.
> > > Am I missing something fundamentally? Are there any other ideas for
> > > optimization that can help overcome this scale challenge. Any pointer
> and
> > > article will help too.
> > >
> > > Appreciate your help with this.
> > >
> > > Thanks
> > > Yana
> >
> >
> >
>
> --
> 
> Okada Haruki
> ocadar...@gmail.com
> 
>


Re: Kafka Scaling Ideas

2020-12-20 Thread Haruki Okada
It depends on how you manually commit offsets.
Auto-commit commits offsets asynchronously, so as long as you do manual
commits in the same way, there should not be much difference.

And, in general, the offset-commit mode doesn't make much difference in
performance, regardless of manual/auto or async/sync, unless the
offset-commit latency takes up a significant share of the processing time
(e.g. you commit offsets synchronously in every poll() loop).
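
To make the async style concrete, here is a minimal sketch with a plain consumer
(topic, group id, serde, and broker address are placeholders):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class AsyncCommitLoop {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // manual commits
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    // filtering / DB write goes here
                }
                // Non-blocking commit after each batch: the loop does not wait for the
                // broker's reply, which keeps latency close to auto-commit.
                consumer.commitAsync();
            }
        }
    }
}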

2020年12月21日(月) 11:08 Yana K :

> Thank you so much Marina and Haruka.
>
> Marina's response:
> - When you say " if you are sure there is no room for perf optimization of
> the processing itself :" - do you mean code level optimizations? Can you
> please explain?
> - On the second topic you say " I'd say at least 40" - is this based on 12
> million records / hour?
> -  "if you can change the incoming topic" - I don't think it is possible :(
> -  "you could artificially achieve the same by adding one more step
> (service) in your pipeline" - this is the next thing - but I want to be
> sure this will help, given we've to maintain one more layer
>
> Haruka's response:
> - "One possible solution is creating an intermediate topic" - I already did
> it
> - I'll look at Decaton - thx
>
> Is there any thoughts on the auto commit vs manual commit - if it can
> better the performance while consuming?
>
> Yana
>
>
>
> On Sat, Dec 19, 2020 at 7:01 PM Haruki Okada  wrote:
>
> > Hi.
> >
> > Yeah, Spring-Kafka does processing messages sequentially, so the consumer
> > throughput would be capped by database latency per single process.
> > One possible solution is creating an intermediate topic (or altering
> source
> > topic) with much more partitions as Marina suggested.
> >
> > I'd like to suggest another solution, that is multi-threaded processing
> per
> > single partition.
> > Decaton (https://github.com/line/decaton) is a library to achieve it.
> >
> > Also confluent has published a blog post about parallel-consumer (
> >
> >
> https://www.confluent.io/blog/introducing-confluent-parallel-message-processing-client/
> > )
> > for that purpose, but it seems it's still in the BETA stage.
> >
> > 2020年12月20日(日) 11:41 Marina Popova :
> >
> > > The way I see it - you can only do a few things - if you are sure there
> > is
> > > no room for perf optimization of the processing itself :
> > > 1. speed up your processing per consumer thread: which you already
> tried
> > > by splitting your logic into a 2-step pipeline instead of 1-step, and
> > > delegating the work of writing to a DB to the second step ( make sure
> > your
> > > second intermediate Kafka topic is created with much more partitions to
> > be
> > > able to parallelize your work much higher - I'd say at least 40)
> > > 2. if you can change the incoming topic - I would create it with many
> > more
> > > partitions as well - say at least 40 or so - to parallelize your first
> > step
> > > service processing more
> > > 3. and if you can't increase partitions for the original topic ) - you
> > > could artificially achieve the same by adding one more step (service)
> in
> > > your pipeline that would just read data from the original 7-partition
> > > topic1 and just push it unchanged into a new topic2 with , say 40
> > > partitions - and then have your other services pick up from this topic2
> > >
> > >
> > > good luck,
> > > Marina
> > >
> > > Sent with ProtonMail Secure Email.
> > >
> > > ‐‐‐ Original Message ‐‐‐
> > > On Saturday, December 19, 2020 6:46 PM, Yana K 
> > > wrote:
> > >
> > > > Hi
> > > >
> > > > I am new to the Kafka world and running into this scale problem. I
> > > thought
> > > > of reaching out to the community if someone can help.
> > > > So the problem is I am trying to consume from a Kafka topic that can
> > > have a
> > > > peak of 12 million messages/hour. That topic is not under my control
> -
> > it
> > > > has 7 partitions and sending json payload.
> > > > I have written a consumer (I've used Java and Spring-Kafka lib) that
> > will
> > > > read that data, filter it and then load it into a database. I ran
> into
> > a
> > > > huge consumer lag that would take 10-12hours to catch up. I have 7
> > > > instances of my application running to match the 7 partitions and I
> am
> > > > using auto commit. Then I thought of splitting the write logic to a
> > > > separate layer. So now my architecture has a component that reads and
> > > > filters and produces the data to an internal topic (I've done 7
> > > partitions
> > > > but as you see it's under my control). Then a consumer picks up data
> > from
> > > > that topic and writes it to the database. It's better but still it
> > takes
> > > > 3-5hours for the consumer lag to catch up.
> > > > Am I missing something fundamentally? Are there any other ideas for
> > > > optimization that can help overcome this scale challenge. Any pointer
> > and
> > > > article will help too.
> > > >
> > > > Appreciate your help with this.
> > > >
> > > > Thanks
> > >

RE: RE: RE: Maintaining same offset while migrating from Confluent Replicator to Apache Mirror Maker 2.0

2020-12-20 Thread Amit.SRIVASTAV
Hi Ning and all,

We found a crude way to solve this issue. Below are the high-level steps:

1. Read the messages from Replicator's internal topic for storing offsets
[connect-offsets]. This topic stores the offsets for every replicated topic
as key:value pairs, e.g.
Key: ["replicator-group",{"topic":"TEST","partition":0}]
Value: {"offset":24}
For each topic and partition, whenever a new message is replicated, a new
message with the same key but an increased offset is produced to the
connect-offsets topic.

2. Convert the key of this message to the Mirror Maker 2 format and produce
it to the internal topic of MirrorMaker2. [You can change the internal
topics in the mirrormaker2-connect-distributed.properties file.] The format
for the Mirror Maker internal topic is:
Key: ["mirrormaker-group",{"cluster":"","partition":0,"topic":"TEST"}]
Value: {"offset":24}

3. Once MirrorMaker is restarted after posting the message, it will read the
internal topic to get the latest offset for each topic to be replicated, and
this way we can ensure no duplicate messages are replicated.

This has been tested and found to be working as expected.
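
In case it helps anyone automating step 2, below is a minimal sketch of
producing the converted record (the broker address, the offsets-topic name
"mm2-offsets", and the cluster alias "source" are placeholders; the key/value
layout follows the format shown above). Since the offsets topic is compacted,
the latest value per key is what MirrorMaker 2 picks up on restart:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class Mm2OffsetSeeder {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // MM2-format key and the last offset already replicated by Replicator.
        String key = "[\"mirrormaker-group\",{\"cluster\":\"source\",\"partition\":0,\"topic\":\"TEST\"}]";
        String value = "{\"offset\":24}";

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("mm2-offsets", key, value));
            producer.flush();
        }
    }
}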

Thanks and regards,
Amit

-Original Message-
From: Ning Zhang 
Sent: Thursday, December 10, 2020 10:40 PM
To: users@kafka.apache.org
Subject: Re: RE: RE: Maintaining same offset while migrating from Confluent 
Replicator to Apache Mirror Maker 2.0


regarding consumer group = null,
https://github.com/apache/kafka/blob/trunk/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java#L89
is where the consumer is created in MirrorSourceTask, and people can override
any consumer-level config (including group.id) at
https://github.com/apache/kafka/blob/b44d32dffedb368b888e3431257d68abb1e62b9f/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java#L249

as you may have tried something like `source.consumer.group.id`,
`.consumer.group.id`, or `.group.id` - if none of them work, then we should
look into more of the source code and see if your setting is overridden
somewhere else

On 2020/12/08 06:28:50,  wrote:
> Hi Ning,
>
>
>
> It did not worked. Here are the logs from the replicator and mirror maker 2 
> respectively:
>
> Replicator:
>
> [2020-12-08 05:11:06,611] INFO [Consumer clientId=onprem-aws-replicator-0, 
> groupId=onprem-aws-replicator] Seeking to offset 83 for partition 
> ONPREM.AWS.REPLICA.TOPIC.P3R3-0 
> (org.apache.kafka.clients.consumer.KafkaConsumer:1545)
>
> [2020-12-08 05:11:07,113] INFO [Consumer clientId=onprem-aws-replicator-0, 
> groupId=onprem-aws-replicator] Seeking to offset 49 for partition 
> ONPREM.AWS.REPLICA.TOPIC.P3R3-1 
> (org.apache.kafka.clients.consumer.KafkaConsumer:1545)
>
> [2020-12-08 05:11:07,615] INFO [Consumer clientId=onprem-aws-replicator-0, 
> groupId=onprem-aws-replicator] Seeking to offset 53 for partition 
> ONPREM.AWS.REPLICA.TOPIC.P3R3-2 
> (org.apache.kafka.clients.consumer.KafkaConsumer:1545)
>
>
>
> Mirror Maker 2.0:
>
> [2020-12-08 06:10:51,385] INFO [Consumer clientId=consumer-4, groupId=null] 
> Seeking to offset 80 for partition ONPREM.AWS.REPLICA.TOPIC.P3R3-0 
> (org.apache.kafka.clients.consumer.KafkaConsumer:1545)
>
> [2020-12-08 06:10:51.385] INFO [Consumer clientId= consumer-4, groupId=null] 
> Seeking to offset 49 for partition ONPREM.AWS.REPLICA.TOPIC.P3R3-0 
> (org.apache.kafka.clients.consumer.KafkaConsumer:1545)
>
> [2020-12-08 06:10:51.386] INFO [Consumer clientId= consumer-4, groupId=null] 
> Seeking to offset 52 for partition ONPREM.AWS.REPLICA.TOPIC.P3R3-0 
> (org.apache.kafka.clients.consumer.KafkaConsumer:1545)
>
>
>
> You can see that groupId is still null in MM2 and the offsets are previous 
> offset meaning it will replicate those messages as well which has been 
> already replicated by Replicator.
>
>
>
> Thanks and regards,
>
> Amit
>
>
>
> -Original Message-
> From: Ning Zhang 
> Sent: Monday, December 7, 2020 10:29 PM
> To: users@kafka.apache.org
> Subject: Re: RE: Maintaining same offset while migrating from Confluent 
> Replicator to Apache Mirror Maker 2.0
>
>
>
>
>
>
>
>
> Hi Amit,
>
>
>
> Aft

Re: Kafka Scaling Ideas

2020-12-20 Thread Yana K
So as the next step I see increasing the partitions of the 2nd topic - do I
also increase the number of consumer instances to match, or keep it at 7?
(See the sketch below.) Anything else (besides researching those libs)?

Are there any good tools for load testing Kafka?
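
For the partition increase itself, a minimal sketch using the admin client (the
topic name, broker address, and the count of 40 are placeholders); note that
partitions can only be grown, never shrunk, and existing keys may land on
different partitions afterwards:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewPartitions;

public class IncreasePartitions {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // Grow the intermediate topic to 40 partitions and wait for the result.
            admin.createPartitions(
                    Collections.singletonMap("intermediate-topic", NewPartitions.increaseTo(40)))
                 .all().get();
        }
    }
}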

On Sun, Dec 20, 2020 at 7:23 PM Haruki Okada  wrote:

> It depends on how you manually commit offsets.
> Auto-commit does commits offsets in async manner basically, so as long as
> you do manual-commit in the same way,  there should be no much difference.
>
> And, generally offset-commit mode doesn't make much difference in
> performance regardless manual/auto or async/sync unless offset-commit
> latency takes significant amount in processing time (e.g. you commit
> offsets synchronously in every poll() loop).
>
> 2020年12月21日(月) 11:08 Yana K :
>
> > Thank you so much Marina and Haruka.
> >
> > Marina's response:
> > - When you say " if you are sure there is no room for perf optimization
> of
> > the processing itself :" - do you mean code level optimizations? Can you
> > please explain?
> > - On the second topic you say " I'd say at least 40" - is this based on
> 12
> > million records / hour?
> > -  "if you can change the incoming topic" - I don't think it is possible
> :(
> > -  "you could artificially achieve the same by adding one more step
> > (service) in your pipeline" - this is the next thing - but I want to be
> > sure this will help, given we've to maintain one more layer
> >
> > Haruka's response:
> > - "One possible solution is creating an intermediate topic" - I already
> did
> > it
> > - I'll look at Decaton - thx
> >
> > Is there any thoughts on the auto commit vs manual commit - if it can
> > better the performance while consuming?
> >
> > Yana
> >
> >
> >
> > On Sat, Dec 19, 2020 at 7:01 PM Haruki Okada 
> wrote:
> >
> > > Hi.
> > >
> > > Yeah, Spring-Kafka does processing messages sequentially, so the
> consumer
> > > throughput would be capped by database latency per single process.
> > > One possible solution is creating an intermediate topic (or altering
> > source
> > > topic) with much more partitions as Marina suggested.
> > >
> > > I'd like to suggest another solution, that is multi-threaded processing
> > per
> > > single partition.
> > > Decaton (https://github.com/line/decaton) is a library to achieve it.
> > >
> > > Also confluent has published a blog post about parallel-consumer (
> > >
> > >
> >
> https://www.confluent.io/blog/introducing-confluent-parallel-message-processing-client/
> > > )
> > > for that purpose, but it seems it's still in the BETA stage.
> > >
> > > 2020年12月20日(日) 11:41 Marina Popova :
> > >
> > > > The way I see it - you can only do a few things - if you are sure
> there
> > > is
> > > > no room for perf optimization of the processing itself :
> > > > 1. speed up your processing per consumer thread: which you already
> > tried
> > > > by splitting your logic into a 2-step pipeline instead of 1-step, and
> > > > delegating the work of writing to a DB to the second step ( make sure
> > > your
> > > > second intermediate Kafka topic is created with much more partitions
> to
> > > be
> > > > able to parallelize your work much higher - I'd say at least 40)
> > > > 2. if you can change the incoming topic - I would create it with many
> > > more
> > > > partitions as well - say at least 40 or so - to parallelize your
> first
> > > step
> > > > service processing more
> > > > 3. and if you can't increase partitions for the original topic ) -
> you
> > > > could artificially achieve the same by adding one more step (service)
> > in
> > > > your pipeline that would just read data from the original 7-partition
> > > > topic1 and just push it unchanged into a new topic2 with , say 40
> > > > partitions - and then have your other services pick up from this
> topic2
> > > >
> > > >
> > > > good luck,
> > > > Marina
> > > >
> > > > Sent with ProtonMail Secure Email.
> > > >
> > > > ‐‐‐ Original Message ‐‐‐
> > > > On Saturday, December 19, 2020 6:46 PM, Yana K 
> > > > wrote:
> > > >
> > > > > Hi
> > > > >
> > > > > I am new to the Kafka world and running into this scale problem. I
> > > > thought
> > > > > of reaching out to the community if someone can help.
> > > > > So the problem is I am trying to consume from a Kafka topic that
> can
> > > > have a
> > > > > peak of 12 million messages/hour. That topic is not under my
> control
> > -
> > > it
> > > > > has 7 partitions and sending json payload.
> > > > > I have written a consumer (I've used Java and Spring-Kafka lib)
> that
> > > will
> > > > > read that data, filter it and then load it into a database. I ran
> > into
> > > a
> > > > > huge consumer lag that would take 10-12hours to catch up. I have 7
> > > > > instances of my application running to match the 7 partitions and I
> > am
> > > > > using auto commit. Then I thought of splitting the write logic to a
> > > > > separate layer. So now my architecture has a component that reads
> and
> > > > > filters and produc