Re: I want to subscribe

2024-06-07 Thread Matthias J. Sax

Follow the instructions from the webpage: https://kafka.apache.org/contact

On 6/4/24 9:01 AM, outof2...@sina.com wrote:

I want to subscribe


Re: kindly add me in community

2024-05-27 Thread Matthias J. Sax
Mailing list subscription is self-service. Please follow the instructions 
from the webpage: https://kafka.apache.org/contact


-Matthias

On 5/21/24 2:00 AM, Prashant Lohia wrote:

thanks
prashant lohia
prash...@gsl.in



Re: outerjoin not joining after window

2024-05-22 Thread Matthias J. Sax

Can someone confirm that each
partition has its own stream time and that the stream time for a partition
only advances when a record is written to the partition after the window
closes?


That's correct.



On 5/21/24 10:11 AM, Chad Preisler wrote:

After reviewing the logs, I think I understand what happens with the
repartition topics. Looks like they will be assigned to one or more
instances. In my example I ran three instances of the application (A, B,
C). Looks like the two repartition topics got assigned to A and B. The six
partitions from the input topics got split evenly across all three running
instances A, B, and C. Since the repartitioned streams are what I'm joining
on, I guess the join will run on two instances, and any input topic
processing will run across all three. Is that correct?

Still would like clarification regarding some records appearing to not get
processed: I think the issue is related to certain partitions not getting
records to advance stream time (because of low volume). Can someone confirm
that each partition has its own stream time and that the stream time for a
partition only advances when a record is written to the partition after the
window closes?

On Tue, May 21, 2024 at 10:27 AM Chad Preisler 
wrote:


See one small edit below...

On Tue, May 21, 2024 at 10:25 AM Chad Preisler 
wrote:


Hello,

I think the issue is related to certain partitions not getting records to
advance stream time (because of low volume). Can someone confirm that each
partition has its own stream time and that the stream time for a partition
only advances when a record is written to the partition after the window
closes?

If I use the repartition method on each input topic to reduce the number
of partitions for those streams, how many instances of the application will
process records? For example, if the input topics each have 6 partitions,
and I use the repartition method to set the number of partitions for the
streams to 2, how many instances of the application will process records?

Thanks,
Chad


On Wed, May 1, 2024 at 6:47 PM Matthias J. Sax  wrote:


How do you know this?

First thing we do is write a log message in the value joiner. We don't see
the log message for the missed records.


Well, for left/right join results, the ValueJoiner would only be called
when the window is closed... And for invalid input (or late records, ie,
which arrive out-of-order and whose window was already closed), records
would be dropped right away. So you cannot really infer whether a record
made it into the join or not, or what happened if it did make it into
the `Processor`.

-> https://kafka.apache.org/documentation/#kafka_streams_task_monitoring

`dropped-records-total` is the name of the metric.
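
For reference, a minimal sketch of reading that metric programmatically via 
KafkaStreams#metrics() instead of JMX. The task-level metric group name 
"stream-task-metrics" is assumed here (see the monitoring docs linked above), 
and the KafkaStreams instance must already be running:

import java.util.Map;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.streams.KafkaStreams;

public class DroppedRecordsCheck {
    // Sums dropped-records-total across all tasks of a running KafkaStreams instance.
    static double totalDroppedRecords(KafkaStreams streams) {
        double total = 0.0;
        for (Map.Entry<MetricName, ? extends Metric> e : streams.metrics().entrySet()) {
            MetricName name = e.getKey();
            if ("dropped-records-total".equals(name.name())
                    && "stream-task-metrics".equals(name.group())) {
                total += ((Number) e.getValue().metricValue()).doubleValue();
            }
        }
        return total;
    }
}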



-Matthias



On 5/1/24 11:35 AM, Chad Preisler wrote:

Hello,

We did some testing in our test environment today. We are seeing some
records processed where only one side of the join has a record. So that's
good. However, we are still seeing some records get skipped. They never hit
the value joiner (we write a log message first thing in the value joiner).
During the test we were putting some load on the system, so stream time was
advancing. We did notice that the join windows were taking much longer than
30 minutes to close and process records. Thirty minutes is the window plus
grace.


How do you know this?

First thing we do is write a log message in the value joiner. We don't see
the log message for the missed records.

I will try pushing the same records locally. However, we don't see any
errors in our logs and the stream does process one-sided joins after the
skipped record. Do you have any docs on the "dropped records" metric? I did
a Google search and didn't find many good results for that.

Thanks,

Chad

On Tue, Apr 30, 2024 at 2:49 PM Matthias J. Sax 

wrote:



Thanks for the information. I ran the code using Kafka locally. After
submitting some records inside and outside of the time window and grace,
the join performed as expected when running locally.


That gives some hope :)


That gives some hope :)




However, they never get into the join.


How do you know this?


Did you check the metric for dropped records? Maybe records are
considered malformed and dropped? Are you using the same records in
production and in your local test?



Are there any settings for the stream client that would affect the join?


Not that I can think of... There is one more internal config, but as
long as data is flowing, it should not impact the result you see.



Are there any settings on the broker side that would affect the join?


No. The join is computed client side. Broker configs should not have any
impact.


If I increase the log level for the streams API would that
shed some light on what is happening?


I don't think it would help much. The code in question is
org.apache.kafka.streams.kstream.internals.KStreamKStreamJoin -- but it
does not do any logging except WARN for the already mentioned

Re: Request to be added to kafka contributors list

2024-05-21 Thread Matthias J. Sax

Ok. Hopefully it's working now. Sorry for the hiccup.

-Matthias

On 5/21/24 1:14 AM, Fan Yang wrote:

Hi Matthias,

I tried sign out and sign in, still can't find the "Assign" button, my JIRA ID 
is fanyan, could you help me set it again?

Best,
Fan

____
From: Matthias J. Sax 
Sent: Saturday, May 18, 2024 4:06
To: users@kafka.apache.org 
Subject: Re: Re: Request to be added to kafka contributors list

Did you sign out and sign in again?

On 5/17/24 9:49 AM, Yang Fan wrote:

Thanks Matthias,

I still can't find "Assign to me" button beside Assignee and Reporter. Could 
you help me set it again?

Best regards,
Fan
________
From: Matthias J. Sax 
Sent: May 17, 2024 2:24
To: users@kafka.apache.org 
Subject: Re: Request to be added to kafka contributors list

Thanks for reaching out Yang. You should be all set.

-Matthias

On 5/16/24 7:40 AM, Yang Fan wrote:

Dear Apache Kafka Team,

I hope this email finds you well. My name is Fan Yang, JIRA ID is fanyan, I 
kindly request to be added to the contributors list for Apache Kafka. Being 
part of this list would allow me to be assigned to JIRA tickets and work on 
them.
Thank you for considering my request.
Best regards,
Fan


Re: Fwd: Request to be added to kafka contributors list

2024-05-20 Thread Matthias J. Sax

Done. You should be all set :)


-Matthias

On 5/20/24 10:10 AM, bou...@ulukai.net wrote:


Dear Apache Kafka Team,

     I hope I am posting in the right place: my name is Franck LEDAY, under 
Apache Jira ID "handfreezer".


     I opened an issue as Improvement KAFKA-16707 but I failed to 
assign it to myself.


     May I ask to be added to the contributors list for Apache Kafka? As 
I have already done the improvement work, I would like to be assigned to 
the ticket to complete my contribution.


Thank you for considering my request.
Best regards, Franck.


Re: Request for contributor list

2024-05-20 Thread Matthias J. Sax

What is your Jira ID?

-Matthias


On 5/20/24 9:55 AM, Brenden Deluna wrote:

Hello, I am requesting to be added to the contributor list to take care of
some tickets. Thank you.



Re: Release plan required

2024-05-20 Thread Matthias J. Sax
Zookeeper is already deprecated (since 3.5): 
https://kafka.apache.org/documentation/#zk_depr


It's planned to be fully removed in the 4.0 release.

It's not confirmed yet, but there is a high probability that there won't 
be a 3.9 release, and that 4.0 will follow 3.8.



-Matthias


On 5/20/24 2:11 AM, Sahil Sharma D wrote:

Hello,

When is ZooKeeper planned to be deprecated from Kafka, and in which release is 
this deprecation planned?

Regards,
Sahil

-Original Message-
From: Sanskar Jhajharia 
Sent: Monday, May 20, 2024 1:38 PM
To: users@kafka.apache.org
Subject: Re: Release plan required


Hey Sahil,

You can find the complete details of the releases and bug fix releases
here: https://cwiki.apache.org/confluence/display/KAFKA/Future+release+plan

The next release in the pipeline currently is 3.8.0 ( 
https://cwiki.apache.org/confluence/display/KAFKA/Release+Plan+3.8.0).
There is also a bugfix release 3.7.1 scheduled here (
https://cwiki.apache.org/confluence/display/KAFKA/Release+plan+3.7.1).

Hope that answers your question!
Cheers.

Sanskar Jhajharia
Software Engineer I
E-mail: Personal  | Official 


On Mon, May 20, 2024 at 1:31 PM Sahil Sharma D 
 wrote:


Hi team,

We need the Kafka release plan for our Kafka upgrade planning. Kindly
share the latest release plan, or let us know when the next release is planned
and which version it will be.

Regards,
Sahil



Re: Re: Request to be added to kafka contributors list

2024-05-17 Thread Matthias J. Sax

Did you sign out and sign in again?

On 5/17/24 9:49 AM, Yang Fan wrote:

Thanks Matthias,

I still can't find "Assign to me" button beside Assignee and Reporter. Could 
you help me set it again?

Best regards,
Fan

From: Matthias J. Sax 
Sent: May 17, 2024 2:24
To: users@kafka.apache.org 
Subject: Re: Request to be added to kafka contributors list

Thanks for reaching out Yang. You should be all set.

-Matthias

On 5/16/24 7:40 AM, Yang Fan wrote:

Dear Apache Kafka Team,

I hope this email finds you well. My name is Fan Yang, JIRA ID is fanyan, I 
kindly request to be added to the contributors list for Apache Kafka. Being 
part of this list would allow me to be assigned to JIRA tickets and work on 
them.
Thank you for considering my request.
Best regards,
Fan


Re: Kafka streams stores key in multiple state store instances

2024-05-16 Thread Matthias J. Sax

Hello Kay,

What you describe is "by design" -- unfortunately.

The problem is that when we build the `Topology` we don't know the 
partition count of the input topics, and thus StreamsBuilder cannot 
insert a repartition topic for this case (we always assume that the 
partition count is the same for all input topics).


To work around this, you would need to rewrite the program to use either 
`groupBy((k,v) -> k)` instead of `groupByKey()`, or do a 
`.repartition().groupByKey()`.
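
A minimal sketch of both workarounds (hypothetical topic name; default serdes 
assumed):

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Repartitioned;

public class RekeyWorkaroundSketch {
    static void build(StreamsBuilder builder) {
        KStream<String, String> input = builder.stream("input-topic");

        // Option 1: groupBy() always marks the stream for repartitioning,
        // even if the key mapper returns the key unchanged.
        input.groupBy((k, v) -> k).count();

        // Option 2: explicit repartition() before groupByKey().
        input.repartition(Repartitioned.as("rekeyed")).groupByKey().count();
    }
}

Either way, Streams inserts a repartition topic keyed consistently, so all 
records with the same key end up in the same task and state store.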


Does this make sense?


-Matthias


On 5/16/24 2:11 AM, Kay Hannay wrote:

Hi,

we have a Kafka streams application which merges (merge, groupByKey, 
aggregate) a few topics into one topic. The application is stateful, of 
course. There are currently six instances of the application running in 
parallel.

We had an issue where one new topic for aggregation had a different partition 
count than all the other topics. This caused data corruption in our application. We 
expected that a re-partitioning topic would be created automatically by Kafka 
streams or that we would get an error. But this did not happen. Instead, some 
of the keys (all merged topics share the same key schema) found their way into 
at least two different instances of the application. One key is available in 
more than one local state store. Can you explain why this happened? As already 
said, we would have expected to get an error or a re-partitioning topic in this 
case.

Cheers
Kay



Re: Request to be added to kafka contributors list

2024-05-16 Thread Matthias J. Sax

Thanks for reaching out Yang. You should be all set.

-Matthias

On 5/16/24 7:40 AM, Yang Fan wrote:

Dear Apache Kafka Team,

I hope this email finds you well. My name is Fan Yang, JIRA ID is fanyan, I 
kindly request to be added to the contributors list for Apache Kafka. Being 
part of this list would allow me to be assigned to JIRA tickets and work on 
them.
Thank you for considering my request.
Best regards,
Fan


Re: Query regarding groupbykey in streams

2024-05-15 Thread Matthias J. Sax
If I read this correctly, your upstream producer which writes into the 
input topic of your KS app is using a custom partitioner?


If you do a `groupByKey()` and change the key upstream, it would result 
in a repartition step, which would fall back to the default partitioner.


If you want to use a custom partitioner in KS, you should implement 
`StreamPartitioner` instead of the producer partitioner interface, and 
pass it into the relevant methods.


`groupByKey()` does not allow setting a partitioner (seems like this is a gap 
we should close...) -- as a workaround you could add repartition() 
before the grouping to pass your custom partitioner.


For IQ, you would also need to pass your `StreamPartitioner` to allow 
KS to find the correct partition to query.
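
A minimal sketch of this, with a hypothetical partitioning scheme:

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Repartitioned;
import org.apache.kafka.streams.processor.StreamPartitioner;

public class CustomPartitioningSketch {
    // Hypothetical partitioner: route by the first character of the key.
    static class MyPartitioner implements StreamPartitioner<String, String> {
        @Override
        public Integer partition(String topic, String key, String value, int numPartitions) {
            return Math.abs(key.charAt(0)) % numPartitions;
        }
    }

    static void build(StreamsBuilder builder) {
        StreamPartitioner<String, String> partitioner = new MyPartitioner();
        KStream<String, String> input = builder.stream("input-topic");

        // Repartition with the custom StreamPartitioner before grouping, so
        // groupByKey() does not fall back to the default partitioner.
        input.repartition(Repartitioned.streamPartitioner(partitioner))
             .groupByKey()
             .count();
    }
}

The same partitioner instance would also be passed to 
KafkaStreams#queryMetadataForKey(store, key, partitioner) so interactive 
queries get routed to the instance that actually hosts the key.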


HTH.

-Matthias


On 5/13/24 4:35 PM, Dev Lover wrote:

Hi All,

I have a custom partitioner to distribute the data across partitions in my
cluster.

My setup looks like below
Version - 3.7.0
Kafka - 3 broker setup
Partition count - 10
Stream server pods - 2
Stream threads in each pod - 10
Deployed in Kubernetes
Custom partitioner on producer end.

I am doing a groupByKey. Is it correct to use it when I have a custom
partitioner on the producer end?
I recently migrated to 3.7 from 3.5.1. I am observing that partitions are
not evenly distributed across my 2 stream pods. Also, my remote query is
failing with the host being unavailable. But if I restart streams it works fine
for a certain time and then starts erroring out again. Am I doing something
wrong?

Regards



Re: Failed to initialize processor KSTREAM-AGGREGATE-0000000001

2024-05-03 Thread Matthias J. Sax

Can you file a ticket for it: https://issues.apache.org/jira/browse/KAFKA



On 5/3/24 3:34 AM, Penumarthi Durga Prasad Chowdary wrote:

With Kafka versions 3.5.1 and 3.7.0, we're still encountering persistent issues.
The Kafka Streams library is aligned with these Kafka versions. Upon
analysis of the logs, it seems that the problem may occur when a Kafka node
disconnects from Kafka Streams processes. This suspicion is supported by
the abundance of network messages indicating disconnections, such as


  org.apache.kafka.clients.NetworkClient
ThreadName: 
kafka-streams-exec-0-test-store-6d676cf0-3910-4c25-bfad-ea2b98953db3-StreamThread-9
   Message: [Consumer
clientId=kafka-streams-exec-0-test-store-6d676cf0-3910-4c25-bfad-ea2b98953db3-StreamThread-9-consumer,
groupId=kafka-streams-exec-0-test-store ] Node 102 disconnected.




On Mon, Apr 22, 2024 at 7:16 AM Matthias J. Sax  wrote:


Not sure either, but it sounds like a bug to me. Can you reproduce this
reliably? What version are you using?

It would be best if you could file a Jira ticket and we can take it from
there.


-Matthias

On 4/21/24 5:38 PM, Penumarthi Durga Prasad Chowdary wrote:

Hi ,
I have an issue in kafka-streams while constructing kafka-streams state
store  windows(TimeWindow and SessionWindow). While kafka-streams
processing data sometimes intermittent kafka-streams process throwing

below

error
ThreadName:


kafka-streams-exec-0-test-store-6d676cf0-3910-4c25-bfad-ea2b98953db3-StreamThread-9

TraceID: unknown  CorelationID: eff36722-1430-4ffb-bf2e-c6e6cf6ae164
   Message: stream-client [ kafka-streams-exec-0-test-store
-6d676cf0-3910-4c25-bfad-ea2b98953db3] Replacing thread in the streams
uncaught exception handler
org.apache.kafka.streams.errors.StreamsException: failed to initialize
processor KSTREAM-AGGREGATE-01
at


org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:115)

at


org.apache.kafka.streams.processor.internals.StreamTask.initializeTopology(StreamTask.java:986)

at


org.apache.kafka.streams.processor.internals.StreamTask.completeRestoration(StreamTask.java:271)

at


org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:716)

at


org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:901)

at


org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:778)

at


org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:617)

at


org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:579)

   Caused by: java.lang.NullPointerException
at


org.apache.kafka.streams.kstream.internals.TimestampedTupleForwarder.(TimestampedTupleForwarder.java:46)

at


org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregate$KStreamSessionWindowAggregateProcessor.init(KStreamSessionWindowAggregate.java:138)

at


org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:107)

... 7 more
Here my understanding is state-store is null and at that time
stateStore.flush() gets invoked to send the data to stateStore, this

leads

to the above error. This error can be caught inside kafka-streams
setUncaughtExceptionHandler.
streams.setUncaughtExceptionHandler(throwable -> {
LOGGER.error("Exception in streams", throwable);
return


StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;

});
I'm uncertain about the exact reason for this issue. Everything seems to

be

in order, including the Kafka cluster, and there are no errors in the

Kafka

Streams except for a few logs indicating node disconnections.
Is there a better way to handle this error?
When can this issue happen ?
I would like to express my gratitude in advance for any assistance

provided.






Re: outerjoin not joining after window

2024-05-01 Thread Matthias J. Sax

How do you know this?

First thing we do is write a log message in the value joiner. We don't see
the log message for the missed records.


Well, for left/right join results, the ValueJoiner would only be called 
when the window is closed... And for invalid input (or late records, ie, 
which arrive out-of-order and whose window was already closed), records 
would be dropped right away. So you cannot really infer whether a record 
made it into the join or not, or what happened if it did make it into 
the `Processor`.


-> https://kafka.apache.org/documentation/#kafka_streams_task_monitoring

`dropped-records-total` is the name of the metric.



-Matthias



On 5/1/24 11:35 AM, Chad Preisler wrote:

Hello,

We did some testing in our test environment today. We are seeing some
records processed where only one side of the join has a record. So that's
good. However, we are still seeing some records get skipped. They never hit
the value joiner (we write a log message first thing in the value joiner).
During the test we were putting some load on the system, so stream time was
advancing. We did notice that the join windows were taking much longer than
30 minutes to close and process records. Thirty minutes is the window plus
grace.


How do you know this?

First thing we do is write a log message in the value joiner. We don't see
the log message for the missed records.

I will try pushing the same records locally. However, we don't see any
errors in our logs and the stream does process one sided joins after the
skipped record. Do you have any docs on the "dropped records" metric? I did
a Google search and didn't find many good results for that.

Thanks,

Chad

On Tue, Apr 30, 2024 at 2:49 PM Matthias J. Sax  wrote:


Thanks for the information. I ran the code using Kafka locally. After
submitting some records inside and outside of the time window and grace,
the join performed as expected when running locally.


That gives some hope :)




However, they never get into the join.


How do you know this?


Did you check the metric for dropped records? Maybe records are
considered malformed and dropped? Are you using the same records in
production and in your local test?



Are there any settings for the stream client that would affect the join?


Not that I can think of... There is one more internal config, but as
long as data is flowing, it should not impact the result you see.



Are there any settings on the broker side that would affect the join?


No. The join is computed client side. Broker configs should not have any
impact.


If I increase the log level for the streams API would that

shed some light on what is happening?


I don't think it would help much. The code in question is
org.apache.kafka.streams.kstream.internals.KStreamKStreamJoin -- but it
does not do any logging except WARN for the already mentioned "dropping
malformed" records, which is also recorded via JMX.


WARN: "Skipping record due to null key or value. "



If you can identify a specific record from the input which would produce
an output, but does not, maybe you can try to feed it into your local
test env and try to re-produce the issue?


-Matthias

On 4/30/24 11:38 AM, Chad Preisler wrote:

Matthias,

Thanks for the information. I ran the code using Kafka locally. After
submitting some records inside and outside of the time window and grace,
the join performed as expected when running locally.

I'm not sure why the join is not working as expected when running against
our actual brokers. We are peeking at the records for the streams and we
are seeing the records get pulled. However, they never get into the join.
It's been over 24 hours since the expected records were created, and there
has been plenty of traffic to advance the stream time. Only records that
have both a left and right side match are getting processed by the join.

Are there any settings for the stream client that would affect the join?

Are there any settings on the broker side that would affect the join?

The outer join is just one part of the topology. Compared to running it
locally there is a lot more data going through the app when running on our
actual servers. If I increase the log level for the streams API would that
shed some light on what is happening? Does anyone know if there are
specific packages that I should increase the log level for? Any specific
log message I can hone in on to tell me what is going on?

Basically, I'm looking for some pointers on where I can start looking.

Thanks,
Chad


On Tue, Apr 30, 2024 at 10:26 AM Matthias J. Sax 

wrote:



I expect the join to
execute after the 25 with one side of the join containing a record and the
other being null


Given that you also have a grace period of 5 minutes, the result will
only be emitted after the grace-period passed and the window is closed
(not when window end time is reached).


One has a

naming convention of "KSTREAM_OUTERSHARED". I see a rec

Re: outerjoin not joining after window

2024-04-30 Thread Matthias J. Sax

Thanks for the information. I ran the code using Kafka locally. After
submitting some records inside and outside of the time window and grace,
the join performed as expected when running locally.


That gives some hope :)




However, they never get into the join.


How do you know this?


Did you check the metric for dropped records? Maybe records are 
considered malformed and dropped? Are you using the same records in 
production and in your local test?




Are there any settings for the stream client that would affect the join?


Not that I can think of... There is one more internal config, but as 
long as data is flowing, it should not impact the result you see.




Are there any settings on the broker side that would affect the join?


No. The join is computed client side. Broker configs should not have any 
impact.



If I increase the log level for the streams API would that

shed some light on what is happening?


I don't think it would help much. The code in question is 
org.apache.kafka.streams.kstream.internals.KStreamKStreamJoin -- but it 
does not do any logging except WARN for the already mentioned "dropping 
malformed" records, which is also recorded via JMX.



WARN: "Skipping record due to null key or value. "



If you can identify a specific record from the input which would produce 
an output, but does not, maybe you can try to feed it into your local 
test env and try to re-produce the issue?



-Matthias

On 4/30/24 11:38 AM, Chad Preisler wrote:

Matthias,

Thanks for the information. I ran the code using Kafka locally. After
submitting some records inside and outside of the time window and grace,
the join performed as expected when running locally.

I'm not sure why the join is not working as expected when running against
our actual brokers. We are peeking at the records for the streams and we
are seeing the records get pulled. However, they never get into the join.
It's been over 24 hours since the expected records were created, and there
has been plenty of traffic to advance the stream time. Only records that
have both a left and right side match are getting processed by the join.

Are there any settings for the stream client that would affect the join?

Are there any settings on the broker side that would affect the join?

The outer join is just one part of the topology. Compared to running it
locally there is a lot more data going through the app when running on our
actual servers. If I increase the log level for the streams API would that
shed some light on what is happening? Does anyone know if there are
specific packages that I should increase the log level for? Any specific
log message I can hone in on to tell me what is going on?

Basically, I'm looking for some pointers on where I can start looking.

Thanks,
Chad


On Tue, Apr 30, 2024 at 10:26 AM Matthias J. Sax  wrote:


I expect the join to
execute after the 25 with one side of the join containing a record and the
other being null


Given that you also have a grace period of 5 minutes, the result will
only be emitted after the grace-period passed and the window is closed
(not when window end time is reached).


One has a
naming convention of "KSTREAM_OUTERSHARED". I see a record there, but I'm
not sure how to decode that message to see what is in it. What is the
purpose of those messages?


It's an internal store, that stores all records which are subject to be
emitted as left/right join result, ie, if there is no inner join result.
The format used is internal:

https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValueSerde.java

Also note: time is based on event-time, ie, if the input stream stops to
send new records, "stream-time" will stop to advance and the result
might not be emitted because the window does not get closed.

(Last, there is some internal wall-clock time delay of one second to
emit results for performance reasons...)

HTH.

-Matthias

On 4/30/24 6:51 AM, Chad Preisler wrote:

Hello,

I have a KStream to KStream outer join with a time difference of 25 minutes
and 5 minutes of grace.  When I get a record for one side of the join, but
don't get a record on the other side of the join, I expect the join to
execute after the 25 with one side of the join containing a record and the
other being null. Is that correct?  If it is correct, it's not working for
me.

I was poking around on the broker and saw some internal topics. I see the
key I expected to execute the join on some of those topics. One has a
naming convention of "KSTREAM_OUTERSHARED". I see a record there, but I'm
not sure how to decode that message to see what is in it. What is the
purpose of those messages? If I decode the message will it help me see when
the join should have been executed?

I also see the key on a topic with the naming convention
"KSTREAM_OUTERTHIS".

Are there any other topics that I should be looking at to troubleshoot this
issue?

Thanks,
Chad







Re: outerjoin not joining after window

2024-04-30 Thread Matthias J. Sax

I expect the join to

execute after the 25 with one side of the join containing a record and the
other being null


Given that you also have a grace period of 5 minutes, the result will 
only be emitted after the grace-period passed and the window is closed 
(not when window end time is reached).



One has a

naming convention of "KSTREAM_OUTERSHARED". I see a record there, but I'm
not sure how to decode that message to see what is in it. What is the
purpose of those messages?


It's an internal store, that stores all records which are subject to be 
emitted as left/right join result, ie, if there is no inner join result. 
The format used is internal: 
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValueSerde.java


Also note: time is based on event-time, ie, if the input stream stops to 
send new records, "stream-time" will stop to advance and the result 
might not be emitted because the window does not get closed.


(Last, there is some internal wall-clock time delay of one second to 
emit results for performance reasons...)


HTH.

-Matthias

On 4/30/24 6:51 AM, Chad Preisler wrote:

Hello,

I have a KStream to KStream outer join with a time difference of 25 minutes
and 5 minutes of grace.  When I get a record for one side of the join, but
don't get a record on the other side of the join, I expect the join to
execute after the 25 with one side of the join containing a record and the
other being null. Is that correct?  If it is correct, it's not working for
me.

I was poking around on the broker and saw some internal topics. I see the
key I expected to execute the join on some of those topics. One has a
naming convention of "KSTREAM_OUTERSHARED". I see a record there, but I'm
not sure how to decode that message to see what is in it. What is the
purpose of those messages? If I decode the message will it help me see when
the join should have been executed?

I also see the key on a topic with the naming convention
"KSTREAM_OUTERTHIS".

Are there any other topics that I should be looking at to troubleshoot this
issue?

Thanks,
Chad



Re: How to find out the end of the session window

2024-04-29 Thread Matthias J. Sax

Did you look into .windowedBy(...).emitStrategy(...) ?

Using emit-final you would get a downstream event only after the window 
is closed.
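
A minimal sketch (hypothetical topic names and gap/grace values; emitStrategy() 
on session-windowed aggregations requires a reasonably recent Streams release):

import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.EmitStrategy;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.SessionWindows;

public class EmitFinalSessionSketch {
    static void build(StreamsBuilder builder) {
        KStream<String, String> logs = builder.stream("batch-logs");
        logs.groupByKey()
            .windowedBy(SessionWindows.ofInactivityGapAndGrace(Duration.ofMinutes(5), Duration.ofMinutes(1)))
            .emitStrategy(EmitStrategy.onWindowClose()) // emit only once the session window has closed
            .count()
            .toStream((windowedKey, count) -> windowedKey.key()) // re-key to the plain key
            .to("closed-session-events", Produced.with(Serdes.String(), Serdes.Long()));
    }
}

Each record on the output topic then effectively signals "this session is 
closed" for its key, which could be used as the trigger for the object-storage 
upload.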


-Matthias

On 4/29/24 1:43 AM, Santhoshi Mekala wrote:

Hi Team,

We have the below requirement:
We are processing batch logs in kstreams. Currently, we are storing the
batch logs in kafka topic after processing. We would like to integrate with
object storage to store the batch logs in object storage after processing.
For batch logs, we are using Session windows. We would like to emit a
special event when the session window is closed and based on that event, we
will aggregate all the logs related to a key and will send it to the object
storage. For this, we need to know the end of the session window. Could you
please let me know if there are ways to identify the end of the session
windows and emit a special event.

Thank you!



Re: [ANNOUNCE] New committer: Igor Soarez

2024-04-24 Thread Matthias J. Sax

Congrats!

On 4/24/24 2:29 PM, Bill Bejeck wrote:

Congrats Igor!

-Bill

On Wed, Apr 24, 2024 at 2:37 PM Tom Bentley  wrote:


Congratulations Igor!

On Thu, 25 Apr 2024 at 6:27 AM, Chia-Ping Tsai  wrote:


Congratulations, Igor! you are one of the best Kafka developers!!!

Mickael Maison  於 2024年4月25日 週四 上午2:16寫道:


Congratulations Igor!

On Wed, Apr 24, 2024 at 8:06 PM Colin McCabe 

wrote:


Hi all,

The PMC of Apache Kafka is pleased to announce a new Kafka committer,

Igor Soarez.


Igor has been a Kafka contributor since 2019. In addition to being a

regular contributor and reviewer, he has made significant contributions

to

improving Kafka's JBOD support in KRaft mode. He has also contributed

to

discussing and reviewing many KIPs such as KIP-690, KIP-554, KIP-866,

and

KIP-938.


Congratulations, Igor!

Thanks,

Colin (on behalf of the Apache Kafka PMC)










Re: Failed to initialize processor KSTREAM-AGGREGATE-0000000001

2024-04-21 Thread Matthias J. Sax
Not sure either, but it sounds like a bug to me. Can you reproduce this 
reliably? What version are you using?


It would be best if you could file a Jira ticket and we can take it from 
there.



-Matthias

On 4/21/24 5:38 PM, Penumarthi Durga Prasad Chowdary wrote:

Hi ,
I have an issue in kafka-streams while constructing kafka-streams state
store  windows(TimeWindow and SessionWindow). While kafka-streams
processing data sometimes intermittent kafka-streams process throwing below
error
ThreadName:
kafka-streams-exec-0-test-store-6d676cf0-3910-4c25-bfad-ea2b98953db3-StreamThread-9
TraceID: unknown  CorelationID: eff36722-1430-4ffb-bf2e-c6e6cf6ae164
  Message: stream-client [ kafka-streams-exec-0-test-store
-6d676cf0-3910-4c25-bfad-ea2b98953db3] Replacing thread in the streams
uncaught exception handler
org.apache.kafka.streams.errors.StreamsException: failed to initialize
processor KSTREAM-AGGREGATE-01
   at
org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:115)
   at
org.apache.kafka.streams.processor.internals.StreamTask.initializeTopology(StreamTask.java:986)
   at
org.apache.kafka.streams.processor.internals.StreamTask.completeRestoration(StreamTask.java:271)
   at
org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:716)
   at
org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:901)
   at
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:778)
   at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:617)
   at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:579)
  Caused by: java.lang.NullPointerException
   at
org.apache.kafka.streams.kstream.internals.TimestampedTupleForwarder.(TimestampedTupleForwarder.java:46)
   at
org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregate$KStreamSessionWindowAggregateProcessor.init(KStreamSessionWindowAggregate.java:138)
   at
org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:107)
   ... 7 more
My understanding is that the state store is null at the time
stateStore.flush() gets invoked to send the data to the state store, and this leads
to the above error.
setUncaughtExceptionHandler.
   streams.setUncaughtExceptionHandler(throwable -> {
   LOGGER.error("Exception in streams", throwable);
   return
StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
   });
I'm uncertain about the exact reason for this issue. Everything seems to be
in order, including the Kafka cluster, and there are no errors in the Kafka
Streams except for a few logs indicating node disconnections.
Is there a better way to handle this error?
When can this issue happen ?
I would like to express my gratitude in advance for any assistance provided.


Re: Streams group final result: EmitStrategy vs Suppressed

2024-04-18 Thread Matthias J. Sax
The main difference is the internal implementation. Semantically, both 
are equivalent.


suppress() uses an in-memory buffer, while `emitStrategy()` does not add a 
buffer; it modifies the upstream aggregation operator implementation to hold 
back results until the window closes, and thus it is RocksDB based.
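
A minimal sketch of the two equivalent variants (hypothetical window sizes; 
emitStrategy() requires a Streams release that includes it):

import java.time.Duration;
import org.apache.kafka.streams.kstream.EmitStrategy;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;

public class FinalResultSketch {
    // Variant 1: suppress() on the result KTable (in-memory buffer downstream).
    static void withSuppress(KGroupedStream<String, String> grouped) {
        grouped.windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofMinutes(10), Duration.ofMinutes(1)))
               .count()
               .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));
    }

    // Variant 2: emitStrategy() on the windowed stream (no extra buffer; the
    // RocksDB-backed aggregation itself holds results until the window closes).
    static void withEmitStrategy(KGroupedStream<String, String> grouped) {
        grouped.windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofMinutes(10), Duration.ofMinutes(1)))
               .emitStrategy(EmitStrategy.onWindowClose())
               .count();
    }
}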



-Matthias


On 4/12/24 10:37 AM, Ayoub wrote:

Hello,

*[Not sure if my email went through as I was not subscribed to this mailing
list. Here is my original email]*

I found that there are two ways to send only the final result of a windowed
groupBy, either using Suppressed.untilWindowCloses on the final KTable or
EmitStrategy on the windowed stream.

I tried to compare both but didn't find differences in the result they give.

Are there any differences apart from the moment they are defined within the
pipeline. And is there any preference on using one or the other ?

Thanks,
Ayoub


Le ven. 12 avr. 2024 à 11:50, Ayoub  a écrit :


Hello,

I found that there are two ways to send only the final result of a
windowed groupBy, either using Suppressed.untilWindowCloses on the final
KTable or EmitStrategy on the windowed stream.

I tried to compare both but didn't find differences in the result they
give.

Are there any differences apart from the moment they are defined within
the pipeline. And Is there any preference on using one or the other ?

Thanks,
Ayoub





Re: Is there any recommendation about header max size?

2024-04-18 Thread Matthias J. Sax
I don't think that there is any specific recommendation. However, there 
is an overall max-message-size config that you need to keep in mind.
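
As a minimal sketch of what you describe (hypothetical topic and header names), 
the header bytes simply become part of the record and count toward 
max.request.size on the producer and the broker-side message size limit:

import java.io.PrintWriter;
import java.io.StringWriter;
import java.nio.charset.StandardCharsets;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ErrorForwardingSketch {
    // Forwards the original key/value bytes untouched and attaches the exception as a header.
    static void forward(Producer<byte[], byte[]> producer,
                        ConsumerRecord<byte[], byte[]> failed,
                        Exception error) {
        StringWriter stackTrace = new StringWriter();
        error.printStackTrace(new PrintWriter(stackTrace));

        ProducerRecord<byte[], byte[]> out = new ProducerRecord<>(
                "error-topic", null, failed.key(), failed.value(), failed.headers());
        out.headers().add("x-exception", stackTrace.toString().getBytes(StandardCharsets.UTF_8));
        producer.send(out);
    }
}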


-Matthias

On 4/16/24 9:42 AM, Gabriel Giussi wrote:

I have logic in my service to capture exceptions being thrown during
message processing and produce a new message to a different topic with
information about the error. The idea is to leave the message unmodified,
aka produce the exact same bytes to this new topic, therefore I'm planning
on adding the java exception as a header.
By looking at the documentation it is just an array of bytes and it doesn't
say anything about a max size but is there any recommendation about it?
https://kafka.apache.org/documentation/#recordheader



Re: [ANNOUNCE] New Kafka PMC Member: Greg Harris

2024-04-18 Thread Matthias J. Sax

Congrats Greg!

On 4/15/24 10:44 AM, Hector Geraldino (BLOOMBERG/ 919 3RD A) wrote:

Congrats! Well deserved

From: d...@kafka.apache.org At: 04/13/24 14:42:22 UTC-4:00 To:  
d...@kafka.apache.org
Subject: [ANNOUNCE] New Kafka PMC Member: Greg Harris

Hi all,

Greg Harris has been a Kafka committer since July 2023. He has remained
very active and instructive in the community since becoming a committer.
It's my pleasure to announce that Greg is now a member of Kafka PMC.

Congratulations, Greg!

Chris, on behalf of the Apache Kafka PMC




Re: Fix slow processing rate in Kafka streams

2024-04-05 Thread Matthias J. Sax

Perf tuning is always tricky... 350 rec/sec sounds pretty low though.

You would first need to figure out where the bottleneck is. Kafka 
Streams exposes all kind of metrics: 
https://kafka.apache.org/documentation/#kafka_streams_monitoring


Might be good to inspect them as a first step -- maybe something is off 
and gives a first direction.


In general, it would be good to narrow it down to Kafka network I/O, local 
RocksDB disk I/O, or CPU utilization -- each one could be the bottleneck, 
and we would need to know which one first before you can take any action 
to change configurations.
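
As a starting point, a minimal sketch that dumps all client-side metrics of a 
running KafkaStreams instance (thread, task, processor, and state-store level), 
which can then be eyeballed or shipped to a monitoring system:

import java.util.Map;
import java.util.TreeMap;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.streams.KafkaStreams;

public class MetricsDumpSketch {
    static void dump(KafkaStreams streams) {
        Map<String, Object> sorted = new TreeMap<>();
        for (Map.Entry<MetricName, ? extends Metric> e : streams.metrics().entrySet()) {
            MetricName n = e.getKey();
            sorted.put(n.group() + " / " + n.name() + " " + n.tags(), e.getValue().metricValue());
        }
        sorted.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}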


HTH.

-Matthias

On 4/4/24 7:21 PM, Nirmal Das wrote:

Hi All,

My streams application is not processing more than 350 records/sec on a
high load of 3 million records produced every 2-3 minutes.

My scenarios are as below -
I am on Kafka and streams version of 3.5.1 .
My key-value pair is in protobuf format .
I do a groupByKey followed by a TimeWindow of 10 mins with a grace period of 6
hours. It is then followed by an aggregate function which stores the first
and last offset of the record along with the partition for that message key.

Am I doing something wrong? Am I doing something anti-pattern which is
throttling the system ? How can I improve this?

Regards,
Dev Lover



Re: outerJoin confusion

2024-04-04 Thread Matthias J. Sax

Yeah, that is some quirk of KS runtime...

There is some internal config (for perf reasons) that delays emitting 
results... An alternative to advancing wall-clock time would be to set 
this internal config to zero, to disable the delay.


Maybe we should automatically disable this config when the topology test 
driver is used... It's not the first time it has come up.


I opened a PR for it: https://github.com/apache/kafka/pull/15660


-Matthias



On 4/3/24 3:52 PM, Chad Preisler wrote:

Changing the code to this...

assertTrue(outputTopic.isEmpty());
 testDriver.advanceWallClockTime(Duration.ofMillis(2001));
 leftTopic.pipeInput("1", "test string 3", 4002L);
 testDriver.advanceWallClockTime(Duration.ofMillis(2001));
 leftTopic.pipeInput("1", "test string 4", 6004L);

Did appear to fix the issue. Output:

First join result:
Key: 1 Value: test string 1, null
Second join result:
Key: 1 Value: test string 2, null
Key: 1 Value: test string 3, null

Still a little strange that it works the first time without advancing the
wall clock.

On Wed, Apr 3, 2024 at 5:05 PM Shashwat Pandey <
shashwat.pandey@gmail.com> wrote:


I believe you need to advanceWallClockTime

https://kafka.apache.org/24/javadoc/org/apache/kafka/streams/TopologyTestDriver.html#advanceWallClockTime-java.time.Duration-


Regards,
Shashwat Pandey


On Wed, Apr 3, 2024 at 5:05 PM Chad Preisler 
wrote:


Seems like there is some issue with the TopologyTestDriver. I am able to
run the same stream against Kafka and I'm getting the output I expect. I'd
appreciate it if someone could confirm that there is an issue with the
TopologyTestDriver. If there is, any suggestions on how to test this type
of join?

On Wed, Apr 3, 2024 at 2:46 PM Chad Preisler 
wrote:


Hello,

I'm confused about the outerJoin and when records are produced with the
following code.

Topology buildTopology() {
 var builder = new StreamsBuilder();
 var leftStream = builder.stream("leftSecondsTopic",
Consumed.with(Serdes.String(), Serdes.String()));
 var rightStream = builder.stream("rightSecondsTopic",
Consumed.with(Serdes.String(), Serdes.String()));

 leftStream.outerJoin(rightStream, (left, right) -> left + ", " + right,
     JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(2000)))
     .to("outputTopicSeconds");

 return builder.build();
 }

Here is the test driver.

@Test
 public void testSecondsJoinDoesNotWork() {
 Properties props = new Properties();
 props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testSeconds");
 props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.StringSerde.class);
 props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.StringSerde.class);
 var app = new KafkaStreamJoinTest();
 var serializer = new StringSerializer();

 try (var testDriver = new TopologyTestDriver(app.buildTopology(), props)) {
     var leftTopic = testDriver.createInputTopic("leftSecondsTopic",
         serializer, serializer, Instant.ofEpochMilli(0L), Duration.ZERO);
 leftTopic.pipeInput("1", "test string 1", 0L);
 leftTopic.pipeInput("1", "test string 2", 2001L);

     var outputTopic = testDriver.createOutputTopic("outputTopicSeconds",
         new StringDeserializer(), new StringDeserializer());

 assertFalse(outputTopic.isEmpty());
 System.out.println("First join result:");
     outputTopic.readKeyValuesToList()
         .forEach((keyValue) ->
             System.out.println("Key: " + keyValue.key + " Value: " + keyValue.value));

 assertTrue(outputTopic.isEmpty());

 leftTopic.pipeInput("1", "test string 3", 4002L);
 leftTopic.pipeInput("1", "test string 4", 6004L);

 System.out.println("Second join result:");
     outputTopic.readKeyValuesToList()
         .forEach((keyValue) ->
             System.out.println("Key: " + keyValue.key + " Value: " + keyValue.value));

 }
 }

Here is the output:
First join result:
Key: 1 Value: test string 1, null
Second join result:

I would have expected a join to happen with "test string 2" and "test
string 3" being output with a null right value. Why didn't that happen?










Re: [EXTERNAL] Re: Kafka Streams 3.5.1 based app seems to get stalled

2024-04-04 Thread Matthias J. Sax
https://github.com/apache/kafka/blob/f0087ac6a8a7b1005e9588e42b3679146bd3eb13/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L882C39-L882C66


On 3/13/24 2:34 AM, Venkatesh Nagarajan wrote:

Just want to share another variant of the log message which is also related to 
metadata and rebalancing but has a different client reason:

INFO [GroupCoordinator 3]: Preparing to rebalance group  in state 
PreparingRebalance with old generation nnn (__consumer_offsets-nn) (reason: Updating 
metadata for member  during Stable; client reason: triggered followup 
rebalance scheduled for 0) (kafka.coordinator.group.GroupCoordinator)

Thank you.

Kind regards,
Venkatesh

From: Venkatesh Nagarajan 
Date: Wednesday, 13 March 2024 at 12:06 pm
To: users@kafka.apache.org 
Subject: Re: [EXTERNAL] Re: Kafka Streams 3.5.1 based app seems to get stalled
Thanks very much for your important inputs, Matthias.

I will use the default METADATA_MAX_AGE_CONFIG. I set it to 5 hours when I saw 
a lot of such rebalancing related messages in the MSK broker logs:

INFO [GroupCoordinator 2]: Preparing to rebalance group  in state 
PreparingRebalance with old generation  (__consumer_offsets-nn) (reason: Updating 
metadata for member  during Stable; client reason: need to revoke partitions 
and re-join) (kafka.coordinator.group.GroupCoordinator)

I am guessing that the two are unrelated. If you have any suggestions on how to 
reduce such rebalancing, that will be very helpful.

Thank you very much.

Kind regards,
Venkatesh

From: Matthias J. Sax 
Date: Tuesday, 12 March 2024 at 1:31 pm
To: users@kafka.apache.org 
Subject: [EXTERNAL] Re: Kafka Streams 3.5.1 based app seems to get stalled
Without detailed logs (maybe even DEBUG) hard to say.

But from what you describe, it could be a metadata issue? Why are you
setting


METADATA_MAX_AGE_CONFIG (consumer and producer): 5 hours in millis (to make 
rebalances rare)


Refreshing metadata has nothing to do with rebalances, and a metadata
refresh does not trigger a rebalance.



-Matthias


On 3/10/24 5:56 PM, Venkatesh Nagarajan wrote:

Hi all,

A Kafka Streams application sometimes stops consuming events during load 
testing. Please find below the details:

Details of the app:


* Kafka Streams Version: 3.5.1
* Kafka: AWS MSK v3.6.0
* Consumes events from 6 topics
* Calls APIs to enrich events
* Sometimes joins two streams
* Produces enriched events in output topics

Runs on AWS ECS:

* Each task has 10 streaming threads
* Autoscaling based on offset lags and a maximum of 6 ECS tasks
* Input topics have 60 partitions each to match 6 tasks * 10 threads
* Fairly good spread of events across all topic partitions using partitioning 
keys

Settings and configuration:


* At least once semantics
* MAX_POLL_RECORDS_CONFIG: 10
* APPLICATION_ID_CONFIG

// Make rebalances rare and prevent stop-the-world rebalances

* Static membership (using GROUP_INSTANCE_ID_CONFIG)
* METADATA_MAX_AGE_CONFIG (consumer and producer): 5 hours in millis (to make 
rebalances rare)
* MAX_POLL_INTERVAL_MS_CONFIG: 20 minutes in millis
* SESSION_TIMEOUT_MS_CONFIG: 2 minutes in millis

State store related settings:

* TOPOLOGY_OPTIMIZATION_CONFIG: OPTIMIZE
* STATESTORE_CACHE_MAX_BYTES_CONFIG: 300 * 1024 * 1024L
* NUM_STANDBY_REPLICAS_CONFIG: 1


Symptoms:
The symptoms mentioned below occur during load tests:

Scenario# 1:
Steady input event stream

Observations:

* Gradually increasing offset lags which shouldn't happen normally as the 
streaming app is quite fast
* Events get processed

Scenario# 2:
No input events after the load test stops producing events

Observations:

* Offset lag stuck at ~5k
* Stable consumer group
* No events processed
* No errors or messages in the logs


Scenario# 3:
Restart the app when it stops processing events although offset lags are not 
zero

Observations:

* Offset lags start reducing and events start getting processed

Scenario# 4:
Transient errors occur while processing events


* A custom exception handler that implements StreamsUncaughtExceptionHandler 
returns StreamThreadExceptionResponse.REPLA

Re: [ANNOUNCE] New committer: Christo Lolov

2024-03-27 Thread Matthias J. Sax

Congrats!

On 3/26/24 9:39 PM, Christo Lolov wrote:

Thank you everyone!

It wouldn't have been possible without quite a lot of reviews and extremely
helpful inputs from you and the rest of the community! I am looking forward
to working more closely with you going forward :)

On Tue, 26 Mar 2024 at 14:31, Kirk True  wrote:


Congratulations Christo!


On Mar 26, 2024, at 7:27 AM, Satish Duggana 

wrote:


Congratulations Christo!

On Tue, 26 Mar 2024 at 19:20, Ivan Yurchenko  wrote:


Congrats!

On Tue, Mar 26, 2024, at 14:48, Lucas Brutschy wrote:

Congrats!

On Tue, Mar 26, 2024 at 2:44 PM Federico Valeri 

wrote:


Congrats!

On Tue, Mar 26, 2024 at 2:27 PM Mickael Maison <

mickael.mai...@gmail.com> wrote:


Congratulations Christo!

On Tue, Mar 26, 2024 at 2:26 PM Chia-Ping Tsai 

wrote:


Congrats Christo!

Chia-Ping









Re: Kafka Streams 3.5.1 based app seems to get stalled

2024-03-11 Thread Matthias J. Sax

Without detailed logs (maybe even DEBUG) hard to say.

But from what you describe, it could be a metadata issue? Why are you 
setting



METADATA_MAX_AGE_CONFIG (consumer and producer): 5 hours in millis (to make 
rebalances rare)


Refreshing metadata has nothing to do with rebalances, and a metadata 
refresh does not trigger a rebalance.




-Matthias


On 3/10/24 5:56 PM, Venkatesh Nagarajan wrote:

Hi all,

A Kafka Streams application sometimes stops consuming events during load 
testing. Please find below the details:

Details of the app:


   *   Kafka Streams Version: 3.5.1
   *   Kafka: AWS MSK v3.6.0
   *   Consumes events from 6 topics
   *   Calls APIs to enrich events
   *   Sometimes joins two streams
   *   Produces enriched events in output topics

Runs on AWS ECS:

   *   Each task has 10 streaming threads
   *   Autoscaling based on offset lags and a maximum of 6 ECS tasks
   *   Input topics have 60 partitions each to match 6 tasks * 10 threads
   *   Fairly good spread of events across all topic partitions using 
partitioning keys

Settings and configuration:


   *   At least once semantics
   *   MAX_POLL_RECORDS_CONFIG: 10
   *   APPLICATION_ID_CONFIG

// Make rebalances rare and prevent stop-the-world rebalances

   *   Static membership (using GROUP_INSTANCE_ID_CONFIG)
   *   METADATA_MAX_AGE_CONFIG (consumer and producer): 5 hours in millis (to 
make rebalances rare)
   *   MAX_POLL_INTERVAL_MS_CONFIG: 20 minutes in millis
   *   SESSION_TIMEOUT_MS_CONFIG: 2 minutes in millis

State store related settings:

   *   TOPOLOGY_OPTIMIZATION_CONFIG: OPTIMIZE
   *   STATESTORE_CACHE_MAX_BYTES_CONFIG: 300 * 1024 * 1024L
   *   NUM_STANDBY_REPLICAS_CONFIG: 1


Symptoms:
The symptoms mentioned below occur during load tests:

Scenario# 1:
Steady input event stream

Observations:

   *   Gradually increasing offset lags which shouldn't happen normally as the 
streaming app is quite fast
   *   Events get processed

Scenario# 2:
No input events after the load test stops producing events

Observations:

   *   Offset lag stuck at ~5k
   *   Stable consumer group
   *   No events processed
   *   No errors or messages in the logs


Scenario# 3:
Restart the app when it stops processing events although offset lags are not 
zero

Observations:

   *   Offset lags start reducing and events start getting processed

Scenario# 4:
Transient errors occur while processing events


   *   A custom exception handler that implements 
StreamsUncaughtExceptionHandler returns 
StreamThreadExceptionResponse.REPLACE_THREAD in the handle method
   *   If transient errors keep occurring occasionally and threads get 
replaced, the problem of the app stalling disappears.
   *   But if transient errors don't occur, the app tends to stall and I need 
to manually restart it


Summary:

   *   It appears that some streaming threads stall after processing for a 
while.
   *   It is difficult to change log level for Kafka Streams from ERROR to INFO 
as it starts producing a lot of log messages especially during load tests.
   *   I haven't yet managed to push Kafka streams metrics into AWS OTEL 
collector to get more insights.

Can you please let me know if any Kafka Streams config settings need changing? 
Should I reduce the values of any of these settings to help trigger rebalancing 
early and hence assign partitions to members that are active:


   *   METADATA_MAX_AGE_CONFIG: 5 hours in millis (to make rebalances rare)
   *   MAX_POLL_INTERVAL_MS_CONFIG: 20 minutes in millis
   *   SESSION_TIMEOUT_MS_CONFIG: 2 minutes in millis

Should I get rid of static membership – this may increase rebalancing but may 
be okay if it can prevent stalled threads from appearing as active members

Should I try upgrading Kafka Streams to v3.6.0 or v3.7.0? Hoping that v3.7.0 
will be compatible with AWS MSK v3.6.0.


Thank you very much.

Kind regards,
Venkatesh




Re: Join request

2024-02-24 Thread Matthias J. Sax

To subscribe, please follow instructions from the webpage

https://kafka.apache.org/contact


-Matthias

On 2/23/24 1:15 AM, kashi mori wrote:

Hi, please add my email to the mailin list



Re: GlobalKTable with RocksDB - queries before state RUNNING?

2024-02-21 Thread Matthias J. Sax

Filed https://issues.apache.org/jira/browse/KAFKA-16295

Also, for global store support, we do have a ticket already: 
https://issues.apache.org/jira/browse/KAFKA-13523


It's actually a little bit more involved due to position tracking... I 
guess we might need a KIP to fix this.


And yes: if anybody has interest to pick it up, that would be great. We 
did push a couple of IQv2 improvements into upcoming 3.7 release, and of 
course hope to make it the default eventually and to deprecate IQv1.


We should actually also start to document IQv2... 
https://issues.apache.org/jira/browse/KAFKA-16262



-Matthias

On 11/21/23 4:50 PM, Sophie Blee-Goldman wrote:

Just to make sure I understand the logs, you're saying the "new file
processed" lines represent store queries, and presumably the
com.osr.serKafkaStreamsService is your service that's issuing these queries?

You need to wait for the app to finish restoring state before querying it.
Based on this message -- "KafkaStreams has not been started, you can retry
after calling start()" -- I assume you're kicking off the querying service
right away and blocking queries until after KafkaStreams#start is called.
But you need to wait for it to actually finish starting up, not just for
start() to be called. The best way to do this is by setting a state
listener via KafkaStreams#setStateListener, and then using this to listen
in on the KafkaStreams.State and blocking the queries until the state has
changed to RUNNING.
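
A minimal sketch of that pattern (the listener has to be registered before 
calling start()):

import java.util.concurrent.CountDownLatch;
import org.apache.kafka.streams.KafkaStreams;

public class WaitForRunningSketch {
    // Blocks until the instance reaches RUNNING, i.e. restoration has finished.
    static void startAndWaitUntilRunning(KafkaStreams streams) throws InterruptedException {
        CountDownLatch running = new CountDownLatch(1);
        streams.setStateListener((newState, oldState) -> {
            if (newState == KafkaStreams.State.RUNNING) {
                running.countDown();
            }
        });
        streams.start();
        running.await(); // only serve queries after this returns
    }
}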

In case you're curious about why this seems to work with in-memory stores
but not with rocksdb, it seems like in the in-memory case, the queries that
are attempted during restoration are blocked due to the store being closed
(according to "(Quarkus Main Thread) the state store, store-name, is not
open.")

So why is the store closed for most of the restoration in the in-memory
case only? This gets a bit into the weeds, but it has to do with the
sequence of events in starting up a state store. When the global thread
starts up, it'll first loop over all its state stores and call #init on
them. Two things have to happen inside #init: the store is opened, and the
store registers itself with the ProcessorContext. The #register involves
various things, including a call to fetch the end offsets of the topic for
global state stores. This is a blocking call, so the store might stay
inside the #register call for a relatively long while.

For RocksDB stores, we open the store first and then call #register, so by
the time the GlobalStreamThread is sitting around waiting on the end
offsets response, the store is open and your queries are getting through to
it. However the in-memory store actually registers itself *first*, before
marking itself as open, and so it remains closed for most of the time it
spends in restoration and blocks any query attempts during this time.

I suppose it would make sense to align the two store implementations to
have the same behavior, and the in-memory store is probably technically
more correct. But in the end you really should just wait for the
KafkaStreams.State to get to RUNNING before querying the state store, as
that's the only true guarantee.

Hope this helps!

-Sophie

On Tue, Nov 21, 2023 at 6:44 AM Christian Zuegner
 wrote:


Hi,

we have the following problem - a Kafka topic of ~20 megabytes is made
available as a GlobalKTable for queries. When using RocksDB, the GKTable is
ready for queries instantly, even before the data has been read completely -
all get() requests return null. After a few seconds the data is queryable
correctly - but this is too late for our application. Once we switch to
IN_MEMORY we get the expected behavior: the store is only ready after all
data has been read from the topic.

How can we achieve the same behavior with the RocksDB setup?

Snipet to build KafkaStreams Topology

builder.globalTable(
    "topic-name",
    Consumed.with(Serdes.String(), Serdes.String()),
    Materialized.as(STORE_NAME).withStoreType(Materialized.StoreType.ROCKS_DB));

Query the Table

while (true) {
    try {
        return streams.store(
            StoreQueryParameters.fromNameAndType(
                FileCrawlerKafkaTopologyProducer.STORE_NAME,
                QueryableStoreTypes.keyValueStore()));
    } catch (InvalidStateStoreException e) {
        logger.warn(e.getMessage());
        try {
            Thread.sleep(3000);
        } catch (InterruptedException ignored) {
        }
    }
}

The store is queried with getStore().get(key); <- here we get the null
values.

This is the Log Output when RocksDB - first query before state RUNNING

...
2023-11-21 15:15:40,629 INFO  [com.osr.serKafkaStreamsService] (Quarkus
Main Thread) wait for kafka streams store to get ready: KafkaStreams has
not been started, you can retry after calling start()
2023-11-21 15:15:41,781 INFO  [org.apa.kaf.str.KafkaStreams]
(pool-10-thread-1) stream-client
[topic-name-7c35d436-f18c-4cb9-9d87-80855df5d1a2] State 

Re: EOS date for Kafka 3.5.1

2024-02-12 Thread Matthias J. Sax

https://cwiki.apache.org/confluence/display/KAFKA/Time+Based+Release+Plan#TimeBasedReleasePlan-WhatIsOurEOLPolicy?

On 2/11/24 8:08 PM, Sahil Sharma D wrote:

Hi team,

Can you please share the EOS date for Kafka Version 3.5.1?

Regards,
Sahil



Re: Re-key by multiple properties without composite key

2024-02-07 Thread Matthias J. Sax

Using the DSL, this sounds about right.

I am not worried about the complexity -- KS can handle it, and it's not 
uncommon to end up with such topologies.


You might be able to cut down on complexity by not using the DSL, but 
the Processor API. It gives you more control, and thus you might be able 
to optimize the overall topology.


Maybe inspect the details of `TopologyDescription` to spot 
inefficiencies of the DSL generated Topology that might give you an idea 
how much you could optimize using Processor API (to estimate if it would 
be worth the effort).
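
For example, a quick way to inspect it (assuming `builder` is your
StreamsBuilder and `props` your Streams config, so DSL optimizations are
applied before describing):

final Topology topology = builder.build(props);
System.out.println(topology.describe());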


It's hard to tell w/o knowing the details. It could also be just an 
inherently complex problem, and the DSL program is already as efficient 
as it gets...


Of course, there might also be ways to play with configs to cut down on 
latency to some extent, if e2e latency is your main concern. Again, I 
don't know the use case: for many cases, sub-second latency is actually 
sufficient.


HTH.

-Matthias

On 2/7/24 7:41 AM, Karsten Stöckmann wrote:

Sorry for being late with the response - I've been quite busy working
on our Streams application lately.

That leads me back to my initial question. The Folder class contains
multiple fields with FK pointing to the Person table, all of them with
different semantics (customer, billing address, etc). So in order to
find _all_ folders related to a particular person regardless of its
role, I guess I need to

a) re-key the folder table on each person FK independently and then
b) outer join the result tables.

The entire topology is insanely complex, I've got around 10 tables
with different levels of nesting (e.g. folder -- 1:n --> dependency a
-- 1:n --> dependency b) that all need to be aggregated and in the end
re-keyed to person IDs in order to build an aggregate person. There
are 14 sub topologies... - measuring the e2e latency shows values
around 600ms which seems rather high to me. Does that sound crazy? ;)

Best wishes
Karsten

Am Do., 1. Feb. 2024 um 19:02 Uhr schrieb Matthias J. Sax :


I see. You need to ensure that you get _all_ Person.

For this case, I guess you are right. You would need to first aggregate
the folder per person:

KTable allPersonFolders =
  folder.groupBy((...) -> (folder.customerId, ...))
.aggregate(...)

And in a second step, do a left join:

result = personTable.leftJoin(allPersonFolders,...)



Reading the topic as a table directly did not work out as that crashed
the application; apparently reading the topic as a KTable and then
using that for three independent re-key-operations is not allowed.


Not sure if I can follow. What do you mean by "crashed". -- For tables,
there is no `selectKey()` nor  a `repartition()` as explained in my
previous reply. However, doing a `table.groupBy(...)` will set a new key
and repartition the data to your needs.


-Matthias


On 2/1/24 1:12 AM, Karsten Stöckmann wrote:

Thanks so much for taking a look. An FK-table-table join is an inner
join which implies there would be no Person entities without associated
Folders. Unfortunately, that's not the case. That led me to an
attempt of re-keying the Folder topic by each of the three possible
foreign keys in order to be able to left join Persons against each of
the three re-keyed KTables to build an eventual Person aggregation
containing all possible Folders associated in any way.

Reading the topic as a table directly did not work out as that crashed
the application; apparently reading the topic as a KTable and then
using that for three independent re-key-operations is not allowed.

Best wishes,
Karsten

Am Do., 1. Feb. 2024 um 02:16 Uhr schrieb Matthias J. Sax :


Thanks for the details. This does make sense.

So it seems you can read all topic as table (ie, builder.table("topic")
-- no need to so `builder.stream().toTable()`).

And you can use the built-in FK-table-table join, and aggregate the result:

KTable result =
 folderTable
 .join(personTable, (folderId, folder) -> folder.customerId, ...)
 .groupBy((...) -> (personId, ...))
 .aggregate(...);
result.toStream().to("resultTopic");

Note the fk-extractor `(folderId, folder) -> folder.customerId` that
tells the join to use `customerId` from the `folderTable` to lookup the
person from personTable.

Think of `folderTable` as fact-table and `personTable` as dimension table.


KS will take care of everything else under the hood automatically.


-Matthias

On 1/30/24 11:25 AM, Karsten Stöckmann wrote:

Matthias, thanks for getting back on this. I'll try to illustrate my
intent with an example as I'm not yet fully familiar with Kafka
(Streams) and its idioms...

Assume classes Person and Folder:

class Person {
 Long id;
 String firstname;
 String lastname;
 // some content
}

class Folder {
 Long id;
 String folderNumber;
 // some other content
 Long customerId; // FK, points to Person.id
 Long billingAddressId; // FK, also points to Person.id

Re: Re-key by multiple properties without composite key

2024-02-01 Thread Matthias J. Sax

I see. You need to ensure that you get _all_ Person.

For this case, I guess you are right. You would need to first aggregate 
the folder per person:


KTable allPersonFolders =
folder.groupBy((...) -> (folder.customerId, ...))
  .aggregate(...)

And in a second step, do a left join:

result = personTable.leftJoin(allPersonFolders,...)



Reading the topic as a table directly did not work out as that crashed
the application; apparently reading the topic as a KTable and then
using that for three independent re-key-operations is not allowed.


Not sure if I can follow. What do you mean by "crashed"? -- For tables, 
there is no `selectKey()` nor a `repartition()` as explained in my 
previous reply. However, doing a `table.groupBy(...)` will set a new key 
and repartition the data to your needs.



-Matthias


On 2/1/24 1:12 AM, Karsten Stöckmann wrote:

Thanks so much for taking a look. An FK-table-table join is an inner
join which implies there would be no Person entities without associated
Folders. Unfortunately, that's not the case. That led me to an
attempt of re-keying the Folder topic by each of the three possible
foreign keys in order to be able to left join Persons against each of
the three re-keyed KTables to build an eventual Person aggregation
containing all possible Folders associated in any way.

Reading the topic as a table directly did not work out as that crashed
the application; apparently reading the topic as a KTable and then
using that for three independent re-key-operations is not allowed.

Best wishes,
Karsten

Am Do., 1. Feb. 2024 um 02:16 Uhr schrieb Matthias J. Sax :


Thanks for the details. This does make sense.

So it seems you can read all topic as table (ie, builder.table("topic")
-- no need to so `builder.stream().toTable()`).

And you can use the built-in FK-table-table join, and aggregate the result:

KTable result =
folderTable
.join(personTable, (folderId, folder) -> folder.customerId, ...)
.groupBy((...) -> (personId, ...))
.aggregate(...);
result.toStream().to("resultTopic");

Note the fk-extractor `(folderId, folder) -> folder.customerId` that
tells the join to use `customerId` from the `folderTable` to lookup the
person from personTable.

Think of `folderTable` as fact-table and `personTable` as dimension table.


KS will take care of everything else under the hood automatically.


-Matthias

On 1/30/24 11:25 AM, Karsten Stöckmann wrote:

Matthias, thanks for getting back on this. I'll try to illustrate my
intent with an example as I'm not yet fully familiar with Kafka
(Streams) and its idioms...

Assume classes Person and Folder:

class Person {
Long id;
String firstname;
String lastname;
// some content
}

class Folder {
Long id;
String folderNumber;
// some other content
Long customerId; // FK, points to Person.id
Long billingAddressId; // FK, also points to Person.id
}

Thus both foreign keys of Folder point to Person entities, yet with
different semantics. They're not composite keys but act independently.

Now assume I want to build an aggregate Person object containing
Folder.folderNumber of all folders associated with a Person entity,
regardless whether it acts as a customer or billing address. My
(naive) idea was to build re-keyed KTables by Folder.customerId and
Folder.billingAddressId and then joining / aggregating them with the
Person KTable in order to build something like this:

class AggregatedPerson {
Long id;
List folderNumbers; // or even List
// ...
}

(The latter supposed to be written to an output topic in order to
serve as input for Solr or ElasticSearch.)

Does this even make sense?



If you read the topic a KTable, you cannot repartition because it
violates the contract. A KTable must be partitioned by it's primary key,
ie, the ID field, and thus the DSL does not offer you a repartition option.


So re-key means repartition? ATM the partition size of all input
topics is 1 as per Kafka UI, as I've specified no extra configuration
for them.

Best wishes,
Karsten

Am Di., 30. Jan. 2024 um 20:03 Uhr schrieb Matthias J. Sax :



Both fk1 and fk2 point to the PK of another entity (not shown for
brevity, of no relevance to the question).


It this two independent FK, or one two-column FK?



Ingesting the topic into a Kafka Streams application, how can I re-key
the resulting KTable by both fk1 and fk2?


If you read the topic a KTable, you cannot repartition because it
violates the contract. A KTable must be partitioned by it's primary key,
ie, the ID field, and thus the DSL does not offer you a repartition option.

You could read the topic as KStream though, and provide a custom
`StreamPartitioner` for a `repartition()` operation. However, this is
also "dangerous" because for a KStream it's also assumed that it's
partitioned by it's key, and you might break downstream DSL operators
with such a violation of the "contract".

Re: Re-key by multiple properties without composite key

2024-01-31 Thread Matthias J. Sax

Thanks for the details. This does make sense.

So it seems you can read all topics as tables (ie, builder.table("topic") 
-- no need to do `builder.stream().toTable()`).


And you can use the built-in FK-table-table join, and aggregate the result:

KTable result =
  folderTable
  .join(personTable, (folderId, folder) -> folder.customerId, ...)
  .groupBy((...) -> (personId, ...))
  .aggregate(...);
result.toStream().to("resultTopic");

Note the fk-extractor `(folderId, folder) -> folder.customerId` that 
tells the join to use `customerId` from the `folderTable` to lookup the 
person from personTable.


Think of `folderTable` as fact-table and `personTable` as dimension table.


KS will take care of everything else under the hood automatically.


-Matthias

On 1/30/24 11:25 AM, Karsten Stöckmann wrote:

Matthias, thanks for getting back on this. I'll try to illustrate my
intent with an example as I'm not yet fully familiar with Kafka
(Streams) and its idioms...

Assume classes Person and Folder:

class Person {
   Long id;
   String firstname;
   String lastname;
   // some content
}

class Folder {
   Long id;
   String folderNumber;
   // some other content
   Long customerId; // FK, points to Person.id
   Long billingAddressId; // FK, also points to Person.id
}

Thus both foreign keys of Folder point to Person entities, yet with
different semantics. They're not composite keys but act independently.

Now assume I want to build an aggregate Person object containing
Folder.folderNumber of all folders associated with a Person entity,
regardless whether it acts as a customer or billing address. My
(naive) idea was to build re-keyed KTables by Folder.customerId and
Folder.billingAddressId and then joining / aggregating them with the
Person KTable in order to build something like this:

class AggregatedPerson {
   Long id;
   List folderNumbers; // or even List
   // ...
}

(The latter supposed to be written to an output topic in order to
serve as input for Solr or ElasticSearch.)

Does this even make sense?



If you read the topic a KTable, you cannot repartition because it
violates the contract. A KTable must be partitioned by it's primary key,
ie, the ID field, and thus the DSL does not offer you a repartition option.


So re-key means repartition? ATM the partition size of all input
topics is 1 as per Kafka UI, as I've specified no extra configuration
for them.

Best wishes,
Karsten

Am Di., 30. Jan. 2024 um 20:03 Uhr schrieb Matthias J. Sax :



Both fk1 and fk2 point to the PK of another entity (not shown for
brevity, of no relevance to the question).


It this two independent FK, or one two-column FK?



Ingesting the topic into a Kafka Streams application, how can I re-key
the resulting KTable by both fk1 and fk2?


If you read the topic a KTable, you cannot repartition because it
violates the contract. A KTable must be partitioned by it's primary key,
ie, the ID field, and thus the DSL does not offer you a repartition option.

You could read the topic as KStream though, and provide a custom
`StreamPartitioner` for a `repartition()` operation. However, this is
also "dangerous" because for a KStream it's also assumed that it's
partitioned by it's key, and you might break downstream DSL operators
with such a violation of the "contract".

Looking into your solution:


.toTable()
 .groupBy(
 (key, value) -> KeyValue.pair(value.fk1(), value),
 Grouped.with(...))


This will set fk1 as key, what seems not to align with you previous
comment about the key should stay the ID? (Same for f2k).

Your last step seems to join fk1-fk2 -- is this on purpose? I guess it's
unclear what you try to actually do to begin with? It sound like it's
overall a self-join of the input topic on fk1 and fk2 ?


-Matthias

On 1/28/24 2:24 AM, Karsten Stöckmann wrote:

Hi all,

just stumbled upon another Kafka Streams issue that keeps me busy these days.

Assume a (simplified) class A like this:

class A {
  private Long id;
  private String someContent;
  private Long fk1;
  private Long fk2;
  // Getters and setters accordingly
}

Both fk1 and fk2 point to the PK of another entity (not shown for
brevity, of no relevance to the question).

Now assume a Kafka topic built from instances of class A, keyed by its
id (see above).

Ingesting the topic into a Kafka Streams application, how can I re-key
the resulting KTable by both fk1 and fk2? Note that the
resulting key should not be changed or turned into some kind of
composite key as it is used in later join operations.

My (naive) solution involves creating two KTables from the input
stream, re-keying them by fk1 and fk2 accordingly and then outer
joining both resulting (re-keyed) KTables.

KStream in = streamsBuilder.stream(topic, Consumed.with(...));

KTable rekeyedByFk1 = in
  .toTable()
  .groupBy(
  (key, value) -> KeyValue.pair(value.fk1(), value),
  G

Re: What does kafka streams groupBy does internally?

2024-01-30 Thread Matthias J. Sax

Did reply on SO.

-Matthias

On 1/24/24 2:18 AM, warrior2...@gmail.com wrote:
Let's say there's a topic in which chunks of different files are all 
mixed up, represented by a tuple (FileId, Chunk).


Chunks of the same file can also be a little out of order.

The task is to aggregate all files and store them into some store.

The number of files is unbound.

In pseudo stream DSL that might look like

topic('chunks')
  .groupByKey((fileId, chunk) -> fileId)
  .sortBy((fileId, chunk) -> chunk.offset)
  .aggregate((fileId, chunk) -> store.append(fileId, chunk));


I want to understand whether kafka streams can solve this efficiently. 
Since the number of files is unbound, how would kafka manage intermediate 
topics for the groupBy operation? How many partitions will it use, etc.? Can't 
find these details in the docs. Also, let's say a chunk has a flag that 
indicates EOF. How to indicate that a specific group will no longer have 
any new data?



That’s a copy of my stack overflow question.

—
Michael


Re: Re-key by multiple properties without composite key

2024-01-30 Thread Matthias J. Sax

Both fk1 and fk2 point to the PK of another entity (not shown for
brevity, of no relevance to the question).


Is this two independent FKs, or one two-column FK?



Ingesting the topic into a Kafka Streams application, how can I re-key
the resulting KTable by both fk1 and fk2?


If you read the topic as a KTable, you cannot repartition because it 
violates the contract. A KTable must be partitioned by its primary key, 
ie, the ID field, and thus the DSL does not offer you a repartition option.


You could read the topic as a KStream though, and provide a custom 
`StreamPartitioner` for a `repartition()` operation. However, this is 
also "dangerous" because for a KStream it's also assumed that it's 
partitioned by its key, and you might break downstream DSL operators 
with such a violation of the "contract".
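
As a rough sketch (the types, serde, and fk1 accessor are placeholders, not
your actual classes), routing by fk1 without changing the record key could
look like:

KStream<Long, A> byFk1 = in.repartition(
    Repartitioned.<Long, A>with(Serdes.Long(), aSerde)
        .withStreamPartitioner((topic, key, value, numPartitions) ->
            Math.floorMod(value.fk1().hashCode(), numPartitions)));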


Looking into your solution:


.toTable()
.groupBy(
(key, value) -> KeyValue.pair(value.fk1(), value),
Grouped.with(...))


This will set fk1 as the key, which does not seem to align with your previous 
comment that the key should stay the ID? (Same for fk2.)


Your last step seems to join fk1-fk2 -- is this on purpose? I guess it's 
unclear what you are actually trying to do to begin with? It sounds like it's 
overall a self-join of the input topic on fk1 and fk2?



-Matthias

On 1/28/24 2:24 AM, Karsten Stöckmann wrote:

Hi all,

just stumbled upon another Kafka Streams issue that keeps me busy these days.

Assume a (simplified) class A like this:

class A {
 private Long id;
 private String someContent;
 private Long fk1;
 private Long fk2;
 // Getters and setters accordingly
}

Both fk1 and fk2 point to the PK of another entity (not shown for
brevity, of no relevance to the question).

Now assume a Kafka topic built from instances of class A, keyed by its
id (see above).

Ingesting the topic into a Kafka Streams application, how can I re-key
the resulting KTable by both fk1 and fk2? Note that the
resulting key should not be changed or turned into some kind of
composite key as it is used in later join operations.

My (naive) solution involves creating two KTables from the input
stream, re-keying them by fk1 and fk2 accordingly and then outer
joining both resulting (re-keyed) KTables.

KStream in = streamsBuilder.stream(topic, Consumed.with(...));

KTable rekeyedByFk1 = in
    .toTable()
    .groupBy(
        (key, value) -> KeyValue.pair(value.fk1(), value),
        Grouped.with(...))
    .aggregate(
        Aggregate::new,
        (key, value, aggregate) -> aggregate.add(value),
        (key, value, aggregate) -> aggregate.remove(value),
        Materialized.with(...));

KTable rekeyedByFk2 = in
    .toTable()
    .groupBy(
        (key, value) -> KeyValue.pair(value.fk2(), value),
        Grouped.with(...))
    .aggregate(
        ... same as above
    );

KTable joined = rekeyedByFk1
    .outerJoin(
        rekeyedByFk2,
        ...)
    .groupBy(KeyValue::pair, Grouped.with(...))
    .aggregate(...);

The value joiner would integrate the (already pre-joined) Aggregates so as
to avoid duplicates.

Does this seem like a viable solution, or are there better / simpler /
more efficient implementations?

Best wishes,
Karsten


Re: [Kafka Streams], Processor API for KTable and KGroupedStream

2024-01-17 Thread Matthias J. Sax
You cannot add a `Processor`. You can only use `aggregate() / reduce() / 
count()` (which of course will add a pre-defined processor).


`groupByKey()` is really just a "meta operation" that checks if the key 
was changed upstream, and inserts a repartition/shuffle step if necessary.


Thus, if you don't change the upstream key, you can just add a processor 
to `someStream` (groupByKey() would be a no-op anyway).


If you did change the key upstream, you can do 
`someStream.repartition().transform()` to repartition explicitly.
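
A rough sketch of that variant (Event, eventSerde, and MyTransformer are
placeholders for your own types):

someStream
    .selectKey((key, value) -> value.newKey())                   // key changed upstream
    .repartition(Repartitioned.with(Serdes.String(), eventSerde))
    .transform(MyTransformer::new)                               // custom processor runs after the shuffle
    .to("output-topic");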



HTH.

On 1/13/24 3:14 AM, Igor Maznitsa wrote:
Thanks a lot for the explanation, but could you provide a bit more detail 
about KGroupedStream? It is just an interface and does not extend KStream, so 
how can I add a processor in the case below?

KStream someStream = ...
someStream
    .groupByKey()
    // how to add processor for resulted grouped stream here ???

On 2024-Jan-13 01:22, Matthias J. Sax wrote:
`KGroupedStream` is just an "intermediate representation" to get a 
better flow in the DSL. It's not a "top level" abstraction like 
KStream/KTable.


For `KTable` there is `transformValue()` -- there is no `transform()` 
because keying must be preserved -- if you want to change the keying 
you  need to use `KTable#groupBy()` (data needs to be repartitioned if 
you change the key).


HTH.

-Matthias

On 1/12/24 11:47 AM, Igor Maznitsa wrote:

Hello

Is there any way in Kafka Streams API to define processors for KTable 
and KGroupedStream like KStream#transform? How to provide a custom 
processor for KTable or KGroupedStream which could for instance 
provide way to not downstream selected events?







Re: [PROPOSAL] Add commercial support page on website

2024-01-12 Thread Matthias J. Sax

François,

thanks for starting this initiative. Personally, I don't think it's 
necessarily harmful for the project to add such a new page, however, I 
share the same concerns others raised already.


I understand your motivation that people had issues finding commercial 
support, but I am not sure we can address this issue that way. I am also 
"worried" (for the lack of a better word) that the page might become 
long and unwieldy. In the end, any freelancer/consultant offering Kafka 
services would be able to get on the page, so we might get hundreds of 
entries, which also makes it impossible for users to find what they are 
looking for. Also, the services of different companies might vary 
drastically; should users read all these descriptions? I can also 
imagine that some companies offer their services only in some 
countries/regions making it even harder for user to find what they are 
looking for?


Overall, it sounds more like a search optimization problem, and thus it 
seems out-of-scope what we can solve. As I said, I am not strictly 
against it, but I just don't see much value either.



-Matthias

On 1/11/24 12:55 PM, Francois Papon wrote:

Hi Justine,

You're right, Kafka is a part of my business (training, consulting, 
architecture design, sla...) and most of the time, users/customers said 
that it was hard for them to find a commercial support (in France for my 
case) after searching on the Kafka website (Google didn't help them).


As an ASF member and PMC of several ASF projects, I know that this kind 
of page exist so this is why I made this proposal for the Kafka project 
because I really think that it can help users.


As you suggest, I can submit a PR to be added on the "powered by" page.

Thanks,

François

On 11/01/2024 21:00, Justine Olshan wrote:

Hey François,

My point was that the companies on that page use kafka as part of their
business. If you use Kafka as part of your business feel free to submit a
PR to be added.

I second Chris's point that other projects are not enough to require 
Kafka

having such a support page.

Justine

On Thu, Jan 11, 2024 at 11:57 AM Chris Egerton 
wrote:


Hi François,

Is it an official policy of the ASF that projects provide a listing of
commercial support options for themselves? I understand that other 
projects

have chosen to provide one, but this doesn't necessarily imply that all
projects should do the same, and I can't say I find this point very
convincing as a rebuttal to some of the good-faith concerns raised by 
the

PMC and members of the community so far. However, if there's an official
ASF stance on this topic, then I acknowledge that Apache Kafka should 
align

with it.

Best,

Chris


On Thu, Jan 11, 2024, 14:50 fpapon  wrote:


Hi Justine,

I'm not sure to see the difference between "happy users" and vendors
that advertise their products in some of the company list in the
"powered by" page.

Btw, the initial purpose of my proposal was to help users find support
for production stuff rather than searching in google.

I don't think this is a bad thing because this is something that already
exists in many ASF projects like:

exist in many ASF projects like:

https://hop.apache.org/community/commercial/
https://struts.apache.org/commercial-support.html
https://directory.apache.org/commercial-support.html
https://tomee.apache.org/commercial-support.html
https://plc4x.apache.org/users/commercial-support.html
https://camel.apache.org/community/support/
https://openmeetings.apache.org/commercial-support.html
https://guacamole.apache.org/support/



https://cwiki.apache.org/confluence/display/HADOOP2/Distributions+and+Commercial+Support
https://activemq.apache.org/support

https://netbeans.apache.org/front/main/help/commercial-support/
https://royale.apache.org/royale-commercial-support/

https://karaf.apache.org/community.html

As I understand for now, the channel for users to find production
support is:

- The mailing list (u...@kafka.apache.org / d...@kafka.apache.org)

- The official #kafka  ASF Slack channel (may be we can add it on the
website because I didn't find it in the website =>
https://kafka.apache.org/contact)

- Search in google for commercial support only

I can update my PR to mention only the 3 points above for the "get
support" page if people think that having a support page make sense.

regards,

François

On 11/01/2024 19:34, Justine Olshan wrote:

I think there is a difference between the "Powered by" page and a page

for

vendors to advertise their products and services.

The idea is that the companies on that page are "powered by" Kafka.

They

serve as examples of happy users of Kafka.
I don't think it is meant only as a place just for those companies to
advertise.

I'm a little confused by

In this case, I'm ok to say that the commercial support section in 
the

"Get support" is no need as we can use this page.

If you plan to submit for this page, please include a description on

how

your company uses 

Re: [Kafka Streams], Processor API for KTable and KGroupedStream

2024-01-12 Thread Matthias J. Sax
`KGroupedStream` is just an "intermediate representation" to get a 
better flow in the DSL. It's not a "top level" abstraction like 
KStream/KTable.


For `KTable` there is `transformValues()` -- there is no `transform()` 
because keying must be preserved -- if you want to change the keying you 
need to use `KTable#groupBy()` (data needs to be repartitioned if you 
change the key).


HTH.

-Matthias

On 1/12/24 11:47 AM, Igor Maznitsa wrote:

Hello

Is there any way in the Kafka Streams API to define processors for KTable 
and KGroupedStream like KStream#transform? How can I provide a custom 
processor for KTable or KGroupedStream which could, for instance, provide a 
way to not downstream selected events?





Re: Kafka Streams reaching ERROR state during rolling upgrade / restart of brokers

2023-10-02 Thread Matthias J. Sax
I did mean client side...  If KS goes into ERROR state, it should log 
the reason.


If the logs are indeed empty, try to register an 
uncaught-exception-handler via


KafkaStreams#setUncaughtExceptionHandler(...)
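
For example, a minimal handler (the logger is a placeholder; register it
before calling start()):

streams.setUncaughtExceptionHandler(exception -> {
    log.error("StreamThread died unexpectedly", exception);
    return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
});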


-Matthias

On 10/2/23 12:11 PM, Debraj Manna wrote:

Are you suggesting to check the Kafka broker logs? I do not see any other
errors logs on the client / application side.

On Fri, 29 Sep, 2023, 22:01 Matthias J. Sax,  wrote:


In general, Kafka Streams should keep running.

Can you inspect the logs to figure out why it's going into ERROR state
to begin with? Maybe you need to increase/change some timeouts/retries
configs.

The stack trace you shared, is a symptom, but not the root cause.

-Matthias

On 9/21/23 12:56 AM, Debraj Manna wrote:

I am using Kafka broker 2.8.1 (from AWS MSK) with Kafka clients and Kafka
stream 3.5.1.

I am observing that whenever some rolling upgrade is done on AWS MSK our
stream application reaches an error state. I get the below exception on
trying to query the state store

caused by: java.lang.IllegalStateException: KafkaStreams is not running.
State is ERROR.
  at


org.apache.kafka.streams.KafkaStreams.validateIsRunningOrRebalancing(KafkaStreams.java:381)

  at


org.apache.kafka.streams.KafkaStreams.queryMetadataForKey(KafkaStreams.java:1663)

  at


org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService.lambda$getHostInfo$2(InteractiveQueryService.java:227)


Can someone let me know what the recommended way we can keep the stream
application running whenever some rolling upgrade/restart of brokers is
done in the background?







Re: Kafka Streams reaching ERROR state during rolling upgrade / restart of brokers

2023-09-29 Thread Matthias J. Sax

In general, Kafka Streams should keep running.

Can you inspect the logs to figure out why it's going into ERROR state 
to begin with? Maybe you need to increase/change some timeouts/retries 
configs.


The stack trace you shared, is a symptom, but not the root cause.

-Matthias

On 9/21/23 12:56 AM, Debraj Manna wrote:

I am using Kafka broker 2.8.1 (from AWS MSK) with Kafka clients and Kafka
stream 3.5.1.

I am observing that whenever some rolling upgrade is done on AWS MSK our
stream application reaches an error state. I get the below exception on
trying to query the state store

caused by: java.lang.IllegalStateException: KafkaStreams is not running.
State is ERROR.
 at
org.apache.kafka.streams.KafkaStreams.validateIsRunningOrRebalancing(KafkaStreams.java:381)
 at
org.apache.kafka.streams.KafkaStreams.queryMetadataForKey(KafkaStreams.java:1663)
 at
org.springframework.cloud.stream.binder.kafka.streams.InteractiveQueryService.lambda$getHostInfo$2(InteractiveQueryService.java:227)

Can someone let me know what the recommended way we can keep the stream
application running whenever some rolling upgrade/restart of brokers is
done in the background?



Re: Can a message avoid loss occur in Kafka

2023-09-29 Thread Matthias J. Sax
For the config you provide, data loss should not happen (as long as you 
don't allow for unclean leader election, which is disabled by default).


But you might be subject to unavailability for some partitions if a 
broker fails.



-Matthias

On 9/17/23 7:49 AM, 陈近南 wrote:

Hello,
Can message loss be avoided in Kafka? For example, my config is:


Producer
retries = Integer.MAX_VALUE
request.required.acks=-1


Broker
replication.factor >= 2
min.insync.replicas > 1
log.flush.interval.messages=1


Consumer
enable.auto.commit = false

 Can this config avoid message loss in Kafka? If not, why? And is there another MQ that can avoid it?



Best regards,
Chen



Re: kafka streams consumer group reporting lag even on source topics removed from topology

2023-09-05 Thread Matthias J. Sax

Great!

On 9/5/23 1:23 AM, Pushkar Deole wrote:

I think I could figure out a way. There are certain commands that can be
executed from the kafka-cli to disassociate a consumer group from the topics
that are no longer being consumed.
With this sort of command, I could delete the consumer offsets for a
consumer group for a specific topic and that resolved the lag problem:

kafka-consumer-groups --bootstrap-server $KAFKA_BOOTSTRAP_SERVERS
--command-config ~/kafka.properties --delete-offsets --group
"" --topic ""


As long as the consumer group is active, nothing will be deleted. That
is the reason why you get those incorrect alerts -- Kafka cannot know
that you stopped consuming from those topics. (That is what I tried to
explain -- seems I did a bad job...)

Changing the group.id is tricky because Kafka Streams uses it to
identify internal topic names (for repartiton and chagnelog topics), and
thus your app would start with newly created (and thus empty topics). --
You might want to restart the app with `auto.offset.reset = "earliest"`
and reprocess all available input to re-create state.


-Matthias

On 8/19/23 8:07 AM, Pushkar Deole wrote:

@matthias

what are the alternatives to get rid of this issue? When the lag starts
increasing, we have alerts configured on our monitoring system in Datadog
which starts sending alerts and alarms to reliability teams. I know in
kafka the inactive consumer group is cleared up after 7 days however not
sure if that is the case with topics that were consumed previously and

not

consumed now.

Does creation of new consumer group (setting a different application.id)

on

streams application an option here?


On Thu, Aug 17, 2023 at 7:03 AM Matthias J. Sax 

wrote:



Well, it's kinda expected behavior. It's a split brain problem.

In the end, you use the same `application.id / group.id` and thus the
committed offsets for the removed topics are still in
`__consumer_offsets` topics and associated with the consumer group.

If a tool inspects lags and compares the latest committed offsets to
end-offsets it looks for everything it finds in the `__consumer_offsets`
topics for the group in question -- the tool cannot know that you
changed the application and that is does not read from those topics any
longer (and thus does not commit any longer).

I am not sure from top of my head if you could do a manual cleanup for
the `application.id` and topics in question and delete the committed
offsets from the `__consumer_offsets` topic -- try to checkout `Admin`
client and/or the command line tools...

In know that it's possible to delete committed offsets for a consumer
group (if a group becomes inactive, the broker would also cleanup all
group metadata after a configurable timeout), but I am not sure if
that's for the entire consumer group (ie, all topic) or if you can do it
on a per-topic basis, too.


HTH,
 -Matthias


On 8/16/23 2:11 AM, Pushkar Deole wrote:

Hi streams Dev community  @matthias, @bruno

Any inputs on above issue? Is this a bug in the streams library wherein

the

input topic removed from streams processor topology, the underlying
consumer group still reporting lag against those?

On Wed, Aug 9, 2023 at 4:38 PM Pushkar Deole 

wrote:



Hi All,

I have a streams application with 3 instances with application-id set

to

applicationV1. The application uses processor API with reading from

source

topics, processing the data and writing to destination topic.
Currently it consumes from 6 source topics however we don't need to
process data any more from 2 of those topics so we removed 2 topics

from

the source topics list. We have configured Datadog dashboard to report

and

alert on consumer lag so after removing the 2 source topics and

deploying

application, we started getting several alerts about consumer lag on
applicationV1 consumer group which is underlying consumer group of the
streams application. When we looked at the consumer group from

kafka-cli,

we could see that the consumer group is reporting lag against the

topics

removed from source topic list which is reflecting as increasing lag

on

Datadog monitoring.

Can someone advise if this is expected behavior? In my opinion, this

is

not expected since streams application no more has those topics as

part

of

source, it should not report lag on those.













Re: AW: Table updates are not consistent when doing a join with a Stream

2023-09-04 Thread Matthias J. Sax
Your update to the KTable is async when you send data back to the KTable 
input topic. So your program is subject to race-conditions.


So switching to the PAPI was the right move: it makes the update to the 
state store synchronous and thus fixes the issue.
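
A rough sketch of such a PAPI update (store name and types are placeholders,
not the actual application code):

// uses org.apache.kafka.streams.processor.api.Processor / Record and
// org.apache.kafka.streams.state.KeyValueStore
public class AppendingProcessor implements Processor<String, String, String, List<String>> {
    private KeyValueStore<String, List<String>> store;

    @Override
    public void init(final ProcessorContext<String, List<String>> context) {
        store = context.getStateStore("join-store");
    }

    @Override
    public void process(final Record<String, String> record) {
        List<String> values = store.get(record.key());
        if (values == null) {
            values = new ArrayList<>();
        }
        values.add(record.value());
        store.put(record.key(), values); // synchronous update -- no round trip via the topic
    }
}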



-Matthias

On 9/4/23 5:53 AM, Mauricio Lopez wrote:

Hello,

They were getting processed by the same consumer as we only had a single 
machine running this.
What we ended up doing is basically drawing the same topology but interacting 
directly with the stateStore using the Processor API instead of DSL. Seems that 
fixed everything up (and made it way quicker).

Best,
Mauricio

On 2023/08/28 12:04:12 Claudia Kesslau wrote:

Hi,

I'm definitely no expert, but to me it sounds as if not all your messages are 
getting processed by the same consumer. Are you using the key `foo` for 
partitioning? Is `baz` actually another key, or is this a mixup in your example 
and `baz` is another value with key `foo`?

Hope you find a solution to your problem.

Best,
Claudia

From: Mauricio Lopez <ml...@silversky.com>
Sent: Thursday, 17 August 2023 22:57
To: users@kafka.apache.org
Subject: Table updates are not consistent when doing a join with a Stream

Hello Folks,

We are having an issue with a Kafka Streams Java application.
We have a KStream and a KTable which are joined using a Left Join. The entries 
in the KTable are constantly updated by the new information that comes from the 
KStream. Each KStream message is adding entries to an array that the KTable has 
for each key. This update gets sent back to the KTable topic, expanding this 
array every time a new message comes from the KStream.

As an example, what should be happening (and what happens in our unit tests) is:


   *   KTable has an empty array for key “foo”: []
   *   Event 1 comes with key “foo” and value “bar”
   *   KTable gets updated to “foo”: [“bar”], sending this update to the same 
topic that the KTable is plugged into.
   *   Event 2 comes with key “baz”
   *   Update is pulled to mem by the KTable, and the KTable gets updated to “foo”: 
[“bar”, “baz”], sending this change to the same topic that the KTable is 
plugged into. Baz was appended to the array for key “foo”.

But what is happening is the following:


   *   KTable has an empty array for key “foo”: []

   *   Event 1 comes with key “foo” and value “bar”
   *   KTable gets updated to “foo”: [“bar”] in the joiner, sending an event 
to the same topic that the KTable is plugged to.
   *   Event 2 comes with key “baz”
   *   KTable gets updated to “foo”: [“baz”] in the joiner, sending an event 
to the same topic that the KTable is plugged to afterwards.

This happens multiple times, and after a couple of seconds, one of the incoming 
messages is finally appended, but many of them are lost. As you can see, we 
suspect that when the Event 2 is received, the KTable has somehow not received  
the first update for adding “baz” to the array.
This means that many events are missed, and we cannot successfully get the 
KTable to save all the data for all the events. In turn, it sometimes 
overwrites the updates from some events.

So far, we have tried:


   *   Setting STATESTORE_CACHE_MAX_BYTES_CONFIG to 0, to attempt to force the 
app not to cache any changes and send to the output topic instantly.
   *   Setting COMMIT_INTERVAL_MS_CONFIG to 0, to attempt to force the app to 
send all updates instantly
   *   Setting TOPOLOGY_OPTIMIZATION_CONFIG to “reuse.ktable.source.topics” and 
“all” in case there is some optimization pattern that could help us.


None of these have allowed us to have a fully consistent update of the KTable 
each time a new event comes. It always gets overwritten or misses incoming 
updates made by events.  Can someone advice if there’s a way to make the KTable 
get successfully updated by each one of the events, as the first example shows?

Thanks,

Mauricio L






Mauricio López S.
Software Engineer



Re: kafka streams consumer group reporting lag even on source topics removed from topology

2023-09-04 Thread Matthias J. Sax
As long as the consumer group is active, nothing will be deleted. That 
is the reason why you get those incorrect alerts -- Kafka cannot know 
that you stopped consuming from those topics. (That is what I tried to 
explain -- seems I did a bad job...)


Changing the group.id is tricky because Kafka Streams uses it to 
identify internal topic names (for repartition and changelog topics), and 
thus your app would start with newly created (and thus empty) topics. -- 
You might want to restart the app with `auto.offset.reset = "earliest"` 
and reprocess all available input to re-create state.
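
A sketch of the corresponding config (the new application.id is a
placeholder):

final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "applicationV2");   // new app/group id
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");    // reprocess input to re-create state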



-Matthias

On 8/19/23 8:07 AM, Pushkar Deole wrote:

@matthias

what are the alternatives to get rid of this issue? When the lag starts
increasing, we have alerts configured on our monitoring system in Datadog
which starts sending alerts and alarms to reliability teams. I know in
kafka the inactive consumer group is cleared up after 7 days however not
sure if that is the case with topics that were consumed previously and not
consumed now.

Does creation of new consumer group (setting a different application.id) on
streams application an option here?


On Thu, Aug 17, 2023 at 7:03 AM Matthias J. Sax  wrote:


Well, it's kinda expected behavior. It's a split brain problem.

In the end, you use the same `application.id / group.id` and thus the
committed offsets for the removed topics are still in
`__consumer_offsets` topics and associated with the consumer group.

If a tool inspects lags and compares the latest committed offsets to
end-offsets it looks for everything it finds in the `__consumer_offsets`
topics for the group in question -- the tool cannot know that you
changed the application and that is does not read from those topics any
longer (and thus does not commit any longer).

I am not sure from top of my head if you could do a manual cleanup for
the `application.id` and topics in question and delete the committed
offsets from the `__consumer_offsets` topic -- try to checkout `Admin`
client and/or the command line tools...

In know that it's possible to delete committed offsets for a consumer
group (if a group becomes inactive, the broker would also cleanup all
group metadata after a configurable timeout), but I am not sure if
that's for the entire consumer group (ie, all topic) or if you can do it
on a per-topic basis, too.


HTH,
-Matthias


On 8/16/23 2:11 AM, Pushkar Deole wrote:

Hi streams Dev community  @matthias, @bruno

Any inputs on above issue? Is this a bug in the streams library wherein

the

input topic removed from streams processor topology, the underlying
consumer group still reporting lag against those?

On Wed, Aug 9, 2023 at 4:38 PM Pushkar Deole 

wrote:



Hi All,

I have a streams application with 3 instances with application-id set to
applicationV1. The application uses processor API with reading from

source

topics, processing the data and writing to destination topic.
Currently it consumes from 6 source topics however we don't need to
process data any more from 2 of those topics so we removed 2 topics from
the source topics list. We have configured Datadog dashboard to report

and

alert on consumer lag so after removing the 2 source topics and

deploying

application, we started getting several alerts about consumer lag on
applicationV1 consumer group which is underlying consumer group of the
streams application. When we looked at the consumer group from

kafka-cli,

we could see that the consumer group is reporting lag against the topics
removed from source topic list which is reflecting as increasing lag on
Datadog monitoring.

Can someone advise if this is expected behavior? In my opinion, this is
not expected since streams application no more has those topics as part

of

source, it should not report lag on those.









Re: kafka streams consumer group reporting lag even on source topics removed from topology

2023-08-16 Thread Matthias J. Sax

Well, it's kinda expected behavior. It's a split brain problem.

In the end, you use the same `application.id / group.id` and thus the 
committed offsets for the removed topics are still in 
`__consumer_offsets` topics and associated with the consumer group.


If a tool inspects lags and compares the latest committed offsets to 
end-offsets, it looks for everything it finds in the `__consumer_offsets` 
topics for the group in question -- the tool cannot know that you 
changed the application and that it does not read from those topics any 
longer (and thus does not commit any longer).


I am not sure from top of my head if you could do a manual cleanup for 
the `application.id` and topics in question and delete the committed 
offsets from the `__consumer_offsets` topic -- try to checkout `Admin` 
client and/or the command line tools...


I know that it's possible to delete committed offsets for a consumer 
group (if a group becomes inactive, the broker would also clean up all 
group metadata after a configurable timeout), but I am not sure if 
that's for the entire consumer group (ie, all topics) or if you can do it 
on a per-topic basis, too.



HTH,
  -Matthias


On 8/16/23 2:11 AM, Pushkar Deole wrote:

Hi streams Dev community  @matthias, @bruno

Any inputs on above issue? Is this a bug in the streams library wherein the
input topic removed from streams processor topology, the underlying
consumer group still reporting lag against those?

On Wed, Aug 9, 2023 at 4:38 PM Pushkar Deole  wrote:


Hi All,

I have a streams application with 3 instances with application-id set to
applicationV1. The application uses processor API with reading from source
topics, processing the data and writing to destination topic.
Currently it consumes from 6 source topics however we don't need to
process data any more from 2 of those topics so we removed 2 topics from
the source topics list. We have configured Datadog dashboard to report and
alert on consumer lag so after removing the 2 source topics and deploying
application, we started getting several alerts about consumer lag on
applicationV1 consumer group which is underlying consumer group of the
streams application. When we looked at the consumer group from kafka-cli,
we could see that the consumer group is reporting lag against the topics
removed from source topic list which is reflecting as increasing lag on
Datadog monitoring.

Can someone advise if this is expected behavior? In my opinion, this is
not expected since streams application no more has those topics as part of
source, it should not report lag on those.





Re: Consuming an entire partition with control messages

2023-07-27 Thread Matthias J. Sax
Well, `kafka-consumer-groups.sh` can only display the difference between 
"committed offset" and "end offset". It cannot know what the "right" 
offset to be committed is. It's really the responsibility of the 
consumers to commit correctly.


-Matthias

On 7/27/23 1:03 AM, Vincent Maurin wrote:
Thank you Matthias for your answer, I open an issue on the aiokafka 
project as follow up, let's see how we can resolve it there 
https://github.com/aio-libs/aiokafka/issues/911


As mentioned in the issue, some tools like kafka-consumer-groups.sh also 
display a lag of "1" in this kind of situation


Best regards,

Vincent

On 13/06/2023 17:27, Matthias J. Sax wrote:

Sounds like a bug in aiokafka library to me.

If the last message in a topic partition is a tx-marker, the consumer 
should step over it, and report the correct position after the marker.


The official KafkaConsumer (ie, the Java one), does the exact same thing.


-Matthias

On 5/30/23 8:41 AM, Vincent Maurin wrote:

Hello !

I am working on an exactly once stream processors in Python, using
aiokafka client library. My program stores a state in memory, that is
recovered from a changelog topic, like in kafka streams.

On each processing loop, I am consuming messages, producing messages
to an output topics and to my changelog topic, within a transaction.

When I need to restart a runner, to restore the state in memory, I
have a routine consuming the changelog topic from the beginning to the
"end" with a read_commited isolation level. Here I am struggling to
define when to stop my recovery :
* my current (maybe) working solution is to loop over "poll" until
poll is not returning any messages anymore
* I tried to do more something based on the end offests, the checking
the consumer position, but with control messages at the end of the
partition, I am running into an issue where position is one below end
offsets, and doesn't go further

I had a quick look to
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
but it is a bit hard to figure out what is going on here

Best regards,
Vincent


Re: [ANNOUNCE] New committer: Greg Harris

2023-07-10 Thread Matthias J. Sax

Congrats!

On 7/10/23 8:45 AM, Chris Egerton wrote:

Hi all,

The PMC for Apache Kafka has invited Greg Harris to become a committer, and
we are happy to announce that he has accepted!

Greg has been contributing to Kafka since 2019. He has made over 50 commits
mostly around Kafka Connect and Mirror Maker 2. His most notable
contributions include KIP-898: "Modernize Connect plugin discovery" and a
deep overhaul of the offset syncing logic in MM2 that addressed several
technically-difficult, long-standing, high-impact issues.

He has also been an active participant in discussions and reviews on the
mailing lists and on GitHub.

Thanks for all of your contributions, Greg. Congratulations!



Re: Kafka Streaming: RocksDbSessionBytesStoreSupplier seems lost data in Kubernetes

2023-06-29 Thread Matthias J. Sax
The class `RocksDbSessionBytesStoreSupplier` is in package `internal` 
and thus, you should not use it directly. Instead, you should use the 
public factory class `org.apache.kafka.streams.state.Stores`


However, your usage seems correct in general.

Not sure why you pass in the supplier directly though? In the end, if 
you want to set a name for the store, you can use 
`Materialized.as("...")`, and you can set the retention time via 
`Materialized#withRetention(...)` (which would be the proper usage of the 
API).
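
A sketch of that public-API variant (store name and value type are guesses,
the gap/retention values just mirror your snippet; SessionStore and Bytes come
from org.apache.kafka.streams.state and org.apache.kafka.common.utils):

stream
    .windowedBy(SessionWindows.with(Duration.parse(env.getProperty("pft.gap", "PT0.1S"))))
    .aggregate(
        ArrayList::new,
        (k, v, list) -> { list.add(v); return list; },
        (k, list1, list2) -> { list1.addAll(list2); return list1; },
        Materialized.<String, List<String>, SessionStore<Bytes, byte[]>>as("pft-session-store")
            .withRetention(Duration.parse(env.getProperty("pft.duration", "P7D")))
            .withKeySerde(Serdes.String())
            .withValueSerde(listSerde));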


Besides this, the store should be backed by a changelog topic and thus 
you should never lose any data, independent of your deployment.


Of course, I would recommend to use a stateful set and re-attach storage 
to the pod to avoid re-creating the store from the changelog.


HTH,

-Matthias


On 6/28/23 8:49 AM, An, Hongguo (CORP) wrote:

Hi:
I am using RocksDbSessionBytesStoreSupplier in my kafka streaming application 
for an aggregation like this:


var materialized =
    Materialized.<...>as(
            new RocksDbSessionBytesStoreSupplier(
                    env.getProperty("messages.cdc.pft.topic", "NASHCM.PAYROLL.PFT.FILENUMBER"),
                    Duration.parse(env.getProperty("pft.duration", "P7D")).toMillis()))
        .withKeySerde(stringSerde)
        .withValueSerde(listSerde);




stream.windowedBy(SessionWindows
        .with(Duration.parse(env.getProperty("pft.gap", "PT0.1S")))
        .grace(Duration.parse(env.getProperty("pft.duration", "PT0.05S"))))
    .aggregate(ArrayList::new,
        (k, v, list) -> { list.add(v); return list; },
        (k, list1, list2) -> { list1.addAll(list2); return list1; },
        materialized)
    .toStream().foreach((key, value) -> {
        // sometimes value is null, but this should never happen – and we do see some
        // messages not processed.
    });



The application runs on Kubernetes, should we not use 
RocksDbSessionBytesStoreSupplier?



Thanks

Andrew





Re: Consuming an entire partition with control messages

2023-06-13 Thread Matthias J. Sax

Sounds like a bug in aiokafka library to me.

If the last message in a topic partition is a tx-marker, the consumer 
should step over it, and report the correct position after the marker.


The official KafkaConsumer (ie, the Java one), does the exact same thing.
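
For reference, a rough sketch of a restore-completion check with the Java
consumer (assuming `partitions` is a collection of the changelog partitions
and `restore(...)` is your own callback):

final Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);
boolean done = false;
while (!done) {
    final ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(500));
    records.forEach(r -> restore(r));
    // position() steps over a trailing transaction marker, so this check terminates
    done = partitions.stream().allMatch(tp -> consumer.position(tp) >= endOffsets.get(tp));
}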


-Matthias

On 5/30/23 8:41 AM, Vincent Maurin wrote:

Hello !

I am working on an exactly once stream processors in Python, using
aiokafka client library. My program stores a state in memory, that is
recovered from a changelog topic, like in kafka streams.

On each processing loop, I am consuming messages, producing messages
to an output topics and to my changelog topic, within a transaction.

When I need to restart a runner, to restore the state in memory, I
have a routine consuming the changelog topic from the beginning to the
"end" with a read_commited isolation level. Here I am struggling to
define when to stop my recovery :
* my current (maybe) working solution is to loop over "poll" until
poll is not returning any messages anymore
* I tried to do something more based on the end offsets, then checking
the consumer position, but with control messages at the end of the
partition, I am running into an issue where position is one below end
offsets, and doesn't go further

I had a quick look to
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
but it is a bit hard to figure out what is going on here

Best regards,
Vincent


Re: [VOTE] 3.4.1 RC0

2023-05-22 Thread Matthias J. Sax

Thanks a lot!

-Matthias

On 5/21/23 7:27 PM, Luke Chen wrote:

Hi Matthias,

Yes, I agree we should get this hotfix into 3.4.1.
I've backported into the 3.4 branch.
I'll create a new RC for 3.4.1.

Thanks.
Luke

On Mon, May 22, 2023 at 5:13 AM Matthias J. Sax  wrote:


Hi Luke,

RC0 for 3.4.1 includes a fix for
https://issues.apache.org/jira/browse/KAFKA-14862. We recently
discovered that the fix itself introduces a regression. We already have
a PR to fix-forward the regression:
https://github.com/apache/kafka/pull/13734

I think we should get the open PR merged, and backport it not only to 3.5,
but also to 3.4.1, and get a new RC for 3.4.1.

Thoughts?


-Matthias


On 5/19/23 6:12 AM, Josep Prat wrote:

Hi Luke,
This gets a +1 from my end. I believe non-binding because if I understand
it correctly, binding votes for releases are only issued by PMCs (


https://cwiki.apache.org/confluence/display/KAFKA/Release+Process#ReleaseProcess-Afterthevotepasses

).

I did the following validations:
- Verified signatures and checksums for all the generated artifacts
- Built from source with Java 11 and Scala 2.13.10
- Run unit tests
- Run integration tests
- Run the quickstart with Zookeeper and KRaft

Best,

On Wed, May 17, 2023 at 2:11 PM Josep Prat  wrote:


Hi Luke,

I ran the tests from the source package you created and I didn't get any
of the test failures you had on your CI build. I got other flaky tests
though, that after being run in isolation ran successfully. I'll try to

run

signature validation, and some further testing later today or later this
week.

Best,

On Wed, May 17, 2023 at 12:43 PM Federico Valeri 
wrote:


Hi Luke, thanks for running the release.

Looks like the Maven artifacts are not in staging:



https://repository.apache.org/content/groups/staging/org/apache/kafka/kafka-clients/3.4.1/


Documentation still has 3.4.0, instead of 3.4.1 (not sure if this will
be aligned later):
https://kafka.apache.org/34/documentation.html#producerapi

Br
Fede


On Wed, May 17, 2023 at 5:24 AM Luke Chen  wrote:


Hello Kafka users, developers and client-developers,

This is the first candidate for release of Apache Kafka 3.4.1.

This is a bugfix release with several fixes since the release of

3.4.0.

A

few of the major issues include:
- core
KAFKA-14644 <https://issues.apache.org/jira/browse/KAFKA-14644>

Process

should stop after failure in raft IO thread
KAFKA-14946 <https://issues.apache.org/jira/browse/KAFKA-14946> KRaft
controller node shutting down while renouncing leadership
KAFKA-14887 <https://issues.apache.org/jira/browse/KAFKA-14887> ZK

session

timeout can cause broker to shutdown
- client
KAFKA-14639 <https://issues.apache.org/jira/browse/KAFKA-14639> Kafka
CooperativeStickyAssignor revokes/assigns partition in one rebalance

cycle

- connect
KAFKA-12558 <https://issues.apache.org/jira/browse/KAFKA-12558> MM2

may not

sync partition offsets correctly
KAFKA-14666 <https://issues.apache.org/jira/browse/KAFKA-14666> MM2

should

translate consumer group offsets behind replication flow
- stream
KAFKA-14172 <https://issues.apache.org/jira/browse/KAFKA-14172> bug:

State

stores lose state when tasks are reassigned under EOS

Release notes for the 3.4.1 release:
https://home.apache.org/~showuon/kafka-3.4.1-rc0/RELEASE_NOTES.html

*** Please download, test and vote by May 24, 2023
Kafka's KEYS file containing PGP keys we use to sign the release:
https://kafka.apache.org/KEYS

* Release artifacts to be voted upon (source and binary):
https://home.apache.org/~showuon/kafka-3.4.1-rc0/

* Maven artifacts to be voted upon:


https://repository.apache.org/content/groups/staging/org/apache/kafka/


* Javadoc:
https://home.apache.org/~showuon/kafka-3.4.1-rc0/javadoc/

* Tag to be voted upon (off 3.4 branch) is the 3.4.1 tag:
https://github.com/apache/kafka/releases/tag/3.4.1-rc0

* Documentation:
https://kafka.apache.org/34/documentation.html

* Protocol:
https://kafka.apache.org/34/protocol.html

The most recent build has had test failures. These all appear to be

due

to

flakiness, but it would be nice if someone more familiar with the

failed

tests could confirm this. I may update this thread with passing build

links

if I can get one, or start a new release vote thread if test failures

must

be addressed beyond re-running builds until they pass.

Unit/integration tests:
https://ci-builds.apache.org/job/Kafka/job/kafka/job/3.4/133/

System tests:
Will update the results later

Thank you.
Luke





--
[image: Aiven] <https://www.aiven.io>

*Josep Prat*
Open Source Engineering Director, *Aiven*
josep.p...@aiven.io   |   +491715557497
aiven.io <https://www.aiven.io>   |
<https://www.facebook.com/aivencloud>
<https://www.linkedin.com/company/aiven/>   <

https://twitter.com/aiven_io>

*Aiven Deutschland GmbH*
Alexanderufer 3-7, 10117 Berlin
Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
Amtsgericht Charlottenburg, HRB 209739 B










Re: [VOTE] 3.4.1 RC0

2023-05-21 Thread Matthias J. Sax

Hi Luke,

RC0 for 3.4.1 includes a fix for 
https://issues.apache.org/jira/browse/KAFKA-14862. We recently 
discovered that the fix itself introduces a regression. We already have 
a PR to fix-forward the regression: 
https://github.com/apache/kafka/pull/13734


I think we should get the open PR merged, and backport it not only to 3.5, 
but also to 3.4.1, and get a new RC for 3.4.1.


Thoughts?


-Matthias


On 5/19/23 6:12 AM, Josep Prat wrote:

Hi Luke,
This gets a +1 from my end. I believe non-binding because if I understand
it correctly, binding votes for releases are only issued by PMCs (
https://cwiki.apache.org/confluence/display/KAFKA/Release+Process#ReleaseProcess-Afterthevotepasses
).

I did the following validations:
- Verified signatures and checksums for all the generated artifacts
- Built from source with Java 11 and Scala 2.13.10
- Run unit tests
- Run integration tests
- Run the quickstart with Zookeeper and KRaft

Best,

On Wed, May 17, 2023 at 2:11 PM Josep Prat  wrote:


Hi Luke,

I ran the tests from the source package you created and I didn't get any
of the test failures you had on your CI build. I got other flaky tests
though, that after being run in isolation ran successfully. I'll try to run
signature validation, and some further testing later today or later this
week.

Best,

On Wed, May 17, 2023 at 12:43 PM Federico Valeri 
wrote:


Hi Luke, thanks for running the release.

Looks like the Maven artifacts are not in staging:

https://repository.apache.org/content/groups/staging/org/apache/kafka/kafka-clients/3.4.1/

Documentation still has 3.4.0, instead of 3.4.1 (not sure if this will
be aligned later):
https://kafka.apache.org/34/documentation.html#producerapi

Br
Fede


On Wed, May 17, 2023 at 5:24 AM Luke Chen  wrote:


Hello Kafka users, developers and client-developers,

This is the first candidate for release of Apache Kafka 3.4.1.

This is a bugfix release with several fixes since the release of 3.4.0.

A

few of the major issues include:
- core
KAFKA-14644  Process
should stop after failure in raft IO thread
KAFKA-14946  KRaft
controller node shutting down while renouncing leadership
KAFKA-14887  ZK

session

timeout can cause broker to shutdown
- client
KAFKA-14639  Kafka
CooperativeStickyAssignor revokes/assigns partition in one rebalance

cycle

- connect
KAFKA-12558  MM2

may not

sync partition offsets correctly
KAFKA-14666  MM2

should

translate consumer group offsets behind replication flow
- stream
KAFKA-14172  bug:

State

stores lose state when tasks are reassigned under EOS

Release notes for the 3.4.1 release:
https://home.apache.org/~showuon/kafka-3.4.1-rc0/RELEASE_NOTES.html

*** Please download, test and vote by May 24, 2023
Kafka's KEYS file containing PGP keys we use to sign the release:
https://kafka.apache.org/KEYS

* Release artifacts to be voted upon (source and binary):
https://home.apache.org/~showuon/kafka-3.4.1-rc0/

* Maven artifacts to be voted upon:
https://repository.apache.org/content/groups/staging/org/apache/kafka/

* Javadoc:
https://home.apache.org/~showuon/kafka-3.4.1-rc0/javadoc/

* Tag to be voted upon (off 3.4 branch) is the 3.4.1 tag:
https://github.com/apache/kafka/releases/tag/3.4.1-rc0

* Documentation:
https://kafka.apache.org/34/documentation.html

* Protocol:
https://kafka.apache.org/34/protocol.html

The most recent build has had test failures. These all appear to be due

to

flakiness, but it would be nice if someone more familiar with the failed
tests could confirm this. I may update this thread with passing build

links

if I can get one, or start a new release vote thread if test failures

must

be addressed beyond re-running builds until they pass.

Unit/integration tests:
https://ci-builds.apache.org/job/Kafka/job/kafka/job/3.4/133/

System tests:
Will update the results later

Thank you.
Luke





--
[image: Aiven] 

*Josep Prat*
Open Source Engineering Director, *Aiven*
josep.p...@aiven.io   |   +491715557497
aiven.io    |

   
*Aiven Deutschland GmbH*
Alexanderufer 3-7, 10117 Berlin
Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
Amtsgericht Charlottenburg, HRB 209739 B






Re: Some questions on Kafka on order of messages with mutiple partitions

2023-05-12 Thread Matthias J. Sax

Does having  9 partitions with 9 replication factors make sense here?


A replication factor of 9 sounds very high. For production, replication 
factor of 3 is recommended.


How many partitions you want/need is a different question, and cannot be 
answered in a general way.



"Yes" to all other questions.


-Matthias



On 5/12/23 9:50 AM, Mich Talebzadeh wrote:

Hi,

I have used Apache Kafka in conjunction with Spark as a messaging 
source. This rather dated diagram describes it


I have two physical hosts each 64 GB, running RHES 7.6, these are called 
rhes75 and rhes76 respectively. The Zookeeper version is 3.7.1 and kafka 
version is 3.4.0



[image: architecture diagram]
I have a topic md -> MarketData that has been defined as below

kafka-topics.sh --create --bootstrap-server 
rhes75:9092,rhes75:9093,rhes75:9094,rhes76:9092,rhes76:9093,rhes76:9094,rhes76:9095,rhes76:9096, rhes76:9097 --replication-factor 9 --partitions 9 --topic md


kafka-topics.sh --describe --bootstrap-server 
rhes75:9092,rhes75:9093,rhes75:9094,rhes76:9092,rhes76:9093,rhes76:9094,rhes76:9095,rhes76:9096, rhes76:9097 --topic md



This is working fine

Topic: md       TopicId: UfQly87bQPCbVKoH-PQheg PartitionCount: 9   
ReplicationFactor: 9    Configs: segment.bytes=1073741824
         Topic: md       Partition: 0    Leader: 12      Replicas: 
12,10,8,2,9,11,1,7,3  Isr: 10,1,9,2,12,7,3,11,8
         Topic: md       Partition: 1    Leader: 9       Replicas: 
9,8,2,12,11,1,7,3,10  Isr: 10,1,9,2,12,7,3,11,8
         Topic: md       Partition: 2    Leader: 11      Replicas: 
11,2,12,9,1,7,3,10,8  Isr: 10,1,9,2,12,7,3,11,8
         Topic: md       Partition: 3    Leader: 1       Replicas: 
1,12,9,11,7,3,10,8,2  Isr: 10,1,9,2,12,7,3,11,8
         Topic: md       Partition: 4    Leader: 7       Replicas: 
7,9,11,1,3,10,8,2,12  Isr: 10,1,9,2,12,7,3,11,8
         Topic: md       Partition: 5    Leader: 3       Replicas: 
3,11,1,7,10,8,2,12,9  Isr: 10,1,9,2,12,7,3,11,8
         Topic: md       Partition: 6    Leader: 10      Replicas: 
10,1,7,3,8,2,12,9,11  Isr: 10,1,9,2,12,7,3,11,8
         Topic: md       Partition: 7    Leader: 8       Replicas: 
8,7,3,10,2,12,9,11,1  Isr: 10,1,9,2,12,7,3,11,8
         Topic: md       Partition: 8    Leader: 2       Replicas: 
2,3,10,8,12,9,11,1,7  Isr: 10,1,9,2,12,7,3,11,8


However, I have a number of questions

 1. Does having 9 partitions with 9 replication factors make sense here?
 2. As I understand it, the parallelism is equal to the number of partitions
    for a topic.
 3. Kafka only provides a total order over messages *within a
    partition*, not between different partitions in a topic, and in
    this case I have one topic.
 4. Data within a partition will be stored in the order in which it is
    written; therefore, data read from a partition will be read in order
    for that partition?
 5. Finally, if I want to get messages in order across all 9
    partitions, then I need to group messages with a key, so that
    messages with the same key go to the same partition and within that
    partition the messages are ordered.

Thanks


*Disclaimer:* Use it at your own risk.Any and all responsibility for any 
loss, damage or destruction of data or any other property which may 
arise from relying on this email's technical content is explicitly 
disclaimed. The author will in no case be liable for any monetary 
damages arising from such loss, damage or destruction.
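
To illustrate point 5 above, a minimal producer sketch: records that share a key are hashed to
the same partition of the topic, so they are read back in the order they were written within
that partition. The key and payloads are placeholders; enable.idempotence is optional here but
keeps per-partition order across retries:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class KeyedProducerSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "rhes75:9092"); // adjust as needed
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");

            try (Producer<String, String> producer = new KafkaProducer<>(props)) {
                // All ticks for key "IBM" hash to the same partition of "md",
                // so they are read back in the order they were written.
                producer.send(new ProducerRecord<>("md", "IBM", "tick-1"));
                producer.send(new ProducerRecord<>("md", "IBM", "tick-2"));
                producer.send(new ProducerRecord<>("md", "IBM", "tick-3"));
            }
        }
    }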




Re: [DISCUSS] Re-visit end of life policy

2023-04-25 Thread Matthias J. Sax

Adding the user-mailing list. Seems relevant to everybody.

On 4/20/23 2:45 AM, Divij Vaidya wrote:

Thank you Matthias for your comments.

I agree with you that the decision should be driven based on strong
community ask as it introduces a significant overhead on the maintainers. I
was hoping that more folks (users of Kafka) would contribute to this thread
with their opinion but perhaps, I need to find alternative ways to get data
about Kafka version usage in the wild. Given the effort of migrating major
versions (2.x to 3.x), I am actually surprised that we don't hear more
often from the users about the community's 12 month EOL policy.

I will get back on this thread once I have more data to support the
proposal.

--
Divij Vaidya



On Thu, Apr 20, 2023 at 3:52 AM Matthias J. Sax  wrote:


While I understand the desire, I tend to agree with Ismael.

In general, it's a significant amount of work not just to do the actual
releases, but also to cherry-pick bug-fixes to older branches. Code
diverges very quickly, and a clean cherry-pick is usually only possible
for one or two branches. And it's not just simple conflicts that are
easy to resolve; it often even implies doing a full new fix if the
corresponding code was refactored, which is more often the case than one
might think.

If there is no very strong ask from the community, I would rather let
committers spend their time reviewing PRs instead and help contributors
to get the work merged.

Just my 2ct.

-Matthias


On 4/13/23 2:52 PM, Ismael Juma wrote:

Clarification below.

I did not understand your point about maintenance expense to ensure

compatibility. I am confused because, IMO, irrespective of our bug fix
support duration for minor versions, we should ensure that all prior

minor

versions are compatible. Hence, increasing the support duration to 24
months will not add more expense than today to ensure compatibility.



No, I am not saying that. I am saying that there is no reason not to
upgrade from one minor release to another since we provide full
compatibility between minor releases. The expensive part is that we

release

3 times a year, so you have to support 6 releases at any given point in
time. More importantly, you have to validate all these releases, handle

any

additional bugs and so on. When it comes to the CVE stuff, you also have

to

deal with cases where a project you depend on forces an upgrade to a
release with compatibility impact and so on. Having seen this first hand,
it's a significant amount of work.

Ismael







Re: [ANNOUNCE] New PMC chair: Mickael Maison

2023-04-21 Thread Matthias J. Sax

Congrats Mickael!

And thanks a lot for taking on this additional task! Glad to have you!


-Matthias

On 4/21/23 9:40 AM, Viktor Somogyi-Vass wrote:

Jun, thank you for all your hard work! Also, congrats Mickael, it is very
well deserved :)

Best,
Viktor

On Fri, Apr 21, 2023, 18:15 Adam Bellemare  wrote:


Thank you for all your hard work Jun - that's a decade-long legacy!
And congratulations to you Mickael!

On Fri, Apr 21, 2023 at 11:20 AM Josep Prat 
wrote:


Thanks Jun for your work as Chair all these years!
Congratulations Mickael!

Best,

———
Josep Prat

Aiven Deutschland GmbH

Alexanderufer 3-7, 10117 Berlin

Amtsgericht Charlottenburg, HRB 209739 B

Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen

m: +491715557497

w: aiven.io

e: josep.p...@aiven.io

On Fri, Apr 21, 2023, 17:10 Jun Rao  wrote:


Hi, everyone,

After more than 10 years, I am stepping down as the PMC chair of Apache
Kafka. We now have a new chair Mickael Maison, who has been a PMC

member

since 2020. I plan to continue to contribute to Apache Kafka myself.

Congratulations, Mickael!

Jun









Re: can Kafka streams support ordering across 2 different topics when consuming from multiple source topics?

2023-03-21 Thread Matthias J. Sax
In general there is no ordering guarantee between topics. So it might 
depend a lot on the details of your use case.


For example, if you know that it will always be two events, you could 
buffer the first one in a state-store, and wait for the second one to 
arrive and decide in which order to forward both events downstream for 
actual processing.



HTH, Matthias


On 3/20/23 11:57 PM, Pushkar Deole wrote:

Hi All,

We have a kafka streams application that consumes from 2 different topics
say topic A and topic B. The application uses data of telephone call on
those topics and each call has a call id which is used as key to send
events to those 2 topics. e.g. for a telephone call, the 1st event related
to that call is sent to A with call id however subsequent event for that
same call might go to topic B again with call id as key.

*At times, we need to process those 2 events in an order, which is not
possible with the current topology that we are using*. *Can someone suggest
if this is possible to achieve with streams?*
The topology is as below:

Topic A has 6 partitions
Topics B has 6 partitions
Call id used as key on both topics
Kafka streams application has 3 instances that consumes from both of the
topics as source topics.
Each streams application instance has 2 stream threads thus total 6 stream
threads across 3 instances of streams application cater to 6 partitions of
inputs topics.
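
A rough sketch of the buffering approach suggested above, using the Processor API: the first
event for a call id is parked in a state store until its sibling arrives, then both are forwarded
in the desired order. CallEvent, its sequence() accessor, and the "call-buffer" store name are
hypothetical; the two input topics would need to be merged (and co-partitioned by call id)
upstream of this processor, and the store has to be created and attached via addStateStore():

    import org.apache.kafka.streams.processor.api.Processor;
    import org.apache.kafka.streams.processor.api.ProcessorContext;
    import org.apache.kafka.streams.processor.api.Record;
    import org.apache.kafka.streams.state.KeyValueStore;

    // Hypothetical event type; sequence() defines the desired processing order.
    record CallEvent(long sequence, String payload) { }

    public class CallEventReorderer implements Processor<String, CallEvent, String, CallEvent> {

        private ProcessorContext<String, CallEvent> context;
        private KeyValueStore<String, CallEvent> buffer;

        @Override
        public void init(final ProcessorContext<String, CallEvent> context) {
            this.context = context;
            this.buffer = context.getStateStore("call-buffer"); // assumed store name
        }

        @Override
        public void process(final Record<String, CallEvent> record) {
            final CallEvent earlier = buffer.get(record.key());
            if (earlier == null) {
                // First event for this call id: park it until the second one arrives.
                buffer.put(record.key(), record.value());
                return;
            }
            buffer.delete(record.key());
            // Both events are present: forward them ordered by sequence number.
            final CallEvent first = earlier.sequence() <= record.value().sequence()
                ? earlier : record.value();
            final CallEvent second = (first == earlier) ? record.value() : earlier;
            context.forward(record.withValue(first));
            context.forward(record.withValue(second));
        }
    }

In practice a punctuator would also be needed to evict buffered events whose counterpart never
arrives.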



Re: [ANNOUNCE] New Kafka PMC Member: Chris Egerton

2023-03-09 Thread Matthias J. Sax

Congrats!

On 3/9/23 2:59 PM, José Armando García Sancio wrote:

Congrats Chris.

On Thu, Mar 9, 2023 at 2:01 PM Kowshik Prakasam  wrote:


Congrats Chris!

On Thu, Mar 9, 2023 at 1:33 PM Divij Vaidya  wrote:


Congratulations Chris! I am in awe with the amount of effort you put in
code reviews and helping out the community members. Very well deserved.

--
Divij Vaidya



On Thu, Mar 9, 2023 at 9:49 PM Jorge Esteban Quilcate Otoya <
quilcate.jo...@gmail.com> wrote:


So well deserved! Congratulations Chris!!!

On Thu, 9 Mar 2023 at 22:09, Lucas Brutschy 
wrote:


Congratulations!

On Thu, Mar 9, 2023 at 8:48 PM Roman Schmitz 
wrote:


Congratulations Chris!

Am Do., 9. März 2023 um 20:33 Uhr schrieb Chia-Ping Tsai <

chia7...@gmail.com

:



Congratulations Chris!


Mickael Maison  於 2023年3月10日 上午2:21

寫道:


Congratulations Chris!


On Thu, Mar 9, 2023 at 7:17 PM Bill Bejeck 

wrote:


Congratulations Chris!


On Thu, Mar 9, 2023 at 1:12 PM Jun Rao




wrote:


Hi, Everyone,

Chris Egerton has been a Kafka committer since July 2022. He

has

been

very

instrumental to the community since becoming a committer. It's

my

pleasure

to announce that Chris is now a member of Kafka PMC.

Congratulations Chris!

Jun
on behalf of Apache Kafka PMC















Re: [ANNOUNCE] New Kafka PMC Member: David Arthur

2023-03-09 Thread Matthias J. Sax

Congrats!

On 3/9/23 2:59 PM, José Armando García Sancio wrote:

Congrats David!

On Thu, Mar 9, 2023 at 2:00 PM Kowshik Prakasam  wrote:


Congrats David!

On Thu, Mar 9, 2023 at 12:09 PM Lucas Brutschy
 wrote:


Congratulations!

On Thu, Mar 9, 2023 at 8:37 PM Manikumar 
wrote:


Congrats David!


On Fri, Mar 10, 2023 at 12:24 AM Josep Prat 

Congrats David!

———
Josep Prat

Aiven Deutschland GmbH

Alexanderufer 3-7, 10117 Berlin

Amtsgericht Charlottenburg, HRB 209739 B

Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen

m: +491715557497

w: aiven.io

e: josep.p...@aiven.io

On Thu, Mar 9, 2023, 19:22 Mickael Maison 

wrote:



Congratulations David!

On Thu, Mar 9, 2023 at 7:20 PM Chris Egerton 


wrote:


Congrats David!

On Thu, Mar 9, 2023 at 1:17 PM Bill Bejeck 

wrote:



Congratulations David!

On Thu, Mar 9, 2023 at 1:12 PM Jun Rao 


wrote:



Hi, Everyone,

David Arthur has been a Kafka committer since 2013. He has been

very

instrumental to the community since becoming a committer. It's

my

pleasure

to announce that David is now a member of Kafka PMC.

Congratulations David!

Jun
on behalf of Apache Kafka PMC













Re: Kafka Streams 2.7.1 to 3.3.1 rolling upgrade

2023-02-27 Thread Matthias J. Sax

Hmmm... that's interesting...

It seems that Kafka Streams "version probing" does not play well with static 
group membership...


Sounds like a "bug" to me -- well, more like a missing integration. Not 
sure right now, if/how we could fix it.


Can you file a ticket?

For now, I don't think you can do anything about it. Sorry. :(


-Matthias



On 2/27/23 6:50 AM, Vinoth Rengarajan wrote:

Hi Team,

I am trying to upgrade my Kafka Streams application from 2.7.1 to 3.3.1.
Brokers are running on Kafka 2.7.1. The plan is to upgrade the clients
first and then the brokers.

I have already enabled static membership in our application so that
I am not expecting a rebalance. Below are the configs *(Stream Config &
Consumer Config)*.

As mentioned earlier, the application is running on Kafka 2.7.1. I deployed
the latest version of the app with 3.3.1 streams libraries, and configured
the '*upgrade.from' *property to 2.7 (based on the upgrade documentation
available here
https://kafka.apache.org/33/documentation/streams/upgrade-guide). When I
do a rolling bounce with the latest changes, I can see a rebalance being
triggered on other instances in the cluster.

I can see the below logs on the instance which is being bounced, forcing a
rebalance on others. Am I missing something? How can I avoid other
instances in the cluster from rebalancing?


*Logs:*
INFO  2023-02-27 09:52:16.805 | streams.KafkaStreams stream-client
[kafka_upgrade.Kafka_Upgrade_Test] State transition from CREATED to
REBALANCING
INFO  2023-02-27 09:52:16.946 | internals.ConsumerCoordinator [Consumer
instanceId=kafka_upgrade.Kafka_Upgrade_Test-4,
clientId=kafka_upgrade.Kafka_Upgrade_Test-StreamThread-4-consumer,
groupId=kafka_upgrade.Kafka_Upgrade_Test] Notifying assignor about the new
Assignment(partitions=[kafka_upgrade.Kafka_Upgrade_Test-version-updates-11,
kafka_upgrade.Kafka_Upgrade_Test-version-updates-23], userDataSize=56)
INFO  2023-02-27 09:52:16.947 | internals.StreamsPartitionAssignor
stream-thread [kafka_upgrade.Kafka_Upgrade_Test-StreamThread-3-consumer]
Sent a version 11 subscription and got version 8 assignment back
(successful version probing). Downgrade subscription metadata to commonly
supported version 8 and trigger new rebalance.
INFO  2023-02-27 09:52:16.947 | internals.StreamsPartitionAssignor
stream-thread [kafka_upgrade.Kafka_Upgrade_Test-StreamThread-2-consumer]
Sent a version 11 subscription and got version 8 assignment back
(successful version probing). Downgrade subscription metadata to commonly
supported version 8 and trigger new rebalance.
INFO  2023-02-27 09:52:16.947 | internals.StreamsPartitionAssignor
stream-thread [kafka_upgrade.Kafka_Upgrade_Test-StreamThread-4-consumer]
Sent a version 11 subscription and got version 8 assignment back
(successful version probing). Downgrade subscription metadata to commonly
supported version 8 and trigger new rebalance.
INFO  2023-02-27 09:52:16.947 | internals.StreamsPartitionAssignor
stream-thread [kafka_upgrade.Kafka_Upgrade_Test-StreamThread-1-consumer]
Sent a version 11 subscription and got version 8 assignment back
(successful version probing). Downgrade subscription metadata to commonly
supported version 8 and trigger new rebalance.
INFO  2023-02-27 09:52:16.947 | internals.StreamsPartitionAssignor
stream-thread [kafka_upgrade.Kafka_Upgrade_Test-StreamThread-2-consumer]
Requested to schedule immediate rebalance due to version probing.
INFO  2023-02-27 09:52:16.948 | internals.StreamsPartitionAssignor
stream-thread [kafka_upgrade.Kafka_Upgrade_Test-StreamThread-1-consumer]
Requested to schedule immediate rebalance due to version probing.
INFO  2023-02-27 09:52:16.948 | internals.StreamsPartitionAssignor
stream-thread [kafka_upgrade.Kafka_Upgrade_Test-StreamThread-4-consumer]
Requested to schedule immediate rebalance due to version probing.
INFO  2023-02-27 09:52:16.948 | internals.StreamsPartitionAssignor
stream-thread [kafka_upgrade.Kafka_Upgrade_Test-StreamThread-3-consumer]
Requested to schedule immediate rebalance due to version probing.

*Streams Config:*

acceptable.recovery.lag = 1
application.id = Kafka_Upgrade_Test
application.server =
bootstrap.servers = [broker1, broker2, broker3]
buffered.records.per.partition = 1000
built.in.metrics.version = latest
cache.max.bytes.buffering = 10485760
client.id = kafka_upgrade.Kafka_Upgrade_Test
commit.interval.ms = 3
connections.max.idle.ms = 54
default.deserialization.exception.handler = class
org.apache.kafka.streams.errors.LogAndFailExceptionHandler
default.dsl.store = rocksDB
default.key.serde = null
default.list.key.serde.inner = null
default.list.key.serde.type = null
default.list.value.serde.inner = null
default.list.value.serde.type = null
default.production.exception.handler = class
org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
default.timestamp.extractor = class
org.apache.kafka.streams.processor.FailOnInvalidTimestamp
default.value.serde = null
max.task.idle.ms = 0

Re: Coralogix Logo on Powered By Page

2023-02-01 Thread Matthias J. Sax

Thanks for reaching out.

Can you open a PR against https://github.com/apache/kafka-site updating 
`powered-by.html`?



-Matthias

On 2/1/23 1:13 AM, Tali Soroker wrote:

Hi,
I am writing on behalf of Coralogix to request adding us to the Powered 
By page on the Apache Kafka website.


I am attaching our logo and here is a description of our usage for your 
consideration:


Coralogix uses Kafka Streams to power our observability platform
that ingests and analyzes up to tens of billions of messages per
day. Using Kafka Streams, we are able to decouple analytics from
indexing and storage to provide our users with the best performance,
scalability, and cost.


Best,
Tali


--





  Tali Soroker

Product Marketing

+972 058-681-1707

coralogix.com 



Re: Custom Kafka Streams State Restore Logic

2023-01-23 Thread Matthias J. Sax

Thanks.

I agree. Seems your options are limited. The API is not really a good 
fix for what you want to do... Sorry.


-Matthias

On 1/18/23 7:48 AM, Upesh Desai wrote:

Hi Matthias, thanks for your reply! Sure, so the use case is as follows.

We currently store some time series data in the state store, and it is 
stored to a changelog as well. The time series data is bucketed (5 
minutes, 1 hour, and 1 day). Our goal was to always only have a max of 2 
time buckets in the store at once. As we receive new timeseries data, we 
figure out what time bucket it belongs to, and add it to its respective 
bucket. We have a “grace period” which allows for late arriving data to 
be processed even after a time bucket has ended. That’s the reason why 
we have this constraint of 2 time buckets max within the store; 1 for 
the previous bucket in its grace period, 1 for the current bucket.


So we wanted to extend the base state store and add a simple in-memory 
map to track the 2 time buckets per timeseries (that’s the store key). A 
couple of reasons why we don’t want to add this as a separate state store 
or to the existing store are:
1. There is a ton of serialization / deserialization that happens behind 
the scenes


2. This new time bucket tracking map would only be updated a couple 
times per time bucket, and does not need to be updated on every message 
read.


3. There’s no API on the included stores that allows us to do so

Therefore, I thought it best to try to use the existing store 
functionality, create a “new state store” that really just instantiates 
one of the included stores within, add this in memory map, and then plug 
into/alter/extend the restore functionality to populate the time bucket 
tracking map during restore time.


It sounds like I will either have to 1) create a custom state store from 
scratch, or 2) see if there is a post-restore hook that can then call a 
method to scan the whole store and build up the time bucket map before 
starting to process.


Any advice on Kafka streams / state store logic would be appreciated!

-Upesh

Upesh Desai​	 | 	Senior Software Developer	 | 	*ude...@itrsgroup.com* 
<mailto:ude...@itrsgroup.com>


*www.itrsgroup.com* <https://www.itrsgroup.com/>  


<https://www.itrsgroup.com/>



*From: *Matthias J. Sax 
*Date: *Wednesday, January 18, 2023 at 12:50 AM
*To: *users@kafka.apache.org 
*Subject: *Re: Custom Kafka Streams State Restore Logic

Guess it depends what you actually want to achieve?

Also note: `InMemoryWindowStore` is an internal class, and thus might
change at any point, and it was never designed to be extended...


-Matthias

On 1/13/23 2:55 PM, Upesh Desai wrote:

Hello all,

I am currently working on creating a new InMemoryWindowStore, by 
extending the default in memory window store. One of the roadblocks I’ve 
run into is finding a way to add some custom logic when the state store 
is being restored from the changelog. I know that this is possible if I 
completely write the store logic from scratch, but we really only want 
to add a tiny bit of custom logic, and do not want to have to replicate 
all the existing logic.


Is there a simple way for this to be done? I see the default 
implementation in the InMemoryWindowStore :


context.register(
    root,
    (RecordBatchingStateRestoreCallback) records -> {
        for (final ConsumerRecord<byte[], byte[]> record : records) {
            put(
                Bytes.wrap(extractStoreKeyBytes(record.key())),
                record.value(),
                extractStoreTimestamp(record.key())
            );
            ChangelogRecordDeserializationHelper.applyChecksAndUpdatePosition(
                record,
                consistencyEnabled,
                position
            );
        }
    }
);

Thanks in advance!

Upesh

<https://www.itrsgroup.com/ <https://www.itrsgroup.com/>>

   
Upesh Desai​

Senior Software Developer

*ude...@itrsgroup.com* <mailto:ude...@itrsgroup.com 
<mailto:ude...@itrsgroup.com>>
*www.itrsgroup.com* <https://www.itrsgroup.com/ <https://www.itrsgroup.com/>>

Internet communications are not secure and therefore the ITRS Group does 
not accept legal responsibility for the contents of this message. Any 
view or opinions presented are solely those of the author and do not 
necessarily represent those of the ITRS Group unless otherwise 
specifically stated.


[itrs.email.signature]
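
A rough sketch of option 2 from the message above (scan the whole store once before processing
starts): in recent Streams versions the Processor's init() is only invoked after the task's state
stores have been restored from the changelog, so a single scan there can rebuild the in-memory
tracking map. The store name, value type, and bucket index below are hypothetical stand-ins, not
the actual application code:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.kstream.Windowed;
    import org.apache.kafka.streams.processor.api.Processor;
    import org.apache.kafka.streams.processor.api.ProcessorContext;
    import org.apache.kafka.streams.processor.api.Record;
    import org.apache.kafka.streams.state.KeyValueIterator;
    import org.apache.kafka.streams.state.WindowStore;

    public class BucketTrackingProcessor implements Processor<String, Double, String, Double> {

        private WindowStore<String, Double> store;
        // series key -> latest bucket start time observed in the store
        private final Map<String, Long> bucketIndex = new HashMap<>();

        @Override
        public void init(final ProcessorContext<String, Double> context) {
            store = context.getStateStore("timeseries-store"); // assumed store name
            // By the time init() runs, the store has been restored from the changelog,
            // so one scan can rebuild the in-memory bucket index.
            try (KeyValueIterator<Windowed<String>, Double> it = store.all()) {
                while (it.hasNext()) {
                    KeyValue<Windowed<String>, Double> entry = it.next();
                    bucketIndex.merge(entry.key.key(), entry.key.window().start(), Long::max);
                }
            }
        }

        @Override
        public void process(final Record<String, Double> record) {
            // ... normal processing that consults/updates bucketIndex and the store ...
        }
    }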





Re: Custom Kafka Streams State Restore Logic

2023-01-17 Thread Matthias J. Sax

Guess it depends what you actually want to achieve?

Also note: `InMemoryWindowStore` is an internal class, and thus might 
change at any point, and it was never designed to be extended...



-Matthias

On 1/13/23 2:55 PM, Upesh Desai wrote:

Hello all,

I am currently working on creating a new InMemoryWindowStore, by 
extending the default in memory window store. One of the roadblocks I’ve 
run into is finding a way to add some custom logic when the state store 
is being restored from the changelog. I know that this is possible if I 
completely write the store logic from scratch, but we really only want 
to add a tiny bit of custom logic, and do not want to have to replicate 
all the existing logic.


Is there a simple way for this to be done? I see the default 
implementation in the InMemoryWindowStore :


context.register(
    root,
    (RecordBatchingStateRestoreCallback) records -> {
        for (final ConsumerRecord<byte[], byte[]> record : records) {
            put(
                Bytes.wrap(extractStoreKeyBytes(record.key())),
                record.value(),
                extractStoreTimestamp(record.key())
            );
            ChangelogRecordDeserializationHelper.applyChecksAndUpdatePosition(
                record,
                consistencyEnabled,
                position
            );
        }
    }
);

Thanks in advance!

Upesh




Upesh Desai​
Senior Software Developer

*ude...@itrsgroup.com* 
*www.itrsgroup.com* 

Internet communications are not secure and therefore the ITRS Group does 
not accept legal responsibility for the contents of this message. Any 
view or opinions presented are solely those of the author and do not 
necessarily represent those of the ITRS Group unless otherwise 
specifically stated.


[itrs.email.signature]



Re: [ANNOUNCE] New committer: Stanislav Kozlovski

2023-01-17 Thread Matthias J. Sax

Congrats!

On 1/17/23 1:26 PM, Ron Dagostino wrote:

Congratulations, Stan!

Ron


On Jan 17, 2023, at 12:29 PM, Mickael Maison  wrote:

Congratulations Stanislav!


On Tue, Jan 17, 2023 at 6:06 PM Rajini Sivaram  wrote:

Congratulations, Stan!

Regards,

Rajini


On Tue, Jan 17, 2023 at 5:04 PM Tom Bentley  wrote:

Congratulations!


On Tue, 17 Jan 2023 at 16:52, Bill Bejeck  wrote:



Congratulations Stan!

-Bill

On Tue, Jan 17, 2023 at 11:37 AM Bruno Cadonna 

wrote:



Congrats Stan!

Well deserved!

Best,
Bruno

On 17.01.23 16:50, Jun Rao wrote:

Hi, Everyone,

The PMC of Apache Kafka is pleased to announce a new Kafka committer
Stanislav Kozlovski.

Stan has been contributing to Apache Kafka since June 2018. He made

various

contributions including the following KIPs.

KIP-455: Create an Administrative API for Replica Reassignment
KIP-412: Extend Admin API to support dynamic application log levels

Congratulations, Stan!

Thanks,

Jun (on behalf of the Apache Kafka PMC)









[ANNOUNCE] New committer: Walker Carlson

2023-01-17 Thread Matthias J. Sax

Dear community,

I am pleased to announce Walker Carlson as a new Kafka committer.

Walker has been contributing to Apache Kafka since November 2019. He 
made various contributions including the following KIPs.


KIP-671: Introduce Kafka Streams Specific Uncaught Exception Handler
KIP-696: Update Streams FSM to clarify ERROR state meaning
KIP-715: Expose Committed offset in streams


Congratulations Walker and welcome on board!


Thanks,
  -Matthias (on behalf of the Apache Kafka PMC)


Re: [ANNOUNCE] New committer: Edoardo Comar

2023-01-06 Thread Matthias J. Sax

Congrats!

On 1/6/23 5:15 PM, Luke Chen wrote:

Congratulations, Edoardo!

Luke

On Sat, Jan 7, 2023 at 7:58 AM Mickael Maison 
wrote:


Congratulations Edo!


On Sat, Jan 7, 2023 at 12:05 AM Jun Rao  wrote:


Hi, Everyone,

The PMC of Apache Kafka is pleased to announce a new Kafka committer

Edoardo

Comar.

Edoardo has been a long time Kafka contributor since 2016. His major
contributions are the following.

KIP-302: Enable Kafka clients to use all DNS resolved IP addresses
KIP-277: Fine Grained ACL for CreateTopics API
KIP-136: Add Listener name to SelectorMetrics tags

Congratulations, Edoardo!

Thanks,

Jun (on behalf of the Apache Kafka PMC)






Re: [ANNOUNCE] New committer: Justine Olshan

2023-01-03 Thread Matthias J. Sax

Congrats!

On 12/29/22 6:47 PM, ziming deng wrote:

Congratulations Justine!
—
Best,
Ziming


On Dec 30, 2022, at 10:06, Luke Chen  wrote:

Congratulations, Justine!
Well deserved!

Luke

On Fri, Dec 30, 2022 at 9:15 AM Ron Dagostino  wrote:


Congratulations, Justine! Well-deserved, and I’m very happy for you.

Ron


On Dec 29, 2022, at 6:13 PM, Israel Ekpo  wrote:

Congratulations Justine!



On Thu, Dec 29, 2022 at 5:05 PM Greg Harris



wrote:

Congratulations Justine!


On Thu, Dec 29, 2022 at 1:37 PM Bill Bejeck  wrote:

Congratulations Justine!


-Bill


On Thu, Dec 29, 2022 at 4:36 PM Philip Nee 

wrote:



wow congrats!

On Thu, Dec 29, 2022 at 1:05 PM Chris Egerton <

fearthecel...@gmail.com



wrote:


Congrats, Justine!

On Thu, Dec 29, 2022, 15:58 David Jacot  wrote:


Hi all,

The PMC of Apache Kafka is pleased to announce a new Kafka

committer

Justine
Olshan.

Justine has been contributing to Kafka since June 2019. She

contributed

53

PRs including the following KIPs.

KIP-480: Sticky Partitioner
KIP-516: Topic Identifiers & Topic Deletion State Improvements
KIP-854: Separate configuration for producer ID expiry
KIP-890: Transactions Server-Side Defense (in progress)

Congratulations, Justine!

Thanks,

David (on behalf of the Apache Kafka PMC)
















Re: Kafka Stream: The state store, wkstore, may have migrated to another instance

2022-12-29 Thread Matthias J. Sax

Sounds like a SpringBoot issue rather than a KS issues.

-Matthias

On 12/29/22 2:45 AM, Nawal Sah wrote:

Hi,

My SpringBoot stream application works fine in a fresh start of the
clustered environment.
But when I restart one of the pods out of two pods, I start getting the
below exception from "KafkaStreams.store". I debugged the state but found
it was RUNNING.

*org.apache.kafka.streams.errors.InvalidStateStoreException: The state
store, wkstore, may have migrated to another instance.*
*at
org.apache.kafka.streams.state.internals.WrappingStoreProvider.stores(WrappingStoreProvider.java:67)
   *
*at
org.apache.kafka.streams.state.internals.CompositeReadOnlyWindowStore.fetchAll(CompositeReadOnlyWindowStore.java:175)*

To fix this, I must restart all the pods/instances or the cluster.

*Steps to reproduce*

1. Kafka Stream application environment
'kafka-streams', version: '3.0.1'
'kafka-clients', version: '3.0.1'
2. Create a cluster environment with at least two replication factors of
the Kafka Stream application.
3. Restart one of the pods.
4. Call KafkaStreams:store(), it starts throwing an exception. If not then
try at least 5 times to restart the same pod.

Priority: *Blocker*


*Regards,*

Nawal Sah

(M): +91-9717932863



Re: [ANNOUNCE] New committer: Satish Duggana

2022-12-27 Thread Matthias J. Sax

Congrats!

On 12/27/22 10:20 AM, Kirk True wrote:

Congrats, Satish!

On Fri, Dec 23, 2022, at 10:07 AM, Jun Rao wrote:

Hi, Everyone,

The PMC of Apache Kafka is pleased to announce a new Kafka committer Satish
Duggana.

Satish has been a long time Kafka contributor since 2017. He is the main
driver behind KIP-405 that integrates Kafka with remote storage, a
significant and much anticipated feature in Kafka.

Congratulations, Satish!

Thanks,

Jun (on behalf of the Apache Kafka PMC)





Re: [ANNOUNCE] New committer: Josep Prat

2022-12-20 Thread Matthias J. Sax

Congrats!

On 12/20/22 12:01 PM, Josep Prat wrote:

Thank you all!

———
Josep Prat

Aiven Deutschland GmbH

Immanuelkirchstraße 26, 10405 Berlin

Amtsgericht Charlottenburg, HRB 209739 B

Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen

m: +491715557497

w: aiven.io

e: josep.p...@aiven.io

On Tue, Dec 20, 2022, 20:42 Bill Bejeck  wrote:


Congratulations Josep!

-Bill

On Tue, Dec 20, 2022 at 1:11 PM Mickael Maison 
wrote:


Congratulations Josep!

On Tue, Dec 20, 2022 at 6:55 PM Bruno Cadonna 

wrote:


Congrats, Josep!

Well deserved!

Best,
Bruno

On 20.12.22 18:40, Kirk True wrote:

Congrats Josep!

On Tue, Dec 20, 2022, at 9:33 AM, Jorge Esteban Quilcate Otoya wrote:

Congrats Josep!!

On Tue, 20 Dec 2022, 17:31 Greg Harris,




wrote:


Congratulations Josep!

On Tue, Dec 20, 2022 at 9:29 AM Chris Egerton <

fearthecel...@gmail.com>

wrote:


Congrats Josep! Well-earned.

On Tue, Dec 20, 2022, 12:26 Jun Rao 

wrote:



Hi, Everyone,

The PMC of Apache Kafka is pleased to announce a new Kafka

committer

Josep

   Prat.

Josep has been contributing to Kafka since May 2021. He

contributed 20

PRs

including the following 2 KIPs.

KIP-773 Differentiate metric latency measured in ms and ns
KIP-744: Migrate TaskMetadata and ThreadMetadata to an interface

with

internal implementation

Congratulations, Josep!

Thanks,

Jun (on behalf of the Apache Kafka PMC)

















Re: [ANNOUNCE] New committer: Ron Dagostino

2022-12-16 Thread Matthias J. Sax

Congrats!

On 12/15/22 7:09 AM, Rajini Sivaram wrote:

Congratulations, Ron! Well deserved!!

Regards,

Rajini

On Thu, Dec 15, 2022 at 11:42 AM Ron Dagostino  wrote:


Thank you, everyone!

Ron


On Dec 15, 2022, at 5:09 AM, Bruno Cadonna  wrote:

Congrats Ron!

Best,
Bruno


On 15.12.22 10:23, Viktor Somogyi-Vass wrote:
Congrats Ron! :)

On Thu, Dec 15, 2022 at 10:22 AM Mickael Maison <

mickael.mai...@gmail.com>

wrote:
Congratulations Ron!

On Thu, Dec 15, 2022 at 9:41 AM Eslam Farag 

wrote:


Congratulations, Ron ☺️

On Thu, 15 Dec 2022 at 10:40 AM Tom Bentley 

wrote:



Congratulations!

On Thu, 15 Dec 2022 at 07:40, Satish Duggana <

satish.dugg...@gmail.com



wrote:


Congratulations, Ron!!

On Thu, 15 Dec 2022 at 07:48, ziming deng 


wrote:


Congratulations, Ron!
Well deserved!

--
Ziming


On Dec 15, 2022, at 09:16, Luke Chen  wrote:

Congratulations, Ron!
Well deserved!

Luke















Re: [ANNOUNCE] New committer: Viktor Somogyi-Vass

2022-12-16 Thread Matthias J. Sax

Congrats!

On 12/15/22 7:10 AM, Rajini Sivaram wrote:

Congratulations, Viktor!

Regards,

Rajini


On Thu, Dec 15, 2022 at 11:41 AM Ron Dagostino  wrote:


Congrats to you too, Victor!

Ron


On Dec 15, 2022, at 4:59 AM, Viktor Somogyi-Vass <

viktor.somo...@cloudera.com.invalid> wrote:


Thank you everyone! :)


On Thu, Dec 15, 2022 at 10:22 AM Mickael Maison <

mickael.mai...@gmail.com>

wrote:

Congratulations Viktor!


On Thu, Dec 15, 2022 at 10:06 AM Tamas Barnabas Egyed
 wrote:

Congratulations, Viktor!








Re: Thread Safety of Punctuator Functions and Processor Contexts

2022-11-07 Thread Matthias J. Sax
Good point about the docs... I guess it must support concurrency due to 
IQ, which might iterate over the store while `process()` modifies it. I 
was only reasoning about `process()` and punctuation and forgot IQ.


So it seems we indeed have this contract -- which is good news for you.

However, I don't think that there is any guarantee that you would "see" 
concurrent modifications (IIRC, RocksDB uses snapshot isolation for 
iterators). But maybe that's good enough for you?



-Matthias


On 11/7/22 11:13 AM, Joshua Suskalo wrote:

"Matthias J. Sax"  writes:


In general, it's not safe to keep the iterator open, because when process() is
executed in-between two punctuator calls, it might modify the store and
invalidate the iterator. There is no guarantee that the returned iterator
supports concurrency.


This makes sense but unfortunately adds significant challenge to making these
operations happen concurrently, which is effectively a hard requirement for my
usecase due to the scale of the state stores' contained data.


Hence, even if it happens that the currently used iterator is concurrent, there
is no API contract about it.


This surprises me though, because it seems to contradict what's stated in the
ReadOnlyKeyValueStore documentation[1].


The returned iterator must be safe from ConcurrentModificationExceptions and
must not return null values. Order is not guaranteed as bytes lexicographical
ordering might not represent key order.


Maybe I'm reading this wrong, but it seems to imply that the returned iterator
must be safe in the face of concurrent modifications, which is required if you
are permitted to make a read-modify-write cycle while using the iterator, which
the existing version of my application has been doing correctly so far as I can
tell.

Am I vastly misunderstanding the intended usecase for punctuators and need to
determine a different mechanism for performing periodic operations on a data
store?

[1]: 
<https://kafka.apache.org/28/javadoc/org/apache/kafka/streams/state/ReadOnlyKeyValueStore.html#all()>


Re: Thread Safety of Punctuator Functions and Processor Contexts

2022-11-07 Thread Matthias J. Sax
In general, it's not safe to keep the iterator open, because when 
process() is executed in-between two punctuator calls, it might modify 
the store and invalidate the iterator. There is no guarantee that the 
returned iterator supports concurrency.


Hence, even if it happens that the currently used iterator is 
concurrent, there is no API contract about it.


-Matthias

On 11/7/22 7:41 AM, Joshua Suskalo wrote:

Hello Matthias, thanks for the response!


"Matthias J. Sax"  writes:


Spawning your own thread and calling context.forward() is _not_ safe, and there
is currently no way for you to make it safe. The runtime code makes certain
assumptions about being single threaded which would break if you call
context.forward() from a different thread. (The runtime _always_ assume that
context.forward() is called inside process() or inside the punctuation
callback.)


This is how I expected, so I'm not too concerned here.


The only way forward I can see, would be trying to make the punctuation call
shorter, eg, not scanning the full store but only a small part of it, such that
the thread can go back to execute process() quicker (it's of course an
additional challenge to keep track where you stopped the scan and to resume
it...), and to make the punctuation interval shorter.


This is where I have another question about safety. Is it safe to use a
KVStoreIterator that was retrieved during a punctuator after that punctuator
call has exited, perhaps using it to store offset information across multiple
runs? This seems to me to be the most obvious way to handle this.

A related question if that one has an affirmative answer would be asking if the
KVStoreIterator can be safely sent to another thread for use (akin to Rust's
Send trait, in that I do not wish to have concurrent access, only to use it from
another thread), as if that were possible I could use the j.u.c.ConcurrentQueue
to allow a thread created on the punctuator to compute the messages that need to
be sent and enqueue them, and then in further punctuator runs I could then send
messages which have been computed.


Hope this helps.


What you've said already is quite helpful and I will begin pursuing methods
based on what you've said, but I also hope that someone can answer my further
questions since they will be helpful to my implementation as well.

Thanks so much,
Joshua


Re: Thread Safety of Punctuator Functions and Processor Contexts

2022-11-04 Thread Matthias J. Sax
Your observation is correct. The Processor#process() and punctuation 
callback are executed on a single thread. It's by design to avoid the 
issue of concurrency (writing thread safe code is hard and we want to 
avoid putting this burden onto the user). There are currently no plans to 
make process() and punctuation concurrent, and it would require a larger 
change inside the runtime code.


Spawning your own thread and calling context.forward() is _not_ safe, 
and there is currently no way for you to make it safe. The runtime code 
makes certain assumptions about being single threaded which would break 
if you call context.forward() from a different thread. (The runtime 
_always_ assume that context.forward() is called inside process() or 
inside the punctuation callback.)


The only way forward I can see, would be trying to make the punctuation 
call shorter, eg, not scanning the full store but only a small part of 
it, such that the thread can go back to execute process() quicker (it's 
of course an additional challenge to keep track where you stopped the 
scan and to resume it...), and to make the punctuation interval shorter.


Hope this helps.

-Matthias

On 11/3/22 11:30 AM, Joshua Suskalo wrote:

I have data that I am storing in a state store which I would like to
periodically run some code over, and the way I have decided to do this is
via a punctuator from inside a Transformer, which gets an iterator over the
state store, performs actions, and forwards events on.

So far, everything works fine, but I do have one issue: messages coming
into the transformer are intended to act as updates to individual values
stored in the state store, and should be incorporated as immediately as
possible, but whenever the punctuator is running the stream thread is tied
up with traversing the iterator and cannot process new messages.

I have written the code in such a way that the transformation function for
these entities is thread safe with respect to the code in the punctuator,
such that the punctuator could fire from one thread while another thread
processes new events without issue, however this was done under a mistaken
understanding of how stream threads are allocated, as I had believed that
punctuators were fired from a different thread as compared to the transform
method.

Inside the punctuator I use the ProcessorContext to forward additional
messages on, as well as in the transform method, and I would like to know
about the thread safety of having those two things happening concurrently
from separate threads, as might occur if I were to have the punctuator
start a thread to perform the iteration, rather than doing the iteration
itself. Is this a safe thing to do, or is this going to be prone to bugs,
even assuming that I have written code to ensure that no individual key in
the state store will be manipulated concurrently from both threads at once?

I've looked carefully through the documentation and looked for others doing
similar things online and have come up empty handed, and while I am happy
to look through the code of kafka streams to find out for myself, that will
take some time as I'm mostly unfamiliar with the codebase, and I was hoping
that I might be able to get an answer more quickly here.

Joshua
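
A sketch of the "scan only a small part per punctuation" idea from this thread: each wall-clock
punctuation processes at most a fixed number of entries and remembers where it stopped, so
process() gets to run in between slices. The store name, types, batch size, and interval are
illustrative, and the open-ended range(from, null) assumes a Streams version that supports open
range endpoints:

    import java.time.Duration;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.processor.PunctuationType;
    import org.apache.kafka.streams.processor.api.Processor;
    import org.apache.kafka.streams.processor.api.ProcessorContext;
    import org.apache.kafka.streams.processor.api.Record;
    import org.apache.kafka.streams.state.KeyValueIterator;
    import org.apache.kafka.streams.state.KeyValueStore;

    public class IncrementalScanProcessor implements Processor<String, String, String, String> {

        private static final int MAX_ENTRIES_PER_PUNCTUATION = 1_000;

        private KeyValueStore<String, String> store;
        private String resumeKey; // where the previous slice stopped

        @Override
        public void init(final ProcessorContext<String, String> context) {
            store = context.getStateStore("my-store"); // assumed store name
            context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
                int scanned = 0;
                // range(from, null) is treated as an open upper bound in recent Streams versions.
                try (KeyValueIterator<String, String> it =
                         resumeKey == null ? store.all() : store.range(resumeKey, null)) {
                    while (it.hasNext() && scanned < MAX_ENTRIES_PER_PUNCTUATION) {
                        KeyValue<String, String> entry = it.next();
                        if (entry.key.equals(resumeKey)) {
                            continue; // already handled at the end of the previous slice
                        }
                        context.forward(new Record<>(entry.key, entry.value, timestamp));
                        resumeKey = entry.key;
                        scanned++;
                    }
                    if (!it.hasNext()) {
                        resumeKey = null; // finished a full pass; start over next time
                    }
                }
            });
        }

        @Override
        public void process(final Record<String, String> record) {
            // Regular updates keep flowing in between the punctuation slices.
            store.put(record.key(), record.value());
        }
    }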



Re: [ANNOUNCE] New Kafka PMC Member: Bruno Cadonna

2022-11-02 Thread Matthias J. Sax

Congrats!

On 11/1/22 7:08 PM, Luke Chen wrote:

Congrats Bruno!
Well deserved!

Luke

On Wed, Nov 2, 2022 at 10:07 AM John Roesler  wrote:


Congratulations, Bruno!!!

On Tue, Nov 1, 2022, at 15:16, Lucas Brutschy wrote:

Wow, congratulations!

On Tue, Nov 1, 2022 at 8:55 PM Chris Egerton 

wrote:


Congrats!

On Tue, Nov 1, 2022, 15:44 Bill Bejeck 

wrote:



Congrats Bruno! Well deserved.

-Bill

On Tue, Nov 1, 2022 at 3:36 PM Guozhang Wang 

wrote:



Hi everyone,

I'd like to introduce our new Kafka PMC member, Bruno.

Bruno has been a committer since April. 2021 and has been very

active in

the community. He's a key contributor to Kafka Streams, and also

helped

review a lot of horizontal improvements such as Mockito. It is my

pleasure

to announce that Bruno has agreed to join the Kafka PMC.

Congratulations, Bruno!

-- Guozhang Wang, on behalf of Apache Kafka PMC









Re: How it is safe to break message ordering but not idempotency after getting an OutOfOrderSequenceException?

2022-06-15 Thread Matthias J. Sax

In general, the producer would retry internally.

If it sends batch X, and didn't get an ack back for it, and gets a 
sequence error for X+1 that it sent after X, the producer would resend X 
again (as it knows X was never received by the broker) and afterwards X+1.


Only if the producer did get an ack for X (and thus purged the batch 
from its internal buffer), it would raise the error to the application, 
as batch X cannot be resent, because the broker did ack but somehow 
seems to have lost the data. (Reasons could be some misconfiguration or 
an unclean leader election, as examples.)


For the "risks reordering of sent records" the issue is as follows: 
after the exception, the producer would not drop buffered records, but 
would bump it's epoch including a sequence number reset. Thus, if you 
call `send()` for the record of X now to they would be put _after_ the 
already buffered record from X-1 (X-1 can still be retried with the new 
epoch and new sequence number Y, and the original batch X could become Y+1).


Only if you close() the producer and put all data back via send() in the 
right order, ordering can be preserved. But it's now the apps 
responsibility to call send() in the right order a second time.



-Matthias


On 6/7/22 3:37 PM, Gabriel Giussi wrote:

Thanks for the answer Matthias.
I still have doubts about the meaning of "risks reordering of sent record".
If I understood correctly the example you gave is something like this
1. Producer sends batch with sequence number X
2. That request gets lost in the network
3. Producer sends batch with sequence number X+1
4. Broker receives batch with sequence number X+1 and returns an error and
the Producer throws a OutOfOrderSequenceException

In that situation we could keep retrying sending batch with sequence number
X+1 but we will keep getting a OutOfOrderSequenceException, or we ideally
also resend a batch with sequence number X, and after being accepted send
the one with X+1.
If what I'm saying is correct then I can't see how this can reorder the
messages, I mean if both batches include a message being written to topic
A, could messages from batch with sn X+1 end up being persisted with an
offset lesser than the ones from the batch with sn X?
Does this question make sense?

El mar, 7 jun 2022 a las 16:13, Matthias J. Sax ()
escribió:


Yes, the broker de-dupes using the sequence number.

But for example, if a sequence number is skipped, you could get this
exception: the current batch of messages cannot be appended to the log,
as one batch is missing, and the producer would need to re-send the
previous/missing batch with lower sequence number before it can move to
the "next" (ie current) batch.

Does this make sense?


-Matthias

On 5/27/22 10:43 AM, Gabriel Giussi wrote:

The docs say
"This exception indicates that the broker received an unexpected sequence
number from the producer, which means that data may have been lost. If

the

producer is configured for idempotence only (i.e. if enable.idempotence

is

set and no transactional.id is configured), it is possible to continue
sending with the same producer instance, but doing so risks reordering of
sent record"

Isn't the broker using the monotonically increasing sequence number to
dedup messages? So how can it break message ordering without breaking
idempotency?
I can't see an example scenario where this could happen, I guess
the OutOfOrderSequenceException can only happen
with max.in.flight.requests.per.connection > 1, but even in that case why
are not going to keep getting an OutOfOrderSequenceException but instead

a

success that broke message ordering?

Thanks.
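
A minimal sketch of the "close() and re-send in order" approach Matthias describes above: on an
OutOfOrderSequenceException the old producer is discarded and the whole batch is replayed in its
original order with a fresh producer, instead of continuing with the still-buffered records. The
broker address and record types are placeholders, and the per-record .get() keeps the sketch
short (a real application would pipeline sends and track in-flight records itself):

    import java.util.List;
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.errors.OutOfOrderSequenceException;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class ResendInOrderSketch {

        private static KafkaProducer<String, String> newProducer() {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
            return new KafkaProducer<>(props);
        }

        public static void sendPreservingOrder(List<ProducerRecord<String, String>> batch)
                throws InterruptedException, ExecutionException {
            KafkaProducer<String, String> producer = newProducer();
            try {
                try {
                    for (ProducerRecord<String, String> r : batch) {
                        producer.send(r).get(); // synchronous only to keep the sketch simple
                    }
                } catch (ExecutionException e) {
                    if (e.getCause() instanceof OutOfOrderSequenceException) {
                        // Replace the producer and replay the whole batch in its original order,
                        // rather than continuing with the old instance (which risks reordering).
                        producer.close();
                        producer = newProducer();
                        for (ProducerRecord<String, String> r : batch) {
                            producer.send(r).get();
                        }
                    } else {
                        throw e;
                    }
                }
            } finally {
                producer.close();
            }
        }
    }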







Re: How it is safe to break message ordering but not idempotency after getting an OutOfOrderSequenceException?

2022-06-07 Thread Matthias J. Sax

Yes, the broker de-dupes using the sequence number.

But for example, if a sequence number is skipped, you could get this 
exception: the current batch of messages cannot be appended to the log, 
as one batch is missing, and the producer would need to re-send the 
previous/missing batch with lower sequence number before it can move to 
the "next" (ie current) batch.


Does this make sense?


-Matthias

On 5/27/22 10:43 AM, Gabriel Giussi wrote:

The docs say
"This exception indicates that the broker received an unexpected sequence
number from the producer, which means that data may have been lost. If the
producer is configured for idempotence only (i.e. if enable.idempotence is
set and no transactional.id is configured), it is possible to continue
sending with the same producer instance, but doing so risks reordering of
sent record"

Isn't the broker using the monotonically increasing sequence number to
dedup messages? So how can it break message ordering without breaking
idempotency?
I can't see an example scenario where this could happen, I guess
the OutOfOrderSequenceException can only happen
with max.in.flight.requests.per.connection > 1, but even in that case why
are not going to keep getting an OutOfOrderSequenceException but instead a
success that broke message ordering?

Thanks.



Re: Newbie how to get key/value pojo out of a stream?

2022-06-07 Thread Matthias J. Sax
`enable.auto.commit` is a Consumer config and does not apply to Kafka 
Streams.


In Kafka Streams, you basically always have auto commit enabled, and you 
can control how frequently commits happen via `commit.interval.ms`.


Also on `close()` Kafka Streams would commit offsets.


-Matthias

On 5/31/22 12:29 PM, Luca wrote:

Hi Andy,

The defaults are sensible enough that, under normal operational conditions, 
your app should pick up from where it left. To dig a little more into this, I 
suggest you look into `auto.offset.reset` and `enable.auto.commit` options.

In case, you do need to reprocess everything, kafka streams comes with a handy 
reset tool. You can read about it here: 
https://kafka.apache.org/32/documentation/streams/developer-guide/app-reset-tool.html

Luca

On Tue, May 31, 2022, at 5:17 PM, andrew davidson wrote:

Thanks Luca

This is exactly what I was looking for.

On a related note let's say I stop and restart my application. What would I 
have to do so that the I do not re process events?

I am still working through the kstreams 101 tutorial. I have not gotten to the 
DSL tutorials yet

Andy

On 5/30/22, 11:16 PM, "Luca"  wrote:

 Hi Andy,

 If I understand your problem correctly, you want a "foreach" terminal 
operation. You can check out the API here: 
https://kafka.apache.org/32/documentation/streams/developer-guide/dsl-api.html

 Luca

 On Tue, May 31, 2022, at 6:37 AM, Andy wrote:
 > All the Kstream examples I have found demonstrate how to use map, filter,
 > and join on streams. The last step they typically use to() to
 > publish/produce the results to a new stream
 >
 > How can I get the data out of the stream? For example I need to send the
 > data to a legacy data that can not use kafka. Or maybe I want to plot the
 > data,…
 >
 > I looked at the java doc and did not find anything
 >
 > Any idea what I should “google” to to find a code example?
 >
 > Kind regards
 >
 > Andy
 >

 lucapette.me



lucapette.me
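
A minimal example of the foreach terminal operation suggested in this thread; the topic name and
the sendToLegacySystem() call are placeholders for whatever non-Kafka sink is needed:

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Consumed;

    public class ForeachSketch {
        public static void main(String[] args) {
            StreamsBuilder builder = new StreamsBuilder();
            builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
                   // Terminal operation: hand each record to code outside of Kafka.
                   .foreach((key, value) -> sendToLegacySystem(key, value));
            // ... build a KafkaStreams instance from `builder` and start() it as usual ...
        }

        private static void sendToLegacySystem(String key, String value) {
            System.out.println(key + " -> " + value); // placeholder for the legacy integration
        }
    }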



Re: kafka stream - sliding window - getting unexpected output

2022-05-20 Thread Matthias J. Sax

Not sure atm.

It seems you are printing the timestamp extracted from the payload:


out.setStrTime(TimestampLongToString.getTime(tr.getTimestamp()));


Does this timestamp really map to the window?

You remove the window information so maybe you are looking at the wrong 
data?



.map((Windowed key, OutputPojo out) -> {
   return new KeyValue<>(key.key(),out) ;
 })



For the input: Do you use a custom timestamp extractor and use the 
payload timestamp? If not, do the record timestamp and the payload 
timestamp match?



-Matthias


On 5/18/22 11:32 PM, Shankar Mane wrote:

@Matthias J. Sax / All

Have added below line :


.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))




Here is the output (for uuid *2cbef750-325b-4a2f-ac39-b2c23fa0313f*, I am
expecting a single output but that is not the case here). Which one is the final
output from those 2 rows for the same uuid?

[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=1, sum=10.0,

strTime=2022-05-19 11:48:08.128, uuid=fb6bea5f-8fd0-4c03-8df3-aaf392f04a5a)


[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=2, sum=20.0,

strTime=2022-05-19 11:48:10.328, uuid=b4ab837f-b10a-452d-a663-719215d2992f)
[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=3, sum=30.0,
strTime=2022-05-19 11:48:12.527, uuid=8fa1b621-c967-4770-9f85-9fd84999c97c)
[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=4, sum=40.0,
strTime=2022-05-19 11:48:14.726, uuid=1fc21253-7859-45ef-969e-82ed596c4fa0)
[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=5, sum=50.0,
strTime=2022-05-19 11:48:16.925, uuid=
*2cbef750-325b-4a2f-ac39-b2c23fa0313f)*
[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=4, sum=40.0,
strTime=2022-05-19 11:48:16.925, uuid=
*2cbef750-325b-4a2f-ac39-b2c23fa0313f*)
[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=5, sum=50.0,
strTime=2022-05-19 11:48:19.124, uuid=8f72454c-ec02-42c1-84e1-bcf262db436f)
[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=4, sum=40.0,
strTime=2022-05-19 11:48:19.124, uuid=8f72454c-ec02-42c1-84e1-bcf262db436f)
[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=5, sum=50.0,
strTime=2022-05-19 11:48:21.322, uuid=e4e73232-dbd4-45b4-aae9-f3bd2c6e54b4)
[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=4, sum=40.0,
strTime=2022-05-19 11:48:21.322, uuid=e4e73232-dbd4-45b4-aae9-f3bd2c6e54b4)
[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=5, sum=50.0,
strTime=2022-05-19 11:48:23.522, uuid=fc49906e-54d0-4e80-b394-3ebaf6e74eaa)
[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=4, sum=40.0,
strTime=2022-05-19 11:48:23.522, uuid=fc49906e-54d0-4e80-b394-3ebaf6e74eaa)
[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=5, sum=50.0,
strTime=2022-05-19 11:48:25.721, uuid=fbe62fa4-e7c4-437f-b976-0bb7ae0c4390)



On Wed, May 18, 2022 at 10:21 PM Matthias J. Sax  wrote:


Emitting intermediate result is by-design.

If you don't want to get intermediate result, you can add `suppress()`
after the aggregation and configure it to only "emit on window close".

-Matthias

On 5/17/22 3:20 AM, Shankar Mane wrote:

Hi All,

Our use case is to use a sliding window (e.g. at any point, whenever the
user performs any action at time [ t1 ], we would like to see his activity
in [ t1 - last 24 hours ], and use this to show the user some recommendations).




-- I have code ready and it works without any errors.
-- aggregations happen as expected.
-- but the output generated is unexpected. As the window slides, I am
getting mixed output which includes intermediate aggregated records along
with the final aggregated outputs.

Could someone please help me here? What can I do here to get ONLY the final
aggregated output?


Code snippet :




builder.stream(tempReadingTopic, Consumed.with(stringSerde, inputSerde))
    .filter((k, v) -> v != null)
    .map((k, v) -> KeyValue.pair(v.getUserId(), v))
    //.through("slidingbykey", Produced.with(Serdes.String(), inputSerde))
    .groupByKey()
    .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(Duration.ofMillis(5000), windowDuration))
    .aggregate(OutputPojo::new, (k, tr, out) -> {
        out.setUserId(tr.getUserId());
        out.setCount(out.getCount() + 1);
        out.setSum(out.getSum() + tr.getInt4());
        out.setUuid(tr.getUuid());
        out.setStrTime(TimestampLongToString.getTime(tr.getTimestamp()));
        waitForMs(200); // added delay just for analysing output
        return out;
    }, Materialized.with(stringSerde, outputSerde))
    .suppress(Suppressed.untilTimeLimit(windowDuration, Suppressed.BufferConfig.unbounded()))
    .toStream(

Re: kafka stream - sliding window - getting unexpected output

2022-05-18 Thread Matthias J. Sax

Emitting intermediate result is by-design.

If you don't want to get intermediate result, you can add `suppress()` 
after the aggregation and configure it to only "emit on window close".
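
Roughly, the operator goes between the aggregation and `toStream()`; a minimal sketch (the `stream`, `aggregator`, serdes, and the window/grace sizes are placeholders, not taken from your code):

```java
stream.groupByKey()
      .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(
              Duration.ofMillis(5000), Duration.ofSeconds(1)))
      .aggregate(OutputPojo::new, aggregator,
              Materialized.with(stringSerde, outputSerde))
      // emit a single, final result per window once its grace period has
      // passed, instead of forwarding every intermediate update
      .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
      .toStream();
```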


-Matthias

On 5/17/22 3:20 AM, Shankar Mane wrote:

Hi All,

Our use case is to use a sliding window (e.g. at any point, whenever the
user performs any action at time [ t1 ], we would like to see his activity
in [ t1 - last 24 hours ], and use this to show the user some recommendations).




-- I have code ready and it works without any errors.
-- aggregations happen as expected.
-- but the output generated is unexpected. As the window slides, I am
getting mixed output which includes intermediate aggregated records along
with the final aggregated outputs.

Could someone please help me here? What can I do here to get ONLY the final
aggregated output?


Code snippet :




builder.stream(tempReadingTopic, Consumed.with(stringSerde, inputSerde))
 .filter((k, v) -> v != null)
 .map((k,v) -> KeyValue.pair(v.getUserId(), v))
 //.through("slidingbykey",
Produced.with(Serdes.String(), inputSerde))
 .groupByKey()

.windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(Duration.ofMillis(5000),
windowDuration))
 .aggregate(OutputPojo::new, (k, tr, out) -> {
 out.setUserId(tr.getUserId());
 out.setCount(out.getCount() +1);
 out.setSum(out.getSum() + tr.getInt4());
 out.setUuid(tr.getUuid());

out.setStrTime(TimestampLongToString.getTime(tr.getTimestamp()));
 waitForMs(200); //added delay just for analysing output
 return out;
 }, Materialized.with(stringSerde, outputSerde))
 .suppress(Suppressed.untilTimeLimit(windowDuration,
Suppressed.BufferConfig.unbounded()))
 .toStream()
 .map((Windowed key, OutputPojo out) -> {
 return new KeyValue<>(key.key(),out) ;
 })
 .print(Printed.toSysOut());
//.to(aveTempOutputTopic, Produced.with(stringSerde,
outputSerde))
 ;






Input data :

for i in {1..10}; do sleep 1s;python3 del.py 1001 10;sleep 1s; done

{'userId': '1001', 'timestamp': 1652781716234, 'int4': 10, 'uuid':
'64f019ee-9cf4-427d-b4c9-f2b5f88820e1'}
{'userId': '1001', 'timestamp': 1652781718436, 'int4': 10, 'uuid':
'cf173b3e-c34f-470a-ba15-ef648d0be8b9'}
{'userId': '1001', 'timestamp': 1652781720634, 'int4': 10, 'uuid':
'48d2b4ea-052d-42fa-a998-0216d928c034'}
{'userId': '1001', 'timestamp': 1652781722832, 'int4': 10, 'uuid':
'55a6c26c-3d2c-46f1-ab3c-04927f660cbe'}
{'userId': '1001', 'timestamp': 1652781725029, 'int4': 10, 'uuid':
'dbfd8cee-565d-496b-b5a8-773ae64bc518'}
{'userId': '1001', 'timestamp': 1652781727227, 'int4': 10, 'uuid':
'135dc5cd-50cb-467b-9e63-300fdeedaf75'}
{'userId': '1001', 'timestamp': 1652781729425, 'int4': 10, 'uuid':
'66d8e3c7-8f63-43ca-acf1-e39619bf33a0'}
{'userId': '1001', 'timestamp': 1652781731623, 'int4': 10, 'uuid':
'f037712b-42a5-4449-bcc2-cf6eafddf5ad'}
{'userId': '1001', 'timestamp': 1652781733820, 'int4': 10, 'uuid':
'7baa4254-b9da-43dc-bbb7-4caede578aeb'}
{'userId': '1001', 'timestamp': 1652781736018, 'int4': 10, 'uuid':
'16541989-f3ba-49f6-bd31-bf8a75ba8eac'}






Output (*Unexpected*): the below output is captured at each sliding window of
1s duration (but input data is published at 2s intervals):

[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=3, sum=30.0,

strTime=2022-05-17 15:31:28.263,
uuid=966277fd-3bdc-45c2-aa6a-c9c0dbe89c34)  > seems older UUID
[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=1, sum=10.0,
strTime=2022-05-17 15:31:28.263, uuid=966277fd-3bdc-45c2-aa6a-c9c0dbe89c34)

[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=1, sum=10.0,
strTime=2022-05-17 15:31:56.234, uuid=64f019ee-9cf4-427d-b4c9-f2b5f88820e1)

[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=2, sum=20.0,
strTime=2022-05-17 15:31:58.436, uuid=cf173b3e-c34f-470a-ba15-ef648d0be8b9)
[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=2, sum=20.0,
strTime=2022-05-17 15:32:00.634, uuid=48d2b4ea-052d-42fa-a998-0216d928c034)

[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=3, sum=30.0,
strTime=2022-05-17 15:32:00.634, uuid=48d2b4ea-052d-42fa-a998-0216d928c034)
[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=2, sum=20.0,
strTime=2022-05-17 15:32:02.832, uuid=55a6c26c-3d2c-46f1-ab3c-04927f660cbe)

[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, count=3, sum=30.0,
strTime=2022-05-17 15:32:02.832, uuid=55a6c26c-3d2c-46f1-ab3c-04927f660cbe)
[KSTREAM-MAP-11]: 1001, OutputPojo(userId=1001, 

Re: How to achieve high availability in a Kafka Streams app during deployment?

2022-03-05 Thread Matthias J. Sax

Hard to answer from a 10,000ft view.

In general, a rolling upgrade (ie, bounce one instance at a time) is 
recommended. If you have state, you would need to ensure that state is 
not lost during a bounce. As you are using Kubernetes, using stateful 
sets that allow you to re-attach disk should be the way to go.


Rolling upgrades are only supported if the new program (ie, Topology) is 
compatible with the old one. The alternative to a rolling upgrade would 
be to deploy the new version in parallel to the old one (using a 
different application-id), and after the new version is running stably, 
to shut down the old version.
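
To illustrate the second option, a minimal config sketch (all values are placeholders; the "-v2" suffix is just one way to keep the application ids apart):

```java
Properties propsV2 = new Properties();
// The new deployment gets its own application.id, hence its own consumer
// group, state directories, and internal topics, and can warm up while the
// old version keeps running.
propsV2.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app-v2");          // placeholder
propsV2.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");      // placeholder
propsV2.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kafka-streams");  // placeholder
```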


Hope this helps.

-Matthias

On 2/28/22 12:11, Ismar Slomic wrote:

We run Kafka Streams (Java) apps on Kubernetes to *consume*, *process* and
*produce* real time data in our Kafka Cluster (running Confluent Community
Edition v7.0/Kafka v3.0). How can we do a deployment of our apps in a way
that limits downtime on consuming records? Our initial target was approx *2
sec* downtime a single time for each task.

We are aiming to do continuous deployments of changes to the production
environment, but deployments are too disruptive by causing downtime in
record consumption in our apps, leading to latency in produced real time
records.

Since this question has already been described in detail on Stack Overflow (
https://stackoverflow.com/questions/71222496/how-to-achieve-high-availability-in-a-kafka-streams-app-during-deployment),
but has not been answered yet, we would like to refer to it instead of
copy/pasting the content in this mailing list.

Please let me know if you prefer to have the complete question in the
mailing list instead.



Re: [ANNOUNCE] New committer: Luke Chen

2022-02-09 Thread Matthias J. Sax

Congratulations! Glad to have you onboard, Luke!

-Matthias

On 2/9/22 16:37, Bill Bejeck wrote:

Congrats Luke! Well deserved.

-Bill

On Wed, Feb 9, 2022 at 7:25 PM Israel Ekpo  wrote:


Congratulations Luke!

Thank you for your service

On Wed, Feb 9, 2022 at 6:22 PM Guozhang Wang  wrote:


The PMC for Apache Kafka has invited Luke Chen (showuon) as a committer

and

we are pleased to announce that he has accepted!

Luke has been actively contributing to Kafka since early 2020. He has
made more than 120 commits on various components of Kafka, with notable
contributions to the rebalance protocol in Consumer and Streams (KIP-766,
KIP-726, KIP-591, KAFKA-12675 and KAFKA-12464, to just name a few), as

well

as making an impact on improving test stability of the project. Aside

from

all his code contributions, Luke has been a great participant in
discussions across the board, a very active and helpful reviewer of other
contributors' works, all of which are super valuable and highly

appreciated

by the community.


Thanks for all of your contributions Luke. Congratulations!

-- Guozhang, on behalf of the Apache Kafka PMC


--
Israel Ekpo
Lead Instructor, IzzyAcademy.com
https://www.youtube.com/c/izzyacademy
https://izzyacademy.com/





Re: Kafka Streams - one topic moves faster the other one

2022-01-04 Thread Matthias J. Sax
If you observe timestamp-based synchronization issues, you might also 
consider switching to the 3.0 release, which closes a few more gaps to this end.


Cf 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-695%3A+Further+Improve+Kafka+Streams+Timestamp+Synchronization



-Matthias

On 12/29/21 7:22 PM, Luke Chen wrote:

Hi Miguel,

Yes, the grace period is the solution to fix the problem.

Alternatively, you can try to set a higher value for "max.task.idle.ms"
configuration, because this is some kind of out-of-order data.
Let's say, A topic has 1 record per second (fast), B topic has 1 record per
minute (slow).
You can set the "max.task.idle.ms" as 60 seconds or higher, to force the
stream to wait for 1 minute for the empty topic B, before processing the
records.

 From the document:
https://kafka.apache.org/30/documentation/streams/developer-guide/config-streams#max-task-idle-ms

*Any config value greater than zero indicates the number of extra
milliseconds that Streams will wait if it has a caught-up but empty
partition. In other words, this is the amount of time to wait for new data
to be produced to the input partitions to ensure in-order processing of
data in the event of a slow producer. *
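
For reference, a minimal sketch of that setting (application id and bootstrap servers are placeholders):

```java
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "join-app");           // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder

// Wait up to 60s on a caught-up but empty input partition (the slow topic B)
// before processing, instead of racing ahead on the fast topic A.
props.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 60_000L);
```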

Hope it helps.

Thank you.
Luke

On Thu, Dec 30, 2021 at 2:52 AM Miguel González 
wrote:


Hi team

So I ran into a complicated issue, something which I believe Kafka Streams
is not prepared for.

Basically my app is reading from two topics and joining them.

But when testing in my staging environment I found that one topic moves
faster than the other one, basically pushing stream time forward.

Some partitions are even months apart. I found a question on SO detailing
something similar:

https://stackoverflow.com/questions/69126351/bulk-processing-data-through-a-join-with-kafka-streams-results-in-skipping-reco

The problem for me is that joins are no longer working. Setting a huge
grace period has somehow alleviated the problem for now, but I don't think
that's the right approach, and not all events join at the end anyway. Have
other users faced something similar, and if so, how can it be resolved? Can we
somehow delay the processing to make the topics aligned somehow?

thanks
- Miguel





Re: How to properly use a clean a TimestampedKeyValueStore

2022-01-04 Thread Matthias J. Sax

Not 100% sure. From what you describe it should work as expected.

It seems `delete()` does not delete the key from the store (ie, RocksDB) 
itself (for unknown reasons)?


Are you closing all your iterators correctly? (More or less a wild guess 
at the moment.)
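
For reference, iterators returned by the store must be closed; a minimal sketch of a punctuator body doing the cleanup with try-with-resources (the store's generic types and the isExpired() check are illustrative):

```java
// `kvStore` is the TimestampedKeyValueStore, `isExpired` is an illustrative predicate
try (KeyValueIterator<String, ValueAndTimestamp<String>> iter = kvStore.all()) {
    while (iter.hasNext()) {
        KeyValue<String, ValueAndTimestamp<String>> entry = iter.next();
        if (isExpired(entry.value)) {
            kvStore.delete(entry.key); // writes a tombstone to the changelog
        }
    }
} // the iterator is closed here even if the loop throws
```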


Did you enable caching for the store? (Just to double check if it could 
be caching related or not.)



-Matthias


On 12/24/21 11:08 AM, Miguel González wrote:

Hello

I'm using Kafka Streams and I have a transformer that uses
a TimestampedKeyValueStore, I have a punctuator that is in charge of
cleaning the store,

Basically I'm iterating the store using kvStore.all() and deleting the keys
based on some logic with kvStore.delete(key);

I'm seeing the changelog topic for the store grow unbounded, I'm seeing
many values with null for the same keys... I think those are called
tombstones right?  but the punctuator is constantly doing the same thing
trying to delete the same keys.. I see more tombstones being inserted.

Is this the expected behavior? If so, how can I correctly clean that store?

thanks
- Miguel



Re: [ANNOUNCE] New Kafka PMC member: David Jacot

2021-12-18 Thread Matthias J. Sax

Congrats!

On 12/17/21 15:46, Bill Bejeck wrote:

Congratulations David! Well deserved.

-Bill

On Fri, Dec 17, 2021 at 6:43 PM José Armando García Sancio
 wrote:


Congrats David!

On Fri, Dec 17, 2021 at 3:09 PM Gwen Shapira  wrote:


Hi everyone,

David Jacot has been an Apache Kafka committer since Oct 2020 and has

been contributing to the community consistently this entire time -
especially notable the fact that he reviewed around 150 PRs in the last
year. It is my pleasure to announce that David agreed to join the Kafka PMC.


Congratulations, David!

Gwen Shapira, on behalf of Apache Kafka PMC




--
-Jose





Re: Kafka Streams app process records until certain date

2021-12-08 Thread Matthias J. Sax

Hard to achieve.

I guess a naive approach would be to use a `flatTransform()` to 
implement a filter that drops all records that are not in the desired 
time range.
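
A minimal sketch of that filter (the `input` stream and the START_MS/END_MS cutoffs are assumptions for illustration):

```java
// `input` is a KStream<String, String>; START_MS and END_MS are the assumed cutoffs
KStream<String, String> bounded = input.flatTransform(() ->
    new Transformer<String, String, Iterable<KeyValue<String, String>>>() {
        private ProcessorContext context;

        @Override
        public void init(final ProcessorContext context) {
            this.context = context;
        }

        @Override
        public Iterable<KeyValue<String, String>> transform(final String key, final String value) {
            final long ts = context.timestamp(); // timestamp of the current record
            // keep only records inside the desired time range, drop everything else
            return (ts >= START_MS && ts < END_MS)
                    ? Collections.singletonList(KeyValue.pair(key, value))
                    : Collections.emptyList();
        }

        @Override
        public void close() {}
    });
```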


pause() and resume() are not available in Kafka Streams, but only on the 
KafkaConsumer (The Spring docs you cite is also about the consumer, not 
Kafka Streams).



-Matthias

On 11/24/21 11:05 AM, Miguel González wrote:

Hello

For my use case I need to work with a chuck of records, let's say per
month... We have over two years of data... and we are testing if we can
deploy it to production, but we need to test in small batches.

I have built a Kafka Streams app that processes two input topics and output
to one topic.

I would like to process the first two months of data. Is that possible?

- I have tried blocking the consumer thread using .map and comparing the
timestamp on the message with a timestamp I get from another system that
would tell me until what time I should process, on the two KStreams I have.
I also increased MAX_POLL_INTERVAL_MS_CONFIG, but I have
noticed the messages that are in range do not get processed and sent to the
output topic.
- I have also seen a Spring Cloud library apparently offer a
pause-resume feature.

https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/3.1.5/reference/html/spring-cloud-stream-binder-kafka.html#_binding_visualization_and_control_in_kafka_streams_binder
- I have also seen that implementing a transformer or processor could
work, but in this case the state store would possibly need to hold years of
data. That is something I would like to avoid.


Any help is appreciated.

regards
- Miguel



Re: Event order in Kafka Streams after Left Join

2021-12-06 Thread Matthias J. Sax

I had heard when doing a join, the timestamp of the generated

message is taken from the message triggering the join or the biggest
timestamp of the two.


In older versions it was the timestamp of the record that triggered the 
join. Since 2.3, it is the maximum of both (cf 
https://issues.apache.org/jira/browse/KAFKA-6455)


You don't need to do anything for this. It's hard-coded. Of course, if 
you want you _could_ manually change the timestamp as pointed out by Luke.


For proper timestamp ordering, you should also upgrade to 3.0 to get the 
latest changes (ie, the KIP improvements mentioned by Luke).


-Matthias

On 12/5/21 12:14 AM, Luke Chen wrote:

Hi Miguel,
Of course you can use "Processor API" to achieve what you want. But it
needs more coding.

Alternatively, I think you can define a better value for "*max.task.idle.ms
*" configuration. Default value is 0, which means
it basically doesn't wait for more data in empty partitions. You can check
the doc here, and also KIP-695 and KIP-353, for more information.

Thank you.
Luke

On Sat, Dec 4, 2021 at 2:10 AM Miguel González 
wrote:


Hello

So I've been using a Streams app to join two input topics... the messages
have a certain order... but I have seen the messages on the output topic
arriving with a different ordering. Even before the join, when doing a
map/flatmap operation, the messages are processed with a different ordering.

Example:

Stream 1: A---B---C---D
Stream 2: A--B--C--D

Output topic: BB---A---AA---CC---DD

I need it to be A--AA--BB---CC---DD

Is there a way that Kafka Streams guarantees the order of messages in the
output topic? I had heard when doing a join, the timestamp of the generated
message is taken from the message triggering the join or the biggest
timestamp of the two. I don't know if this is the case, if it's done... is
that an automatic process or do I need to set the timestamp somewhere?

Another option I have read about is doing something like this:
https://dzone.com/articles/how-to-order-streamed-dataframes where the
author is doing some kind of sorting using an AbstractProcessor, which
seems like a pretty old article. But right I guess I could use a
Transformer or a Processor


Any guidance is really appreciated!

many thanks
- Miguel





Re: Kafka Streams - left join behavior

2021-12-06 Thread Matthias J. Sax

It's fixed in upcoming 3.1 release.

Cf https://issues.apache.org/jira/browse/KAFKA-10847


A stream-(global)table join has different semantics, so I am not sure if 
it would help.


One workaround would be to apply a stateful `flatTransformValues()` 
after the join to "buffer" all NULL-results and only emit them after you 
know no consecutive inner-join result will happen. It's tricky to build 
though.


I would recommend to wait and upgrade to 3.1 after it is released.


-Matthias

On 11/30/21 12:59 AM, Luke Chen wrote:

Hi Miguel,

Is there a way to force the behavior I need, meaning... using left join

and
a JoinWindows output only one message (A,B) or (A, null)

I think you can try to achieve it by using *KStream-GlobalKTable left join*,
where the GlobalKTable should read all records at the right topic, and then
doing the left join operation. This should then output either (A,B), or (A,
null).
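
A minimal sketch of that variant (topic names, types, and serdes are illustrative; buildMessage() is the value joiner from the code below):

```java
// `builder`, `transactions`, the serdes, and buildMessage() are assumed to exist
GlobalKTable<String, BalanceEvent> balances =
        builder.globalTable("balance-topic", Consumed.with(stringSerde, balanceEventSerde));

transactions
        // no JoinWindows here: the global table always holds the latest value per key
        .leftJoin(balances,
                (txKey, tx) -> txKey,                       // map the stream record to the table key
                (tx, balance) -> buildMessage(balance, tx)) // balance is null when there is no match
        .to("output-topic", Produced.with(stringSerde, outputSerde));
```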

Thank you.
Luke

On Tue, Nov 30, 2021 at 1:23 AM Miguel González 
wrote:


Hello

I have been developing a Kafka Streams app that takes as input two topics
as KStreams, processes them in some way and joins them and sends the
combined message to an output topic.

Here's some code,

final StreamJoined joinParams =
 StreamJoined.with(
 STRING_SERDE,
 StreamSerdeConstants.TRANSACTION_EVENT_SERDE,
 StreamSerdeConstants.BALANCE_EVENT_SERDE);

JoinWindows joinWindows = JoinWindows
 .of(Duration.ofSeconds(streamsProperties.getJoinWindowDuration()))
 .grace(Duration.ofSeconds(streamsProperties.getJoinGraceDuration()));

ValueJoiner
valueJoiner =
 (transactionEvent, balanceEvent) -> buildMessage(balanceEvent,
transactionEvent);


transactions
 // TODO: change to leftJoin
 .join(beWithTransaction, valueJoiner, joinWindows, joinParams)


It's pretty simple, but for my use case I need to process in some way the
messages that are not joined, so I thought I could use a LEFT JOIN. But
according to my tests and this documentation
https://www.confluent.io/blog/crossing-streams-joins-apache-kafka/

I have seen that in the end I could end up with both the combined message that
the regular inner join produces and the message with one side as NULL, for
example (A,B) and (A, null).

I thought the join window could force the left join, when it finds a match, to
output just (A,B) and not both. Maybe I have a bug in my
window configuration.

Is there a way to force the behavior I need, meaning... using left join and
a JoinWindows output only one message (A,B) or (A, null)

regards
- Miguel





Re: Pause/Restart a Kafka streams app

2021-11-22 Thread Matthias J. Sax
You can only close() the Kafka Streams client and create a new one to 
resume (offsets are committed on close() and thus would be picked up on 
restart).


Closing and restarting would result in rebalancing though, so to really 
pause/resume you would need to close() all instances.



There is no API to pause()/resume() similar to what the KafkaConsumer 
offers.



-Matthias


On 11/22/21 2:10 PM, Miguel González wrote:

Hello there

Is it possible to pause/restart a Kafka streams app? I have only found this
discussion
https://groups.google.com/g/confluent-platform/c/Nyj3eN-3ZlQ/m/lMH-bFx-AAAJ
about using map to call an external service and loop until some condition
completes

regards
- Miguel



Re: Stream to KTable internals

2021-11-19 Thread Matthias J. Sax
We want to make further improvements to stream-table joins. It's just not 
easy and is a larger-scoped project.


-Matthias

On 11/18/21 12:09 PM, Chad Preisler wrote:

I'm wondering if the kafka architects have plans to redesign/enhance this
behavior. Having to guess the idle time isn't the most satisfying solution.
No matter what time I put in there it seems possible that I will miss a
join.

Respectfully,
Chad

On Fri, Nov 5, 2021 at 3:07 PM Matthias J. Sax  wrote:


The log clearly indicates that you hit enforced processing. We record
the metric and log:

Cf

https://github.com/apache/kafka/blob/3.0.0/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java#L194-L200

Not sure why the metric does not report it...

Hence, the solution would be to increase `max.task.idle.ms` further to
give Kafka Streams time to fetch the data.

It might help to use DEBUG logs to see for which partitions the consumer
sends fetch requests and which partitions return data, to better
understand the underlying behavior.


-Matthias

On 11/5/21 6:58 AM, Chad Preisler wrote:

It seems like I have 2 options to work around this issue.


 - Keep the KTable and have another process running that puts the

missed

 join message back on the event topic.
 - Switch to GlobalKTable.

Any other solutions/workarounds are welcome.

Thanks,
Chad

On Thu, Nov 4, 2021 at 11:43 AM Chad Preisler 
wrote:


enforced-processing-total is zero for all missed join occurrences. I
logged all the metrics out at the time my stream processed the missed

join,

so let me know if there are any other metrics that would help.

On Wed, Nov 3, 2021 at 9:21 PM Chad Preisler 
wrote:


I'm not sure. When I ran with trace logging turned on I saw a bunch of
messages like the ones below. Do those messages indicate
"enforced-processing"? It gets logged right after the call
to enforcedProcessingSensor.record.

Continuing to process although some partitions are empty on the broker.
There may be out-of-order processing for this task as a result.

Partitions

with local data: [status-5]. Partitions we gave up waiting for, with

their

corresponding deadlines: {event-5=1635881287722}. Configured
max.task.idle.ms: 2000. Current wall-clock time: 1635881287750.

Continuing to process although some partitions are empty on the broker.
There may be out-of-order processing for this task as a result.

Partitions

with local data: [event-5]. Partitions we gave up waiting for, with

their

corresponding deadlines: {status-5=1635881272754}. Configured
max.task.idle.ms: 2000. Current wall-clock time: 1635881277998.

On Wed, Nov 3, 2021 at 6:11 PM Matthias J. Sax 

wrote:



Can you check if the program ever does "enforced processing", ie,
`max.task.idle.ms` passed, and we process despite an empty input

buffer.


Cf

https://kafka.apache.org/documentation/#kafka_streams_task_monitoring


As long as there is input data, we should never do "enforced

processing"

and the metric should stay at zero.


-Matthias

On 11/3/21 2:41 PM, Chad Preisler wrote:

Just a quick update. Setting max.task.idle.ms to 10000 (10 seconds) had no
effect on this issue.

On Tue, Nov 2, 2021 at 6:55 PM Chad Preisler <

chad.preis...@gmail.com>

wrote:


No unfortunately it is not the case. The table record is written

about 20

seconds before the stream record. I’ll crank up the time tomorrow

and

see

what happens.

On Tue, Nov 2, 2021 at 6:24 PM Matthias J. Sax 

wrote:



Hard to tell, but as it seems that you can reproduce the issue, it

might

be worth a try to increase the idle time further.

I guess one corner case for stream-table join that is not resolved

yet

is when stream and table record have the same timestamp... For this
case, the table record might not be processed first.

Could you hit this case?


-Matthias

On 11/2/21 3:13 PM, Chad Preisler wrote:

Thank you for the information. We are using the Kafka 3.0 client

library.

We are able to reliably reproduce this issue in our test

environment

now. I

removed my timestamp extractor, and I set the max.task.idle.ms to

2000. I

also turned on trace logging for package
org.apache.kafka.streams.processor.internals.

To create the issue we stopped the application and ran enough data

to

create a lag of 400 messages. We saw 5 missed joins.

From the stream-thread log messages we saw the event message,

our

stream

missed the join, and then several milliseconds later we saw the
stream-thread print out the status message. The stream-thread

printed

out

our status message a total of 5 times.

Given that only a few milliseconds passed between missing the join

and

the

stream-thread printing the status message, would increasing the
max.task.idle.ms help?

Thanks,
Chad

On Mon, Nov 1, 2021 at 10:03 PM Matthias J. Sax 


wrote:



Timestamp synchronization is not perfect, and as a matter of

fact,

we

fixed a few gaps in 3.0.0 release. We actually hope, that we

closed

Re: Endless loop restoring changelog topic

2021-11-16 Thread Matthias J. Sax

Not sure.

Can you enable DEBUG logging on 
`org.apache.kafka.streams.processor.internals.StoreChangelogReader` to 
see if restore does make any progress?



-Matthias

On 7/20/21 5:41 AM, Alessandro Tagliapietra wrote:
I've tried to restart the streams application using at_least_once 
processing guarantee and it worked, restarted again in 
exactly_once_beta and it worked too.


Spoke too soon, after a while in exactly_once_beta the "Restoration in 
progress for 1 partition" loop  started again.




Is there any reason why?

On 7/20/21 2:09 PM, Alessandro Tagliapietra wrote:

Hello everyone,

after upgrading to kafka streams 2.8 we have one streams app that's 
stuck trying to restore a store changelog topic, this is the debug 
log of the app:


https://gist.github.com/alex88/f31593aaabbd282b21f89a0252a28745

I would like to avoid having to delete and recreate the topic; what 
should we do?


I've tried to switch back to exactly_once (from exactly_once_beta) 
but nothing changed.


Thank you in advance



Re: Please add me to JIRA contributor list

2021-11-09 Thread Matthias J. Sax

Done.

On 11/9/21 5:06 PM, Liam Clarke-Hutchinson wrote:

Hi,

My JIRA username is lclarkenz.

Many thanks,

Liam Clarke-Hutchinson



Re: Stream to KTable internals

2021-11-05 Thread Matthias J. Sax
The log clearly indicates that you hit enforced processing. We record 
the metric and log:


Cf 
https://github.com/apache/kafka/blob/3.0.0/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java#L194-L200


Not sure why the metric does not report it...

Hence, the solution would be to increase `max.task.idle.ms` further to 
give Kafka Streams time to fetch the data.


It might help to use DEBUG logs to see for which partitions the consumer 
sends fetch requests and which partitions return data, to better 
understand the underlying behavior.



-Matthias

On 11/5/21 6:58 AM, Chad Preisler wrote:

It seems like I have 2 options to work around this issue.


- Keep the KTable and have another process running that puts the missed
join message back on the event topic.
- Switch to GlobalKTable.

Any other solutions/workarounds are welcome.

Thanks,
Chad

On Thu, Nov 4, 2021 at 11:43 AM Chad Preisler 
wrote:


enforced-processing-total is zero for all missed join occurrences. I
logged all the metrics out at the time my stream processed the missed join,
so let me know if there are any other metrics that would help.

On Wed, Nov 3, 2021 at 9:21 PM Chad Preisler 
wrote:


I'm not sure. When I ran with trace logging turned on I saw a bunch of
messages like the ones below. Do those messages indicate
"enforced-processing"? It gets logged right after the call
to enforcedProcessingSensor.record.

Continuing to process although some partitions are empty on the broker.
There may be out-of-order processing for this task as a result. Partitions
with local data: [status-5]. Partitions we gave up waiting for, with their
corresponding deadlines: {event-5=1635881287722}. Configured
max.task.idle.ms: 2000. Current wall-clock time: 1635881287750.

Continuing to process although some partitions are empty on the broker.
There may be out-of-order processing for this task as a result. Partitions
with local data: [event-5]. Partitions we gave up waiting for, with their
corresponding deadlines: {status-5=1635881272754}. Configured
max.task.idle.ms: 2000. Current wall-clock time: 1635881277998.

On Wed, Nov 3, 2021 at 6:11 PM Matthias J. Sax  wrote:


Can you check if the program ever does "enforced processing", ie,
`max.task.idle.ms` passed, and we process despite an empty input buffer.

Cf https://kafka.apache.org/documentation/#kafka_streams_task_monitoring

As long as there is input data, we should never do "enforced processing"
and the metric should stay at zero.


-Matthias

On 11/3/21 2:41 PM, Chad Preisler wrote:

Just a quick update. Setting max.task.idle.ms to 10000 (10 seconds) had no
effect on this issue.

On Tue, Nov 2, 2021 at 6:55 PM Chad Preisler 
wrote:


No unfortunately it is not the case. The table record is written

about 20

seconds before the stream record. I’ll crank up the time tomorrow and

see

what happens.

On Tue, Nov 2, 2021 at 6:24 PM Matthias J. Sax 

wrote:



Hard to tell, but as it seems that you can reproduce the issue, it

might

be worth a try to increase the idle time further.

I guess one corner case for stream-table join that is not resolved

yet

is when stream and table record have the same timestamp... For this
case, the table record might not be processed first.

Could you hit this case?


-Matthias

On 11/2/21 3:13 PM, Chad Preisler wrote:

Thank you for the information. We are using the Kafka 3.0 client

library.

We are able to reliably reproduce this issue in our test environment

now. I

removed my timestamp extractor, and I set the max.task.idle.ms to

2000. I

also turned on trace logging for package
org.apache.kafka.streams.processor.internals.

To create the issue we stopped the application and ran enough data

to

create a lag of 400 messages. We saw 5 missed joins.

   From the stream-thread log messages we saw the event message, our

stream

missed the join, and then several milliseconds later we saw the
stream-thread print out the status message. The stream-thread

printed

out

our status message a total of 5 times.

Given that only a few milliseconds passed between missing the join

and

the

stream-thread printing the status message, would increasing the
max.task.idle.ms help?

Thanks,
Chad

On Mon, Nov 1, 2021 at 10:03 PM Matthias J. Sax 

wrote:



Timestamp synchronization is not perfect, and as a matter of fact,

we

fixed a few gaps in 3.0.0 release. We actually hope, that we

closed the

last gaps in 3.0.0... *fingers-crossed* :)


We are using a timestamp extractor that returns 0.


You can do this, and it effectively "disables" timestamp

synchronization

as records on the KTable side don't have a timeline any longer. As

a

side effect it also allows you to "bootstrap" the table, as records

with

timestamp zero will always be processed first (as they are

smaller). Of

course, you also don't have time synchronization for "future" data

and

your program becom

Re: Stream to KTable internals

2021-11-03 Thread Matthias J. Sax
Can you check if the program ever does "enforced processing", ie, 
`max.task.idle.ms` passed, and we process despite an empty input buffer.


Cf https://kafka.apache.org/documentation/#kafka_streams_task_monitoring

As long as there is input data, we should never do "enforced processing" 
and the metric should stay at zero.



-Matthias

On 11/3/21 2:41 PM, Chad Preisler wrote:

Just a quick update. Setting max.task.idle.ms to 10000 (10 seconds) had no
effect on this issue.

On Tue, Nov 2, 2021 at 6:55 PM Chad Preisler 
wrote:


No unfortunately it is not the case. The table record is written about 20
seconds before the stream record. I’ll crank up the time tomorrow and see
what happens.

On Tue, Nov 2, 2021 at 6:24 PM Matthias J. Sax  wrote:


Hard to tell, but as it seems that you can reproduce the issue, it might
be worth a try to increase the idle time further.

I guess one corner case for stream-table join that is not resolved yet
is when stream and table record have the same timestamp... For this
case, the table record might not be processed first.

Could you hit this case?


-Matthias

On 11/2/21 3:13 PM, Chad Preisler wrote:

Thank you for the information. We are using the Kafka 3.0 client

library.

We are able to reliably reproduce this issue in our test environment

now. I

removed my timestamp extractor, and I set the max.task.idle.ms to

2000. I

also turned on trace logging for package
org.apache.kafka.streams.processor.internals.

To create the issue we stopped the application and ran enough data to
create a lag of 400 messages. We saw 5 missed joins.

  From the stream-thread log messages we saw the event message, our

stream

missed the join, and then several milliseconds later we saw the
stream-thread print out the status message. The stream-thread printed

out

our status message a total of 5 times.

Given that only a few milliseconds passed between missing the join and

the

stream-thread printing the status message, would increasing the
max.task.idle.ms help?

Thanks,
Chad

On Mon, Nov 1, 2021 at 10:03 PM Matthias J. Sax 

wrote:



Timestamp synchronization is not perfect, and as a matter of fact, we
fixed a few gaps in 3.0.0 release. We actually hope, that we closed the
last gaps in 3.0.0... *fingers-crossed* :)


We are using a timestamp extractor that returns 0.


You can do this, and it effectively "disables" timestamp

synchronization

as records on the KTable side don't have a timeline any longer. As a
side effect it also allows you to "bootstrap" the table, as records

with

timestamp zero will always be processed first (as they are smaller). Of
course, you also don't have time synchronization for "future" data and
your program becomes non-deterministic if you reprocess old data.


This seemed to be the only
way to bootstrap enough records at startup to avoid the missed join.


Using 3.0.0 and enabling timestamp synchronization via
`max.task.idle.ms` config, should allow you to get the correct

behavior

without the zero-extractor (of course, your KTable data must have
smaller timestamps that your KStream data).


If I use "timestamp synchronization" do I have to remove the zero
timestamp extractor? If I remove the zero timestamp extractor will
timestamp synchronization take care of the missed join issue on

startup?


To be more precise: timestamp synchronization is _always_ on. The
question is just how strict it is applied. By default, we do the weakest
form, which is only best effort.


I'm guessing the issue here is that occasionally the poll request is

not

returning the matching record for the KTable side of the join before

the

task goes off and starts processing records.


Yes, because of default best effort approach. That is why you should
increase `max.task.idle.ms` to detect this case and "skip" processing
and let KS do another poll() to get KTable data.

2.8 and earlier:

max.task.idle.ms=0 -> best effort (no poll() retry)
max.task.idle.ms>0 -> try to do another poll() until data is there or
idle time passed

Note: >0 might still "fail" even if there is data, because consumer
fetch behavior is not predictable.


3.0:

max.task.idle.ms=-1 -> best effort (no poll() retry)
max.task.idle.ms=0 -> if there is data broker side, repeat to poll()
until you get the data
max.task.idle.ms>0 -> even if there is no data broker side, wait until
data becomes available or the idle time passed


Hope this helps.


-Matthias

On 11/1/21 4:29 PM, Guozhang Wang wrote:

Hello Chad,

   From your earlier comment, you mentioned "In my scenario the records were
written to the KTable topic before the record was written to the KStream
topic." So I think Matthias and others have excluded this possibility while
trying to help investigate.

If only the matching records from KStream are returned via a single
consumer poll call but not the other records from KTable, then 

Re: Kafka streams event deduplication keeping last event in window

2021-11-03 Thread Matthias J. Sax
You could do something similar to what the WindowStore does and store a 
key-timestamp pair as actual key. Given current wall-clock time, you can 
compute the time for closed windows and do corresponding lookups (either 
per key, or using range scans).
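
A minimal sketch of that layout (a plain KeyValueStore; the string key format, the `Agg` value type, `context`, and `windowSizeMs` are illustrative; a production version would likely use a proper binary composite key):

```java
// keys are "<windowStart>:<userKey>", one row per (window, key)
kvStore.put(windowStart + ":" + userKey, aggregate);

// inside Punctuator#punctuate(long now): any window that started before
// (now - windowSizeMs) is closed, so emit it and delete the row explicitly
final long cutoff = now - windowSizeMs;
try (KeyValueIterator<String, Agg> iter = kvStore.all()) {
    while (iter.hasNext()) {
        final KeyValue<String, Agg> entry = iter.next();
        final long windowStart = Long.parseLong(entry.key.split(":", 2)[0]);
        if (windowStart <= cutoff) {
            context.forward(entry.key, entry.value); // final result for the closed window
            kvStore.delete(entry.key);               // explicit cleanup
        }
    }
}
```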


-Matthias

On 11/3/21 12:40 AM, Luigi Cerone wrote:

Hello Matthias, thanks for your reply.


Using a plain kv-store, whenever the punctuation runs you can find closed

windows, forward the result and also delete the row explicitly, which give
you more control.


What is the best way to find closed windows? Have you got any examples?

Thanks! :)

On 2021/11/02 23:34:33 "Matthias J. Sax" wrote:

I did not study your code snippet, but yes, it sounds like a valid
approach from your description.


How can I be sure that the start of the window will
coincide with the Punctuator's scheduled interval?


For punctuations, there is always some jitter, because it's not possible
to run a punctuation at the very exact point in time when it is
scheduled to run. Thus, a punctuation might fire a little delayed
anyway. You can also not control the "anchor point" directly, because it
depends on the point in time when you register the punctuation.

Also note, that a WindowedStore is basically still a key-value store, ie
a single key-value pair models one window. The main difference is the
timestamp that is used to expire rows eventually, which just implies that
expired rows are dropped (without any notification).

Thus, the only thing you can do is to run the punctuation frequently
enough to keep latency low enough to detect windows that are logically
closed and forward the corresponding result.

But you cannot really "bind" the punctuation with the state store
expiration, and window-store also does not support deletes... Thus, I am
wondering if using a plain key-value store might be more useful for your
case? Using a plain kv-store, whenever the punctuation runs you can find
closed windows, forward the result and also delete the row explicitly,
which gives you more control.

Hope this helps.

-Matthias

On 11/2/21 10:29 AM, Luigi Cerone wrote:

I'm using Kafka Streams in a deduplication events problem over short time
windows (<= 1 minute).
First I've tried to tackle the problem by using DSL API with
[`.suppress(Suppressed.untilWindowCloses(...))`](https://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html)
operator but, given the fact that wall-clock time is not yet supported
(I've seen the [KIP 424](https://cwiki.apache.org/confluence/display/KAFKA/KIP-424%3A+Allow+suppression+of+intermediate+events+based+on+wall+clock+time)),
this operator is not viable for my use case.

Then, I've followed this [official Confluent example](https://kafka-tutorials.confluent.io/finding-distinct-events/confluent.html)
in which low level Processor API is used and it was working fine but has
one major limitation for my use-case. The single event (obtained by
deduplication) is emitted at the **beginning** of the time window,
subsequent duplicated events are "suppressed". In my use case I need the
reverse of that, meaning that a single event should be emitted at the end
of the window.
I'm asking for suggestions on how to implement this use case with Processor
API.

My idea was to use the Processor API with a custom [Transformer](https://kafka.apache.org/20/javadoc/org/apache/kafka/streams/kstream/Transformer.html)
and a [Punctuator](https://kafka.apache.org/24/javadoc/index.html?org/apache/kafka/streams/processor/Punctuator.html).
The transformer would store in a [WindowStore](https://kafka.apache.org/24/javadoc/org/apache/kafka/streams/state/WindowStore.html)
the distinct keys received without returning any KeyValue. Simultaneously,
I'd schedule a punctuator running with an interval equal to the size of the
window in the WindowStore. This punctuator will iterate over the elements
in the store and forward them downstream.
The following are some core parts of the logic:

DeduplicationTransformer (slightly modified from [official Confluent
example](https://kafka-tutorials.confluent.io/finding-distinct-events/confluent.html)):
```java
  @Override
  @SuppressWarnings("unchecked")
  public void init(final ProcessorContext context) {
  this.context = context;
  eventIdStore = (WindowStore)
context.getStateStore(this.storeName);

  // Schedule punctuator for this transformer.
  context.schedule(Duration.ofMillis(this.windowSizeMs),
PunctuationType.WALL_CLOCK_TIME,
  new DeduplicationPunctuator(eventIdStore, context,
this.windowSizeMs));
  }

  @Override
  public KeyValue transform(final K key, final V value) {
  final E eventId = idExtractor.apply(key, value);
  if (eventId == null) {
  return KeyValue.pair(key, value);
  } else {
  if (!isDuplicate(eventId)

Re: Kafka streams event deduplication keeping last event in window

2021-11-02 Thread Matthias J. Sax
I did not study your code snippet, but yes, it sounds like a valid 
approach from your description.



How can I be sure that the start of the window will
coincide with the Punctuator's scheduled interval?


For punctuations, there is always some jitter, because it's not possible 
to run a punctuation at the very exact point in time when it is 
scheduled to run. Thus, a punctuation might fire a little delayed 
anyway. You can also not control the "anchor point" directly, because it 
depends on the point in time when you register the punctuation.


Also note, that a WindowedStore is basically still a key-value store, ie 
a single key-value pair models one window. The main difference is the 
timestamp that is used to expire rows eventually, which just implies that 
expired rows are dropped (without any notification).


Thus, the only thing you can do is to run the punctuation frequently 
enough to keep latency low enough to detect windows that are logically 
closed and forward the corresponding result.


But you cannot really "bind" the punctuation with the state store 
expiration, and window-store also does not support deletes... Thus, I am 
wondering if using a plain key-value store might be more useful for your 
case? Using a plain kv-store, whenever the punctuation runs you can find 
closed windows, forward the result and also delete the row explicitly, 
which gives you more control.


Hope this helps.

-Matthias

On 11/2/21 10:29 AM, Luigi Cerone wrote:

I'm using Kafka Streams in a deduplication events problem over short time
windows (<= 1 minute).
First I've tried to tackle the problem by using DSL API with
[`.suppress(Suppressed.untilWindowCloses(...))`](
https://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html)
operator but, given the fact that wall-clock time is not yet supported
(I've seen the [KIP 424](
https://cwiki.apache.org/confluence/display/KAFKA/KIP-424%3A+Allow+suppression+of+intermediate+events+based+on+wall+clock+time)),
this operator is not viable for my use case.

Then, I've followed this [official Confluent example](
https://kafka-tutorials.confluent.io/finding-distinct-events/confluent.html)
in which low level Processor API is used and it was working fine but has
one major limitation for my use-case. The single event (obtained by
deduplication) is emitted at the **beginning** of the time window,
subsequent duplicated events are "suppressed". In my use case I need the
reverse of that, meaning that a single event should be emitted at the end
of the window.
I'm asking for suggestions on how to implement this use case with Processor
API.

My idea was to use the Processor API with a custom [Transformer](
https://kafka.apache.org/20/javadoc/org/apache/kafka/streams/kstream/Transformer.html)
and a [Punctuator](
https://kafka.apache.org/24/javadoc/index.html?org/apache/kafka/streams/processor/Punctuator.html
).
The transformer would store in a [WindowStore](
https://kafka.apache.org/24/javadoc/org/apache/kafka/streams/state/WindowStore.html)
the distinct keys received without returning any KeyValue. Simultaneously,
I'd schedule a punctuator running with an interval equal to the size of the
window in the WindowStore. This punctuator will iterate over the elements
in the store and forward them downstream.
The following are some core parts of the logic:

DeduplicationTransformer (slightly modified from [official Confluent
example](
https://kafka-tutorials.confluent.io/finding-distinct-events/confluent.html)
):
```java
 @Override
 @SuppressWarnings("unchecked")
 public void init(final ProcessorContext context) {
 this.context = context;
 eventIdStore = (WindowStore)
context.getStateStore(this.storeName);

 // Schedule punctuator for this transformer.
 context.schedule(Duration.ofMillis(this.windowSizeMs),
PunctuationType.WALL_CLOCK_TIME,
 new DeduplicationPunctuator(eventIdStore, context,
this.windowSizeMs));
 }

 @Override
 public KeyValue transform(final K key, final V value) {
 final E eventId = idExtractor.apply(key, value);
 if (eventId == null) {
 return KeyValue.pair(key, value);
 } else {
 if (!isDuplicate(eventId)) {
 rememberNewEvent(eventId, value, context.timestamp());
 }
 return null;
 }
 }
```

DeduplicationPunctuator:
```java
 public DeduplicationPunctuator(WindowStore eventIdStore,
ProcessorContext context,
 long retainPeriodMs) {
 this.eventIdStore = eventIdStore;
 this.context = context;
 this.retainPeriodMs = retainPeriodMs;
 }

 @Override
 public void punctuate(long invocationTime) {
 LOGGER.info("Punctuator invoked at {}, searching from {}", new
Date(invocationTime), new Date(invocationTime-retainPeriodMs));

 KeyValueIterator, V> it =
 eventIdStore.fetchAll(invocationTime - retainPeriodMs,
invocationTime + 

Re: Stream to KTable internals

2021-11-02 Thread Matthias J. Sax
Hard to tell, but as it seems that you can reproduce the issue, it might 
be worth a try to increase the idle time further.


I guess one corner case for stream-table join that is not resolved yet 
is when stream and table record have the same timestamp... For this 
case, the table record might not be processed first.


Could you hit this case?


-Matthias

On 11/2/21 3:13 PM, Chad Preisler wrote:

Thank you for the information. We are using the Kafka 3.0 client library.
We are able to reliably reproduce this issue in our test environment now. I
removed my timestamp extractor, and I set the max.task.idle.ms to 2000. I
also turned on trace logging for package
org.apache.kafka.streams.processor.internals.

To create the issue we stopped the application and ran enough data to
create a lag of 400 messages. We saw 5 missed joins.

 From the stream-thread log messages we saw the event message, our stream
missed the join, and then several milliseconds later we saw the
stream-thread print out the status message. The stream-thread printed out
our status message a total of 5 times.

Given that only a few milliseconds passed between missing the join and the
stream-thread printing the status message, would increasing the
max.task.idle.ms help?

Thanks,
Chad

On Mon, Nov 1, 2021 at 10:03 PM Matthias J. Sax  wrote:


Timestamp synchronization is not perfect, and as a matter of fact, we
fixed a few gaps in 3.0.0 release. We actually hope, that we closed the
last gaps in 3.0.0... *fingers-crossed* :)


We are using a timestamp extractor that returns 0.


You can do this, and it effectively "disables" timestamp synchronization
as records on the KTable side don't have a timeline any longer. As a
side effect it also allows you to "bootstrap" the table, as records with
timestamp zero will always be processed first (as they are smaller). Of
course, you also don't have time synchronization for "future" data and
your program becomes non-deterministic if you reprocess old data.
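
For reference, the zero-returning extractor discussed here is only a few lines (a sketch; the class name is made up, and it would be plugged in via Consumed#withTimestampExtractor on the table's input):

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

public class ZeroTimestampExtractor implements TimestampExtractor {
    @Override
    public long extract(final ConsumerRecord<Object, Object> record, final long partitionTime) {
        // All table-side records get timestamp 0, so they always sort before the
        // stream-side records and are processed first (the "bootstrap" effect),
        // at the price of disabling timestamp synchronization for this input.
        return 0L;
    }
}
```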


This seemed to be the only
way to bootstrap enough records at startup to avoid the missed join.


Using 3.0.0 and enabling timestamp synchronization via
`max.task.idle.ms` config, should allow you to get the correct behavior
without the zero-extractor (of course, your KTable data must have
smaller timestamps that your KStream data).


If I use "timestamp synchronization" do I have to remove the zero
timestamp extractor? If I remove the zero timestamp extractor will
timestamp synchronization take care of the missed join issue on startup?


To be more precise: timestamp synchronization is _always_ on. The
question is just how strict it is applied. By default, we do the weakest
form, which is only best effort.


I'm guessing the issue here is that occasionally the poll request is not
returning the matching record for the KTable side of the join before the
task goes off and starts processing records.


Yes, because of default best effort approach. That is why you should
increase `max.task.idle.ms` to detect this case and "skip" processing
and let KS do another poll() to get KTable data.

2.8 and earlier:

max.task.idle.ms=0 -> best effort (no poll() retry)
max.task.idle.ms>0 -> try to do another poll() until data is there or
idle time passed

Note: >0 might still "fail" even if there is data, because consumer
fetch behavior is not predictable.


3.0:

max.task.idle.ms=-1 -> best effort (no poll() retry)
max.task.idle.ms=0 -> if there is data broker side, repeat to poll()
until you get the data
max.task.idle.ms>0 -> even if there is no data broker side, wait until
data becomes available or the idle time passed


Hope this helps.


-Matthias

On 11/1/21 4:29 PM, Guozhang Wang wrote:

Hello Chad,

  From your earlier comment, you mentioned "In my scenario the records

were

written to the KTable topic before the record was written to the KStream
topic." So I think Matthias and others have excluded this possibility

while

trying to help investigate.

If only the matching records from KStream are returned via a single
consumer poll call but not the other records from KTable, then you would
miss this matched join result.

Guozhang


On Sun, Oct 31, 2021 at 7:28 AM Chad Preisler 
wrote:


Thank you for your response and the links to the presentations.


*However, this seems to be orthogonal to your issue?*

Yes. From what I see in the code it looks like you have a single consumer
subscribed to multiple topics. Please correct me if I'm wrong.


*By default, timestamp synchronization is disabled. Maybe enabling it would
help?*

We are using a timestamp extractor that returns 0. We did that because we
were almost always missing joins on startup, and this seemed to be the only
way to bootstrap enough records at startup to avoid the missed join. We
found a post that said doing that would make the KTable act like the
GlobalKTable at startup. So far this works great,

Re: Producer Timeout issue in kafka streams task

2021-11-01 Thread Matthias J. Sax
The `Producer#send()` call is actually not covered by the KIP because it 
may result in data loss if we try to handle the timeout directly. -- 
Kafka Streams does not have a copy of the data in the producer's send 
buffer and thus we cannot retry the `send()`. -- Instead, it's necessary 
to re-process the input data which is not done automatically.



-Matthias

On 11/1/21 4:34 PM, Guozhang Wang wrote:

Hello Pushkar,

I'm assuming you have the same Kafka version (2.5.1) at the Streams client
side here: in those old versions, Kafka Streams relies on the embedded
Producer clients to handle timeouts, which requires users to correctly
configure such values.

In newer versions (2.8+) we have made Kafka Streams more robust to server-side
disconnects or soft failures that may cause timeouts:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-572%3A+Improve+timeouts+and+retries+in+Kafka+Streams.
So I'd suggest you upgrade to those versions, and see if those symptoms go
away.


Guozhang

On Sun, Oct 31, 2021 at 5:59 AM Pushkar Deole  wrote:


Hi All,

I am getting the below issue in a streams application. The Kafka cluster is a
3-broker cluster (v 2.5.1) and I could see that 2 of the 3 brokers restarted
at the same time when the below exception occurred in the streams application,
so I can relate the exception to those broker restarts. However, what is
worrying me is that the streams application did not process any events after
the exception. So the questions are:
1. How can I make the streams application resilient to broker issues? E.g.
the producer underneath streams should have connected to another broker
instance when the first broker went down, but possibly the 2nd broker went
down immediately and that's why it timed out.
2. In general, how does streams handle broker issues, and when does it decide
to connect to another broker instance in case one instance seems to be in
error?


{"@timestamp":"2021-10-30T12:19:43.486+00:00","@version":"1","message":"Exception
processing processor thread -

analytics-event-normalizer-00042391-f084-441f-95da-beb2d0242943-StreamThread-2
stream - task [0_5] Abort sending since an error caught with a previous
record (timestamp 1635596258179) to topic analytics-incoming-feed due to
org.apache.kafka.common.errors.TimeoutException: Expiring 12 record(s) for
analytics-incoming-feed-4:12 ms has passed since batch
creation\nTimeout exception caught when sending record to topic
analytics-incoming-feed. This might happen if the producer cannot send data
to the Kafka cluster and thus, its internal buffer fills up. This can also
happen if the broker is slow to respond, if the network connection to the
broker was interrupted, or if similar circumstances arise. You can increase
producer parameter `max.block.ms` to increase this

timeout.","logger_name":"com.avaya.analytics.kafka.topology.EventNormalizerTopology","thread_name":"analytics-event-normalizer-00042391-f084-441f-95da-beb2d0242943-StreamThread-2","level":"ERROR","level_value":4,"stack_trace":"org.apache.kafka.streams.errors.StreamsException:
task [0_5] Abort sending since an error caught with a previous record
(timestamp 1635596258179) to topic analytics-incoming-feed due to
org.apache.kafka.common.errors.TimeoutException: Expiring 12 record(s) for
analytics-incoming-feed-4:12 ms has passed since batch
creation\nTimeout exception caught when sending record to topic
analytics-incoming-feed. This might happen if the producer cannot send data
to the Kafka cluster and thus, its internal buffer fills up. This can also
happen if the broker is slow to respond, if the network connection to the
broker was interrupted, or if similar circumstances arise. You can increase
producer parameter `max.block.ms` to increase this timeout.\n\tat

org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:154)\n\tat

org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:52)\n\tat

org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:214)\n\tat

datadog.trace.instrumentation.kafka_clients.KafkaProducerInstrumentation$ProducerCallback.onCompletion(KafkaProducerInstrumentation.java:142)\n\tat

org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1356)\n\tat

org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:231)\n\tat

org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:197)\n\tat

org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:676)\n\tat

org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:380)\n\tat

org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:323)\n\tat

org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)\n\tat
java.base/java.lang.Thread.run(Unknown Source)\nCaused by:

Re: Producer Timeout issue in kafka streams task

2021-11-01 Thread Matthias J. Sax
As the error message suggests, you can increase `max.block.ms` for this 
case: If a broker is down, it may take some time for the producer to 
fail over to a different broker (before the producer can fail over, the 
broker must elect a new partition leader, and only afterward can inform 
the producer about the new broker that must be used to write into the 
partition). Increasing `max.block.ms` gives the producer (and thus the 
brokers) more time to do the fail-over.
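
As a sketch only (the 180000 value is an arbitrary example, not a
recommendation), a producer-level setting like `max.block.ms` can be passed
through the Streams config using the producer prefix:

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class ProducerTimeoutConfigSketch {
    static Properties withLongerMaxBlock(Properties props) {
        // Pass the producer-level max.block.ms through the Streams config so the
        // embedded producer gets more time to ride out a partition leader election.
        props.put(StreamsConfig.producerPrefix(ProducerConfig.MAX_BLOCK_MS_CONFIG), 180000);
        return props;
    }
}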


If the fail-over fails (ie, times out), the producer raises the 
exception and KS is forced to stop processing, to avoid data loss 
(because the producer did buffer some data that would be lost due to the 
error if we commit offsets).


In general, Kafka Streams tries to handle as many broker/client errors 
as possible (and newer versions handle more cases than older versions). 
But there are always some case that cannot be handled by Kafka Streams. 
Of course, changing client config (producer/consumer and Kafka Streams) 
can help to make it more robust.


Thus, it comes down to monitoring of Kafka Streams:

For Kafka Streams, using newer versions you can actually register an 
uncaught exception handler that allows you to restart failed threads. In 
older versions of Kafka Streams, you can also register a callback, but 
it only informs you that a thread died. In older versions you would need 
to `close()` KafkaStreams and create a new instance and `start()` it to 
recover a dead thread.
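
A minimal sketch of the newer-style handler (available in 2.8+); the handler
shape and the REPLACE_THREAD response are from the public API, while the
logging and the blanket replace-on-every-error policy are purely illustrative:

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;

public class ThreadReplacementSketch {
    // Must be called before streams.start().
    static void install(KafkaStreams streams) {
        streams.setUncaughtExceptionHandler(exception -> {
            // Log the failure and spin up a replacement StreamThread;
            // SHUTDOWN_CLIENT and SHUTDOWN_APPLICATION are the alternatives.
            System.err.println("Stream thread died: " + exception);
            return StreamThreadExceptionResponse.REPLACE_THREAD;
        });
    }
}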



Hope this helps,

-Matthias


On 10/31/21 5:52 AM, Pushkar Deole wrote:

Hi All,

I am getting the below issue in a streams application. The Kafka cluster is a
3-broker cluster (v 2.5.1) and I could see that 2 of the 3 brokers restarted
at the same time when the below exception occurred in the streams application,
so I can relate the exception to those broker restarts. However, what is
worrying me is that the streams application did not process any events after
the exception. So the questions are:
1. How can I make the streams application resilient to broker issues? E.g.
the producer underneath streams should have connected to another broker
instance when the first broker went down, but possibly the 2nd broker went
down immediately and that's why it timed out.
2. In general, how does streams handle broker issues, and when does it decide
to connect to another broker instance in case one instance seems to be in
error?

{"@timestamp":"2021-10-30T12:19:43.486+00:00","@version":"1","message":"Exception
processing processor thread -
analytics-event-normalizer-00042391-f084-441f-95da-beb2d0242943-StreamThread-2
stream - task [0_5] Abort sending since an error caught with a previous
record (timestamp 1635596258179) to topic analytics-incoming-feed due to
org.apache.kafka.common.errors.TimeoutException: Expiring 12 record(s) for
analytics-incoming-feed-4:12 ms has passed since batch
creation\nTimeout exception caught when sending record to topic
analytics-incoming-feed. This might happen if the producer cannot send data
to the Kafka cluster and thus, its internal buffer fills up. This can also
happen if the broker is slow to respond, if the network connection to the
broker was interrupted, or if similar circumstances arise. You can increase
producer parameter `max.block.ms` to increase this
timeout.","logger_name":"com.avaya.analytics.kafka.topology.EventNormalizerTopology","thread_name":"analytics-event-normalizer-00042391-f084-441f-95da-beb2d0242943-StreamThread-2","level":"ERROR","level_value":4,"stack_trace":"org.apache.kafka.streams.errors.StreamsException:
task [0_5] Abort sending since an error caught with a previous record
(timestamp 1635596258179) to topic analytics-incoming-feed due to
org.apache.kafka.common.errors.TimeoutException: Expiring 12 record(s) for
analytics-incoming-feed-4:12 ms has passed since batch
creation\nTimeout exception caught when sending record to topic
analytics-incoming-feed. This might happen if the producer cannot send data
to the Kafka cluster and thus, its internal buffer fills up. This can also
happen if the broker is slow to respond, if the network connection to the
broker was interrupted, or if similar circumstances arise. You can increase
producer parameter `max.block.ms` to increase this timeout.\n\tat
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:154)\n\tat
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:52)\n\tat
org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:214)\n\tat
datadog.trace.instrumentation.kafka_clients.KafkaProducerInstrumentation$ProducerCallback.onCompletion(KafkaProducerInstrumentation.java:142)\n\tat
org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1356)\n\tat
org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:231)\n\tat

Re: Stream to KTable internals

2021-11-01 Thread Matthias J. Sax
Timestamp synchronization is not perfect, and as a matter of fact, we 
fixed a few gaps in the 3.0.0 release. We actually hope that we closed the 
last gaps in 3.0.0... *fingers-crossed* :)


We are using a timestamp extractor that returns 0. 


You can do this, and it effectively "disables" timestamp synchronization 
as records on the KTable side don't have a timeline any longer. As a 
side effect it also allows you to "bootstrap" the table, as records with 
timestamp zero will always be processed first (as they are smaller). Of 
course, you also don't have time synchronization for "future" data and 
your program becomes non-deterministic if you reprocess old data.



this seemed to be the only
way to bootstrap enough records at startup to avoid the missed join.


Using 3.0.0 and enabling timestamp synchronization via 
`max.task.idle.ms` config, should allow you to get the correct behavior 
without the zero-extractor (of course, your KTable data must have 
smaller timestamps than your KStream data).



If I use "timestamp synchronization" do I have to remove the zero
timestamp extractor? If I remove the zero timestamp extractor will
timestamp synchronization take care of the missed join issue on startup?


To be more precise: timestamp synchronization is _always_ on. The 
question is just how strict it is applied. By default, we do the weakest 
form, which is only best effort.



I'm guessing the issue here is that occasionally the poll request is not
returning the matching record for the KTable side of the join before the
task goes off and starts processing records.


Yes, because of default best effort approach. That is why you should 
increase `max.task.idle.ms` to detect this case and "skip" processing 
and let KS do another poll() to get KTable data.


2.8 and earlier:

max.task.idle.ms=0 -> best effort (no poll() retry)
max.task.idle.ms>0 -> try to do another poll() until data is there or 
idle time passed


Note: >0 might still "fail" even if there is data, because consumer 
fetch behavior is not predictable.



3.0:

max.task.idle.ms=-1 -> best effort (no poll() retry)
max.task.idle.ms=0 -> if there is data broker side, repeat to poll() 
until you get the data
max.task.idle.ms>0 -> even if there is no data broker side, wait until 
data becomes available or the idle time passed



Hope this helps.


-Matthias

On 11/1/21 4:29 PM, Guozhang Wang wrote:

Hello Chad,

 From your earlier comment, you mentioned "In my scenario the records were
written to the KTable topic before the record was written to the KStream
topic." So I think Matthias and others have excluded this possibility while
trying to help investigate.

If only the matching records from KStream are returned via a single
consumer poll call but not the other records from KTable, then you would
miss this matched join result.

Guozhang


On Sun, Oct 31, 2021 at 7:28 AM Chad Preisler wrote:


Thank you for your response and the links to the presentations.


*However, this seems to be orthogonal to your issue?*

Yes. From what I see in the code it looks like you have a single consumer
subscribed to multiple topics. Please correct me if I'm wrong.


*By default, timestamp synchronization is disabled. Maybe enabling it would
help?*

We are using a timestamp extractor that returns 0. We did that because we
were almost always missing joins on startup, and this seemed to be the only
way to bootstrap enough records at startup to avoid the missed join. We
found a post that said doing that would make the KTable act like the
GlobalKTable at startup. So far this works great, we never miss a join on
startup. If I use "timestamp synchronization" do I have to remove the zero
timestamp extractor? If I remove the zero timestamp extractor will
timestamp synchronization take care of the missed join issue on startup?

I'm guessing the issue here is that occasionally the poll request is not
returning the matching record for the KTable side of the join before the
task goes off and starts processing records. Later when we put the same
record on the topic and the KTable has had a chance to load more records
the join works and everything is good to go. Because of the way our system
works no new status records have been written and so the new record joins
against the correct status.

Do you agree that the poll request is returning the KStream record but not
returning the KTable record and therefore the join is getting missed? If
you don't agree, what do you think is going on? Is there a way to prove
this out?

Thanks,
Chad

On Sat, Oct 30, 2021 at 2:09 PM Matthias J. Sax  wrote:


Yes, a StreamThread has one consumer. The number of StreamThreads per
instance is configurable via `num.stream.threads`. Partitions are
assigned to threads similar to consumers in a plain consumer group.

It seems you run with the default of one thread per instance. As you
spin up 12 inst

Re: Stream to KTable internals

2021-10-30 Thread Matthias J. Sax
Yes, a StreamThread has one consumer. The number of StreamThreads per 
instance is configurable via `num.stream.threads`. Partitions are 
assigned to threads similar to consumers in a plain consumer group.


It seems you run with the default of one thread per instance. As you 
spin up 12 instances, it results in 12 threads for the application. As 
you have 12 partitions, using more threads won't be useful as no 
partitions are left for them to process.
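
A tiny sketch of that setting (the value 4 is only an example):

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class ThreadCountSketch {
    static Properties withMoreThreads(Properties props) {
        // With 12 input partitions in total, at most 12 threads across all
        // running instances will actually get work assigned.
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
        return props;
    }
}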


For a stream-table join, there will be one task per "partition pair" 
that computes the join for those partitions. So you get 12 tasks, and 
each thread processes one task in your setup. I.e., a thread's consumer is 
reading data for both input topics.


Pausing happens on a per-partition basis: for joins there are two buffers 
per task (one for each input topic partition). It's possible that one 
partition is paused while the other is processed. However, this seems to 
be orthogonal to your issue?


For a GlobalKTable, you get an additional GlobalThread that only reads 
the data from the "global topic" to update the GlobalKTable. Semantics 
of KStream-KTable and KStream-GlobalKTable joins are different: Cf 
https://www.confluent.io/events/kafka-summit-europe-2021/temporal-joins-in-kafka-streams-and-ksqldb/
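
As a rough sketch of the structural difference (topic names, serdes, and the
join logic below are placeholders): a KTable join uses the record key and
needs co-partitioned topics, while a GlobalKTable join takes an explicit key
mapper and is not timestamp-synchronized.

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

public class JoinSketches {
    static void build(StreamsBuilder builder) {
        KStream<String, String> events = builder.stream("events");    // placeholder topic
        KTable<String, String> statusTable = builder.table("status"); // placeholder topic

        // KStream-KTable: joined on the record key, input topics must be
        // co-partitioned, lookups are (best effort) time-synchronized.
        events.leftJoin(statusTable,
                (event, status) -> event + "/" + status)
              .to("enriched");

        GlobalKTable<String, String> statusGlobal = builder.globalTable("status-global");

        // KStream-GlobalKTable: key is derived per record, the table is fully
        // replicated to every instance, and the join is not time-synchronized.
        events.leftJoin(statusGlobal,
                (key, event) -> key,                      // key mapper
                (event, status) -> event + "/" + status)
              .to("enriched-global");
    }
}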


For the timestamp synchronization, you may checkout `max.task.idle.ms` 
config. By default, timestamp synchronization is disabled. Maybe 
enabling it would help?


You may also check out slides 34-38: 
https://www.confluent.io/kafka-summit-san-francisco-2019/whats-the-time-and-why/


There is one corner case: if two records with the same timestamp come 
in, it's not defined which one will be processed first.


Hope this helps.


-Matthias


On 10/30/21 6:45 AM, Chad Preisler wrote:

Yes, this helped. I have some additional questions.

Does StreamThread have one consumer? (Looks like it, but just want to
confirm)
Is there a separate StreamThread for each topic including the KTable?
If a KTable is a StreamThread and there is a  StreamTask for that KTable,
could my buffer be getting filled up, and the mainConsumer for the KTable
be getting paused? I see this code in StreamTask#addRecords.

// if after adding these records, its partition queue's buffered size has been
// increased beyond the threshold, we can then pause the consumption for this partition
if (newQueueSize > maxBufferedSize) {
    mainConsumer.pause(singleton(partition));
}

Is there any specific logging that I can set to debug or trace that would
help me troubleshoot? I'd prefer not to turn debug and/or trace on for
every single class.

Thanks,
Chad





On Sat, Oct 30, 2021 at 5:20 AM Luke Chen  wrote:


Hi Chad,

I'm wondering if someone can point me to the Kafka streams internal code
that reads records for the join?
--> You can check StreamThread#pollPhase, where the stream thread (main
consumer) periodically polls records. And then, it'll process each topology
node with these polled records in stream tasks (StreamTask#process).

Hope that helps.

Thanks.
Luke


On Sat, Oct 30, 2021 at 5:42 PM Gilles Philippart wrote:


Hi Chad, this talk around 24:00 clearly explains what you’re seeing:

https://www.confluent.io/events/kafka-summit-europe-2021/failing-to-cross-the-streams-lessons-learned-the-hard-way/

Gilles


On 30 Oct 2021, at 04:02, Chad Preisler wrote:

Hello,

I have a stream application that does a KStream to KTable left join. We
seem to be occasionally missing joins (KTable side is null).

I'm wondering if someone can point me to the Kafka streams internal code
that reads records for the join? I've poked around the Kafka code base, but
there is a lot there. I imagine there is some consumer poll for each side
of the join, and possibly a background thread for reading the KTable topic.

I figure there are several possible causes of this issue, and since nothing
is really jumping out in my code, I was going to start looking at the Kafka
code to see if there is something I can do to fix this.

Thanks,
Chad









