Re: RecordCollectorImpl: task [1_1] Error sending records

2019-06-04 Thread Bruno Cadonna
Hi Mohan,

Could you post the log messages that you see but think you should not see?

It is hard to help you without any actual logs.

Best,
Bruno

On Wed, Jun 5, 2019 at 6:52 AM Parthasarathy, Mohan  wrote:
>
> Hi,
>
> As mentioned here
>
> https://issues.apache.org/jira/browse/KAFKA-7510
>
> I do see these logs turned on by default. We are running 2.2 and I still see 
> the error. It looks like the fix is in 2.0. What am I missing ? It seems to 
> be logged in “WARN” level instead of “debug” level.  It spews a huge amount 
> of binary data. How can I turn this off ?
>
> Thanks
> Mohan
>
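
As a side note on "How can I turn this off?": a per-logger threshold can silence these messages without raising the global log level. A minimal log4j sketch; the logger name is inferred from the class in the subject line, so verify it against your actual log output:

```properties
# Raise the threshold for the Streams record collector only, so its WARN
# messages (including the binary record dumps) are no longer emitted.
# Logger name assumed from the class in the subject; adjust for your framework.
log4j.logger.org.apache.kafka.streams.processor.internals.RecordCollectorImpl=ERROR
```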


Re: Error building

2019-06-05 Thread Bruno Cadonna
Hi Landon,

I tried your command on apache/kafka:trunk with HEAD at commit
ba3dc494371145e8ad35d6b85f45b8fe1e44c21f and it worked.

./gradlew -v

Gradle 5.1.1


Build time:   2019-01-10 23:05:02 UTC
Revision: 3c9abb645fb83932c44e8610642393ad62116807

Kotlin DSL:   1.1.1
Kotlin:   1.3.11
Groovy:   2.5.4
Ant:  Apache Ant(TM) version 1.9.13 compiled on July 10 2018
JVM:  1.8.0_202 (Oracle Corporation 25.202-b08)
OS:   Mac OS X 10.14.4 x86_64

I would try the following and retry to build after each item:
- ./gradlew clean
- git pull (maybe also a ./gradlew clean)
- re-initialise gradlew
- make a fresh git clone and start from the beginning

Also, read again the README.md at https://github.com/apache/kafka.
Maybe you missed something there.
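
As a command sequence, the steps above amount to roughly the following (the bootstrap step assumes a locally installed Gradle, as the Kafka README describes; paths are placeholders):

```sh
./gradlew clean                 # 1. discard stale build outputs
git pull && ./gradlew clean     # 2. update the checkout, then clean again
gradle                          # 3. re-bootstrap the gradlew wrapper (run in the repo root)
# 4. last resort: start over from a fresh clone
cd .. && git clone https://github.com/apache/kafka.git kafka-fresh && cd kafka-fresh
```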

Maybe somebody else here on the mailing list also has a good tip for you.

Best,
Bruno

On Wed, Jun 5, 2019 at 2:56 AM Landon Kuhn  wrote:
>
> Hello,
>
> I am unable to build the project and am looking for help. "./gradlew
> --console=plain installAll" results in the following output:
>
> > Configure project :
> Building project 'core' with Scala version 2.12.8
> Building project 'streams-scala' with Scala version 2.12.8
>
> > Task :generator:compileJava UP-TO-DATE
> > Task :generator:processResources NO-SOURCE
> > Task :generator:classes UP-TO-DATE
> > Task :clients:processMessages UP-TO-DATE
>
> > Task :clients:compileJava FAILED
> /home/landon/src/kafka/kafka/clients/src/main/java/org/apache/kafka/common/network/DefaultAuthenticator.java:23:
> warning: [deprecation] PrincipalBuilder in
> org.apache.kafka.common.security.auth has been deprecated
> import org.apache.kafka.common.security.auth.PrincipalBuilder;
> ^
> error: warnings found and -Werror specified
> /home/landon/src/kafka/kafka/clients/src/main/java/org/apache/kafka/common/security/plain/PlainSaslServer.java:104:
> error: method configEntryOption in class JaasContext cannot be applied to
> given types;
> String expectedPassword =
> jaasContext.configEntryOption(JAAS_USER_PREFIX + username,
>  ^
>   required: List,String,String
>   found: String,String
>   reason: actual and formal argument lists differ in length
> /home/landon/src/kafka/kafka/clients/src/main/java/org/apache/kafka/common/security/plain/PlainSaslServer.java:173:
> error: cannot find symbol
> return new PlainSaslServer(((SaslServerCallbackHandler)
> cbh).jaasContext());
> ^
>   symbol:   method jaasContext()
>   location: class SaslServerCallbackHandler
> /home/landon/src/kafka/kafka/clients/src/main/java/org/apache/kafka/common/requests/InitPidRequest.java:68:
> error: constructor AbstractRequest in class AbstractRequest cannot be
> applied to given types;
> super(version);
> ^
>   required: ApiKeys,short
>   found: short
>   reason: actual and formal argument lists differ in length
>
> 
>
> The output continues for dozens of similar errors.
>

Re: RecordCollectorImpl: task [1_1] Error sending records

2019-06-06 Thread Bruno Cadonna
Hi Mohan,

This error means that an exception was thrown while sending a record
from a Streams producer to a broker.

Best,
Bruno

On Thu, Jun 6, 2019 at 7:56 PM Parthasarathy, Mohan  wrote:
>
> After changing the log level for Kafka streams from warn to error, I don’t 
> see this message. These messages were huge blobs of numbers preceded by the 
> subject line of this message. This can be a big pain in production where you 
> will run out of space.
>
> Can someone tell me what this error means ? It seems to be okay but not sure 
> about what we are missing
>
>
> ________
> From: Bruno Cadonna 
> Sent: Tuesday, June 4, 2019 11:53 PM
> To: users@kafka.apache.org
> Subject: Re: RecordCollectorImpl: task [1_1] Error sending records
>
> Hi Mohan,
>
> Could you post the log messages you see and you think you should not see?
>
> It is hard to help you without any actual logs.
>
> Best,
> Bruno
>
> On Wed, Jun 5, 2019 at 6:52 AM Parthasarathy, Mohan  wrote:
> >
> > Hi,
> >
> > As mentioned here
> >
> > https://issues.apache.org/jira/browse/KAFKA-7510
> >
> > I do see these logs turned on by default. We are running 2.2 and I still 
> > see the error. It looks like the fix is in 2.0. What am I missing ? It 
> > seems to be logged in “WARN” level instead of “debug” level. It spews a 
> > huge amount of binary data. How can I turn this off ?
> >
> > Thanks
> > Mohan
> >
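
One way to react to such send failures programmatically, rather than only logging them, is a custom ProductionExceptionHandler (public API since Kafka 1.1, KIP-210). A sketch only: the handler class name here is made up, and returning CONTINUE is safe only if dropping the failed record is acceptable for your application:

```java
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;

// Sketch: skip records that fail to be sent instead of failing the stream thread.
public class ContinueOnSendErrorHandler implements ProductionExceptionHandler {

    @Override
    public ProductionExceptionHandlerResponse handle(ProducerRecord<byte[], byte[]> record,
                                                     Exception exception) {
        // Log minimally; deliberately do not dump the raw (binary) record payload.
        System.err.println("Failed to send record to " + record.topic() + ": " + exception);
        return ProductionExceptionHandlerResponse.CONTINUE; // or FAIL to stop processing
    }

    @Override
    public void configure(Map<String, ?> configs) { }
}
```

It is registered via the `default.production.exception.handler` streams config (`StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG`).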


Re: Can kafka internal state be purged ?

2019-06-19 Thread Bruno Cadonna
Hi Mohan,

Did you set a grace period on the window?

Best,
Bruno

On Tue, Jun 18, 2019 at 2:04 AM Parthasarathy, Mohan  wrote:
>
> On further debugging, what we are seeing is that windows are expiring rather 
> randomly as new messages are being processed. . We tested with new key for 
> every new message. We waited for the window time before replaying new 
> messages. Sometimes a new message would come in and create state. It takes 
> several messages to make some of the old windows to be closed (go past 
> suppress to the next stage). We have also seen where one of them never closed 
> even but several other older ones expired.  Then we explicitly sent a message 
> with the same old key and then it showed up. Also, for every new message, 
> only one of the previous window expires even though there are several pending.
>
> If we don't use suppress, then there is never an issue. With suppress, the 
> behavior we are seeing is weird. We are using 2.1.0 version in DSL mode. Any 
> clues on what we could be missing ? Why isn't there an order in the way 
> windows are closed ? As event time progresses by the new messages arriving, 
> the older ones should expire. Is that right understanding or not ?
>
> Thanks
> Mohan
>
> On 6/17/19, 3:43 PM, "Parthasarathy, Mohan"  wrote:
>
> Hi,
>
> We are using suppress in the application. We see some state being created 
> at some point in time. Now there is no new data for a day or two. We send new 
> data but the old window of data (where we see the state being created) is not 
> closing i.e not seeing it go through suppress and on to the next stage. It is 
> as though the state created earlier was purged. Is this possible ?
>
> Thanks
> Mohan
>
>
>


Re: Can kafka internal state be purged ?

2019-06-19 Thread Bruno Cadonna
Hi Mohan,

if you do not set a grace period, the grace period defaults to 12
hours. Hence, suppress would wait for an event that occurs 12 hours
later before it outputs a result. Try to explicitly set the grace
period to 0 and let us know if it worked.

If it still does not work, upgrade to version 2.2.1 if it is possible
for you. We had a couple of bugs in suppress recently that are fixed
in that version.

Best,
Bruno

On Wed, Jun 19, 2019 at 8:37 PM Parthasarathy, Mohan  wrote:
>
> No, I have not set any grace period. Is that mandatory ? Have you seen 
> problems with suppress and windows expiring ?
>
> Thanks
> Mohan
>
> On 6/19/19, 12:41 AM, "Bruno Cadonna"  wrote:
>
> Hi Mohan,
>
> Did you set a grace period on the window?
>
> Best,
> Bruno
>
> On Tue, Jun 18, 2019 at 2:04 AM Parthasarathy, Mohan  
> wrote:
> >
> > On further debugging, what we are seeing is that windows are expiring 
> rather randomly as new messages are being processed. . We tested with new key 
> for every new message. We waited for the window time before replaying new 
> messages. Sometimes a new message would come in and create state. It takes 
> several messages to make some of the old windows to be closed (go past 
> suppress to the next stage). We have also seen where one of them never closed 
> even but several other older ones expired.  Then we explicitly sent a message 
> with the same old key and then it showed up. Also, for every new message, 
> only one of the previous window expires even though there are several pending.
> >
> > If we don't use suppress, then there is never an issue. With suppress, 
> the behavior we are seeing is weird. We are using 2.1.0 version in DSL mode. 
> Any clues on what we could be missing ? Why isn't there an order in the way 
> windows are closed ? As event time progresses by the new messages arriving, 
> the older ones should expire. Is that right understanding or not ?
> >
> > Thanks
> > Mohan
> >
> > On 6/17/19, 3:43 PM, "Parthasarathy, Mohan"  wrote:
> >
> > Hi,
> >
> > We are using suppress in the application. We see some state being 
> created at some point in time. Now there is no new data for a day or two. We 
> send new data but the old window of data (where we see the state being 
> created) is not closing i.e not seeing it go through suppress and on to the 
> next stage. It is as though the state created earlier was purged. Is this 
> possible ?
> >
> > Thanks
> > Mohan
> >
> >
> >
>
>
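
In code, the suggested explicit zero grace period together with suppress looks roughly like this in the 2.1+ DSL. A sketch under assumptions: topic names, types, and the window size are placeholders:

```java
import java.time.Duration;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;

StreamsBuilder builder = new StreamsBuilder();
builder.<String, String>stream("input-topic")   // placeholder topic
    .groupByKey()
    // grace(Duration.ZERO): close each window as soon as stream time passes
    // its end, instead of waiting out the long default grace period
    .windowedBy(TimeWindows.of(Duration.ofMinutes(5)).grace(Duration.ZERO))
    .count()
    // emit only the final result per window, once the window has closed
    .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));
```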


Re: Can kafka internal state be purged ?

2019-06-19 Thread Bruno Cadonna
Hi Mohan,

I realized that my previous statement was not clear. With a grace
period of 12 hours, suppress would wait for late events until stream
time has advanced 12 hours before a result would be emitted.

Best,
Bruno

On Wed, Jun 19, 2019 at 9:21 PM Bruno Cadonna  wrote:
>
> Hi Mohan,
>
> if you do not set a grace period, the grace period defaults to 12
> hours. Hence, suppress would wait for an event that occurs 12 hour
> later before it outputs a result. Try to explicitly set the grace
> period to 0 and let us know if it worked.
>
> If it still does not work, upgrade to version 2.2.1 if it is possible
> for you. We had a couple of bugs in suppress recently that are fixed
> in that version.
>
> Best,
> Bruno
>
> On Wed, Jun 19, 2019 at 8:37 PM Parthasarathy, Mohan  wrote:
> >
> > No, I have not set any grace period. Is that mandatory ? Have you seen 
> > problems with suppress and windows expiring ?
> >
> > Thanks
> > Mohan
> >
> > On 6/19/19, 12:41 AM, "Bruno Cadonna"  wrote:
> >
> > Hi Mohan,
> >
> > Did you set a grace period on the window?
> >
> > Best,
> > Bruno
> >
> > On Tue, Jun 18, 2019 at 2:04 AM Parthasarathy, Mohan  
> > wrote:
> > >
> > > On further debugging, what we are seeing is that windows are expiring 
> > rather randomly as new messages are being processed. . We tested with new 
> > key for every new message. We waited for the window time before replaying 
> > new messages. Sometimes a new message would come in and create state. It 
> > takes several messages to make some of the old windows to be closed (go 
> > past suppress to the next stage). We have also seen where one of them never 
> > closed even but several other older ones expired.  Then we explicitly sent 
> > a message with the same old key and then it showed up. Also, for every new 
> > message, only one of the previous window expires even though there are 
> > several pending.
> > >
> > > If we don't use suppress, then there is never an issue. With 
> > suppress, the behavior we are seeing is weird. We are using 2.1.0 version 
> > in DSL mode. Any clues on what we could be missing ? Why isn't there an 
> > order in the way windows are closed ? As event time progresses by the new 
> > messages arriving, the older ones should expire. Is that right 
> > understanding or not ?
> > >
> > > Thanks
> > > Mohan
> > >
> > > On 6/17/19, 3:43 PM, "Parthasarathy, Mohan"  wrote:
> > >
> > > Hi,
> > >
> > > We are using suppress in the application. We see some state being 
> > created at some point in time. Now there is no new data for a day or two. 
> > We send new data but the old window of data (where we see the state being 
> > created) is not closing i.e not seeing it go through suppress and on to the 
> > next stage. It is as though the state created earlier was purged. Is this 
> > possible ?
> > >
> > > Thanks
> > > Mohan
> > >
> > >
> > >
> >
> >


Re: stream.filter() based on message header

2019-07-04 Thread Bruno Cadonna
Hi Jorg,

transform(), transformValues(), and process() are not stateful if you do
not add any state store to them. You only need to leave the
variable-length state store argument empty.

Within those methods you can implement your desired filter operation.

Best,
Bruno

On Thu, Jul 4, 2019 at 11:51 AM Jorg Heymans  wrote:
>
> Hi,
>
> I understand that it's currently not possible to access message headers from 
> the filter() DSL operation, is there a semantically equivalent (stateless) 
> operation that can achieve the same result ? I understand that transform(), 
> transformValues() and process() could achieve the same result but they are 
> stateful.
>
> Thanks,
> Jorg
>
>
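
A sketch of such a stateless header-based filter via transform(); the topic name, stream types, and the header name "keep" are assumptions:

```java
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> input = builder.stream("input-topic"); // placeholder topic

// No state store is attached, so this remains stateless.
KStream<String, String> filtered = input.transform(() ->
    new Transformer<String, String, KeyValue<String, String>>() {
        private ProcessorContext context;

        @Override
        public void init(ProcessorContext context) {
            this.context = context; // grants access to the record headers
        }

        @Override
        public KeyValue<String, String> transform(String key, String value) {
            // Returning null drops the record, mimicking filter() on a header.
            return context.headers().lastHeader("keep") != null // assumed header name
                ? KeyValue.pair(key, value)
                : null;
        }

        @Override
        public void close() { }
    });
```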


Re: Failed to rebalance

2019-07-04 Thread Bruno Cadonna
Hi Pawel,

It seems the exception comes from a producer. When a stream task tries
to resume after rebalancing, the producer of the task tries to
initialize its transactions and runs into the timeout. This could
happen if the broker is not reachable before the timeout elapses.
Could the big lag that you described be caused by network issues?

You can increase the timeout by increasing max.block.ms in the
producer configuration.

Best,
Bruno



On Thu, Jul 4, 2019 at 2:43 PM Paweł Gontarz  wrote:
>
> Hey all,
>
> I have seen already in archive an email concerning this, but as a solution
> it has been said to upgrade kafka version to 2.1. In my case, kafka is
> already up to date.
>
> NOTE: Issue is on since this morning.
> Specifying the problem, I'm running two kafka-streams stateful
> applications. From the very beginning of the app lifecycle, instances
> struggle to reassign correctly partitions between them which eventually
> leads them to
>
>  org.apache.kafka.streams.errors.StreamsException: stream-thread
> > [pws-budget-streams-client-mapper-StreamThread-13] Failed to rebalance.
>
>
> Due to
>
> Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired
> > while initializing transactional state in 6ms.
>
>
> In the same time I'm observing a big lag on 2 partitions of the topic which
> my streams are consuming.
> The issue had started just this morning, whereas applications are for
> already 1 month running without issues.
>
> One thing I did before it, was the reassignment of this two partitions to
> different nodes. Why? To fight over CPU consumption on one of our brokers
> (it wasn't balanced evenly).
>
> I have no clue if it has anything to do with problems on kafka-streams,
> though.
>
> Anyone encountered similar problems?
>
> Cheers,
> Paweł
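
The producer-level timeout mentioned above can be raised through the streams configuration using the `producer.` prefix; the value here is purely illustrative:

```properties
# Illustrative value: allow the producer up to 2 minutes (default 60 s)
# to initialize transactional state before timing out.
producer.max.block.ms=120000
```

Equivalently in code: `props.put(StreamsConfig.producerPrefix(ProducerConfig.MAX_BLOCK_MS_CONFIG), 120000);`.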


Re: Streams reprocessing whole topic when deployed but not locally

2019-07-09 Thread Bruno Cadonna
Hi Alessandro,

I am not sure I understand your issue completely. If you start your
streams app in a new container without any existing local state, then
it is expected that the changelog topics are read from the beginning
to restore the local state stores. Am I misunderstanding you?

Best,
Bruno

On Tue, Jul 9, 2019 at 6:42 AM Alessandro Tagliapietra
 wrote:
>
> I think I'm having again this issue, this time though it only happens on
> some state stores.
>
> Here you can find the code and the logs
> https://gist.github.com/alex88/1d60f63f0eee9f1568d89d5e1900fffc
> We first seen that our confluent cloud bill went up 10x, then seen that our
> streams processor was restarted 12 times (kubernetes pod), checking
> confluent cloud usage it seems that the writes/reads went up from the usual
> 1-2 KB/s to 12-20 MB/s during app restarts.
>
> I've then deployed a new version on a new container (no local store/state)
> to see what happened:
>  - first, it logs everything up to line 460 of the log file in the gist
>  - at this point confluent cloud reports high read usage and the consumer
> lag starts to increase, the app is accumulating messages behind
>  - after a certain point, writes go up as well
>  - when streams app transition to RUNNING state, the final aggregation
> function resumes back to where it stopped (without reprocessing old data)
>  - consumer lag goes back to 0
>
> What makes me think it's re-reading everything are these lines:
>
> Resetting offset for partition
> myapp-id-KSTREAM-AGGREGATE-STATE-STORE-04-changelog-2 to offset
> 20202847
> Restoring task 0_2's state store KSTREAM-AGGREGATE-STATE-STORE-04
> from beginning of the changelog
> myapp-id-KSTREAM-AGGREGATE-STATE-STORE-04-changelog-2
>
> At first I thought it's because I don't persist the aggregate store
> changelog as I do with the "LastValueStore" store which has
> "withLoggingEnabled()", but even that store has:
>
> Resetting offset for partition myapp-id-LastValueStore-changelog-0 to
> offset 403910
> Restoring task 0_0's state store LastValueStore from beginning of the
> changelog myapp-id-LastValueStore-changelog-0
>
> Thank you everyone in advance
>
> --
> Alessandro Tagliapietra
>
> On Thu, Jun 6, 2019 at 11:37 AM Alessandro Tagliapietra <
> tagliapietra.alessan...@gmail.com> wrote:
>
> > I'm not sure, one thing I know for sure is that on the cloud control
> > panel, in the consumer lag page, the offset didn't reset on the input
> > topic, so it was probably something after that.
> >
> > Anyway, thanks a lot for helping, if we experience that again I'll try to
> > add more verbose logging to better understand what's going on.
> >
> > Have a great day
> >
> > --
> > Alessandro Tagliapietra
> >
> >
> > On Thu, Jun 6, 2019 at 11:27 AM Guozhang Wang  wrote:
> >
> >> Honestly I cannot think of an issue that fixed in 2.2.1 but not in 2.2.0
> >> which could be correlated to your observations:
> >>
> >>
> >> https://issues.apache.org/jira/issues/?filter=-1&jql=project%20%3D%20KAFKA%20AND%20resolution%20%3D%20Fixed%20AND%20fixVersion%20%3D%202.2.1%20AND%20component%20%3D%20streams%20order%20by%20updated%20DESC
> >>
> >> If you observed that on the cloud, both partitions of the source topic
> >> gets
> >> re-processed from the beginning, then it means the committed offsets were
> >> somehow lost, and hence has to start reading the source topic from
> >> scratch.
> >> If this is a re-producible issue maybe there are some lurking things in
> >> 2.2.0.
> >>
> >> On Thu, Jun 6, 2019 at 11:10 AM Alessandro Tagliapietra <
> >> tagliapietra.alessan...@gmail.com> wrote:
> >>
> >> > Yes that's right,
> >> >
> >> > could that be the problem? Anyway, so far after upgrading to 2.2.1 from
> >> > 2.2.0 we didn't experience that problem anymore.
> >> >
> >> > Regards
> >> >
> >> > --
> >> > Alessandro Tagliapietra
> >> >
> >> >
> >> > On Thu, Jun 6, 2019 at 10:50 AM Guozhang Wang 
> >> wrote:
> >> >
> >> > > That's right, but local state is used as a "materialized view" of your
> >> > > changelog topics: if you have nothing locally, then it has to
> >> bootstrap
> >> > > from the beginning of your changelog topic.
> >> > >
> >> > > But I think your question was about the source "sensors-input" topic,
> >> not
> >> > > the changelog topic. I looked at the logs from two runs, and it seems
> >> > > locally your sensors-input has one partition, but on the cloud your
> >> > > sensors-input has two partitions. Is that right?
> >> > >
> >> > >
> >> > > Guozhang
> >> > >
> >> > >
> >> > > On Thu, Jun 6, 2019 at 10:23 AM Alessandro Tagliapietra <
> >> > > tagliapietra.alessan...@gmail.com> wrote:
> >> > >
> >> > > > Isn't the windowing state stored in the additional state store
> >> topics
> >> > > that
> >> > > > I had to additionally create?
> >> > > >
> >> > > > Like these I have here:
> >> > > >
> >> > > > sensors-pipeline-KSTREAM-AGGREGATE-STATE-STORE-01-changelog
> >> > > > sensors-pipeline-KTABLE-SUPPRESS-STATE-STORE-0

Re: Streams reprocessing whole topic when deployed but not locally

2019-07-10 Thread Bruno Cadonna
Hi Alessandro,

>  - how do I specify the retention period of the data? Just by setting the
> max retention time for the changelog topic?

For window and session stores, you can set the retention time on a
local state store by using Materialized.withRetention(...). Consult the
javadocs for details. If the changelog topic on the broker does not
yet exist, it will be created with the same retention time as the
local store.

Additionally, you could try to set the retention time in the streams
configuration by using `StreamsConfig.topicPrefix("retention.ms")`.

>  - wouldn't be possible, for example for my LastValueStore to compact the
> changelog and keep only the last value stored for each key? Because that's
> all I would need for my use case

That is what changelog topics do. They are compacted, so only the last value written for each key is kept.

>  - why is it also writing that much?

Could it be that after the restore of the state stores from the
changelogs, your stream app just starts working and there is a peak in
writes because the app initially has a large lag? Maybe somebody
else has a better explanation.

Best,
Bruno

On Wed, Jul 10, 2019 at 12:39 AM Alessandro Tagliapietra
 wrote:
>
> I've just noticed that the store topic created automatically by our streams
> app have different cleanup.policy.
> I think that's the main reason I'm seeing that big read/write IO, having a
> compact policy instead of delete would make the topic much smaller.
> I'll try that to also see the impact on our storage usage.
>
> --
> Alessandro Tagliapietra
>
>
> On Tue, Jul 9, 2019 at 6:06 AM Alessandro Tagliapietra <
> tagliapietra.alessan...@gmail.com> wrote:
>
> > Hi Bruno,
> >
> > Oh I see, I'll try to add a persistent disk where the local stores are.
> > I've other questions then:
> >  - why is it also writing that much?
> >  - how do I specify the retention period of the data? Just by setting the
> > max retention time for the changelog topic?
> >  - wouldn't be possible, for example for my LastValueStore to compact the
> > changelog and keep only the last value stored for each key? Because that's
> > all I would need for my use case
> >
> > Thank you very much for your help
> >
> > On Tue, Jul 9, 2019, 4:00 AM Bruno Cadonna  wrote:
> >
> >> Hi Alessandro,
> >>
> >> I am not sure I understand your issue completely. If you start your
> >> streams app in a new container without any existing local state, then
> >> it is expected that the changelog topics are read from the beginning
> >> to restore the local state stores. Am I misunderstanding you?
> >>
> >> Best,
> >> Bruno
> >>
> >> On Tue, Jul 9, 2019 at 6:42 AM Alessandro Tagliapietra
> >>  wrote:
> >> >
> >> > I think I'm having again this issue, this time though it only happens on
> >> > some state stores.
> >> >
> >> > Here you can find the code and the logs
> >> > https://gist.github.com/alex88/1d60f63f0eee9f1568d89d5e1900fffc
> >> > We first seen that our confluent cloud bill went up 10x, then seen that
> >> our
> >> > streams processor was restarted 12 times (kubernetes pod), checking
> >> > confluent cloud usage it seems that the writes/reads went up from the
> >> usual
> >> > 1-2 KB/s to 12-20 MB/s during app restarts.
> >> >
> >> > I've then deployed a new version on a new container (no local
> >> store/state)
> >> > to see what happened:
> >> >  - first, it logs everything up to line 460 of the log file in the gist
> >> >  - at this point confluent cloud reports high read usage and the
> >> consumer
> >> > lag starts to increase, the app is accumulating messages behind
> >> >  - after a certain point, writes go up as well
> >> >  - when streams app transition to RUNNING state, the final aggregation
> >> > function resumes back to where it stopped (without reprocessing old
> >> data)
> >> >  - consumer lag goes back to 0
> >> >
> >> > What makes me think it's re-reading everything are these lines:
> >> >
> >> > Resetting offset for partition
> >> > myapp-id-KSTREAM-AGGREGATE-STATE-STORE-04-changelog-2 to offset
> >> > 20202847
> >> > Restoring task 0_2's state store
> >> KSTREAM-AGGREGATE-STATE-STORE-04
> >> > from beginning of the changelog
> >> > myapp-id-KSTREAM-AGGREGATE-STATE-STORE-04-changelog-2
> >
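
The retention pointer above can be combined with an explicit store name; a sketch for a windowed store, where the store name and duration are placeholders:

```java
import java.time.Duration;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.WindowStore;

// Sketch: cap how long windowed state (and, if created fresh, its changelog)
// is retained. Retention must cover at least window size plus grace period.
Materialized<String, Long, WindowStore<Bytes, byte[]>> store =
    Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("my-agg-store") // placeholder
        .withRetention(Duration.ofDays(1));                                   // placeholder
```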

Re: Print RocksDb Stats

2019-07-29 Thread Bruno Cadonna
Hi Muhammed,

RocksDB is not an in-memory store. If you use only
InMemoryKeyValueStore, you are not using any RocksDB.

Best,
Bruno

On Wed, Jul 17, 2019 at 3:26 PM Muhammed Ashik  wrote:
>
> Hi I'm trying to log the rocksdb stats with the below code, but not
> observing any logs..
> I'm enabling this as the off-heap memory grows indefinitely over a
> period of time.
> We were using inMemoryKeyValueStore only, I was not sure kafka-streams uses
> rockdb as default in memory store.
>
> Kafka Streams version - 2.0.0
>
> class CustomRocksDBConfig extends RocksDBConfigSetter {
>   override def setConfig(storeName: String, options: Options, configs:
> util.Map[String, AnyRef]): Unit = {
>
> val stats = new Statistics
> stats.setStatsLevel(StatsLevel.ALL)
> options.setStatistics(stats)
>   .setStatsDumpPeriodSec(600)
> options
>   .setInfoLogLevel(InfoLogLevel.INFO_LEVEL)
> options.setDbLogDir("/tmp/dump")
>
>   }
> }
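
Whether RocksDB is involved at all depends on the store supplier: the DSL defaults to RocksDB (an on-disk store that allocates off-heap memory) unless an in-memory supplier is passed explicitly. A sketch, with topic and store names as placeholders:

```java
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.Stores;

StreamsBuilder builder = new StreamsBuilder();
// Explicit in-memory store: no RocksDB instance is created for it, so the
// RocksDBConfigSetter above would never be invoked for this store.
builder.table("input-topic",                                          // placeholder
    Materialized.as(Stores.inMemoryKeyValueStore("in-mem-store")));   // placeholder
```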


Re: KSTREAM-AGGREGATE-STATE-STORE persistence?

2019-08-12 Thread Bruno Cadonna
Hi Tim,

Kafka Streams guarantees at-least-once processing semantics by
default. That means, a record is processed (e.g. added to an
aggregate) at least once but might be processed multiple times. One
cause for processing the same record multiple times is a crash as you
described. Exactly-once processing guarantees need to be explicitly
turned on in Kafka Streams.

See the following links for more information:
https://kafka.apache.org/23/documentation/streams/core-concepts#streams_processing_guarantee
https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/
https://www.confluent.io/blog/enabling-exactly-once-kafka-streams/

Best,
Bruno

On Mon, Aug 12, 2019 at 2:38 PM Tim Ward
 wrote:
>
> I believe I have witnessed - at least twice - something like the following 
> happening, in a Kafka Streams application where I have a 
> .groupByKey().windowedBy().aggregate() sequence.
>
>
>   *   Application runs for a while
>   *   Application crashes
>   *   Application restarts
>   *   Aggregator.apply() is called to aggregate an input message *that has 
> already been included in the aggregate*
>
> It looks like when the application crashes, the KSTREAM-AGGREGATE-STATE-STORE 
> has been persisted after message X has been aggregated but message X has not 
> been committed back to the original source topic.
>
> So on restart message X gets read and processed again, and gets aggregated a 
> second time into the same aggregate.
>
> Now that I know this is happening (it was spotted by what I thought was some 
> over-the-top paranoid validation code) I can cope with it, and it is possible 
> to make the aggregation operation idempotent, because of the structure of the 
> particular operation I'm doing ... but what if the aggregation had been 
> something like a simple counting or totalling operation? How would anyone 
> know the original input message(s) had been aggregated more than once?
>
> So, my question:
>
> Am I correct in diagnosing that persisting the state store and committing the 
> original source message are not carried out atomically, and one has to expect 
> the same message can be applied to the same aggregate multiple times, and if 
> one cares about this one has to detect it happening and make the aggregation 
> process idempotent? I don't see this explained in the JavaDoc for either 
> Aggregator (or Reducer, where presumably it also applies).
>
> Tim Ward
>
> This email is from Origami Energy Limited. The contents of this email and any 
> attachment are confidential to the intended recipient(s). If you are not an 
> intended recipient: (i) do not use, disclose, distribute, copy or publish 
> this email or its contents; (ii) please contact Origami Energy Limited 
> immediately; and then (iii) delete this email. For more information, our 
> privacy policy is available here: https://origamienergy.com/privacy-policy/. 
> Origami Energy Limited (company number 8619644) is a company registered in 
> England with its registered office at Ashcombe Court, Woolsack Way, 
> Godalming, GU7 1LQ.
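
Enabling the exactly-once guarantee referred to above is a single configuration switch:

```properties
# Default is at_least_once; exactly_once makes state-store updates and
# source-offset commits atomic (requires brokers on 0.11 or later).
processing.guarantee=exactly_once
```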


Re: Kafka Streams incorrect aggregation results when re-balancing occurs

2019-08-20 Thread Bruno Cadonna
Hi Alex,

what you describe about failing before offsets are committed is one
reason why records are processed multiple times under the
at-least-once processing guarantee. That is a reality of life, as you
stated. Kafka Streams in exactly-once mode guarantees that these
duplicate state updates do not happen.

The exactly-once processing guarantee was implemented in Kafka Streams
for use cases where correctness is of highest importance.

Best,
Bruno



On Mon, Aug 19, 2019 at 9:24 PM Alex Brekken  wrote:
>
> Hi all, I have a (relatively) simple streams topology that is producing
> some counts, and while testing this code I'm seeing some occasional
> incorrect aggregation results.  This seems to happen when a re-balance
> occurs - typically due to a timeout or communication hiccup with the Kafka
> broker.  The topology is built with the DSL, and utilizes 2 KTables: the
> first is really just a de-dup table and the second is the result of the
> aggregation.  So at a high level the topology consumes from a source topic,
>   groupsByKey() and then does a reduce() where we always return the
> newValue.  Then it does a groupBy() on a new key, and finally an
> aggregate() call with an adder and subtractor.  Because our source topic
> frequently contains duplicate messages, this seemed like a good way to
> handle the dupes: the subtractor gets invoked anytime we replace a value in
> the "upstream" KTable and removes it from the count, then adds it back
> again in the adder.
>
> In the happy-path scenario where we never see any exceptions or rebalances,
> this whole thing works great - the counts at the end are 100% correct.  But
> when rebalancing is triggered we sometimes get bad counts. My theory is
> that when a timeout or connectivity problem happens during that second
> aggregation, the data ends up getting saved to the state store but the
> offsets don't get committed and the message(s) in the repartition topic
> that feed the aggregation get replayed after the stream task gets
> rebalanced, causing the counts to get incorrectly incremented or
> decremented.  (depending on whether the message was triggering the adder or
> the subtractor)  I can simulate this problem (or something similar to this
> problem) when debugging the application in my IDE just by pausing execution
> on a breakpoint inside the aggregation's adder or subtractor method for a
> few seconds.  The result of the adder or subtractor gets saved to the state
> store which means that when the messages in the repartition topic get
> re-processed, the counts get doubled.  If I enable "exactly_once"
> processing, I'm unable to recreate the problem and the counts are always
> accurate.
>
> My questions are:
>
> 1.  Is this expected behavior? In a hostile application environment where
> connectivity problems and rebalances happen frequently, is some degree of
> incorrectly aggregated data just a reality of life?
>
> 2.  Is exactly_once processing the right solution if correctness is of
> highest importance?  Or should I be looking at different ways of writing
> the topology?
>
> Thanks for any advice!
>
> Alex


Re: Kafka Streams incorrect aggregation results when re-balancing occurs

2019-08-21 Thread Bruno Cadonna
Hi Alex,

if you are interested in understanding exactly-once a bit more in
detail, I recommend you to watch the following Kafka Summit talk by
Matthias

https://www.confluent.io/kafka-summit-london18/dont-repeat-yourself-introducing-exactly-once-semantics-in-apache-kafka

Best,
Bruno

On Wed, Aug 21, 2019 at 5:07 AM Alex Brekken  wrote:
>
> Thanks guys.  I knew that re-processing messages was a possibility with
> at_least_once processing, but I guess I hadn't considered the potential
> impact on the state stores as far as aggregations are concerned.  So with
> exactly_once, it must roll back commit(s) to the state store in a failure
> scenario?  I haven't dug into the code to see how it works, but given the
> behavior I'm seeing it must..
>
> Tim - I actually saw your related question from last week right after I
> sent mine.  :)
>
> Alex
>
> On Tue, Aug 20, 2019 at 2:28 PM Bruno Cadonna  wrote:
>
> > Hi Alex,
> >
> > What you describe, failing before offsets are committed, is one
> > reason why records are processed multiple times under the
> > at-least-once processing guarantee. That is a reality of life, as you
> > stated. Kafka Streams in exactly-once mode guarantees that such
> > duplicate state updates do not happen.
> >
> > The exactly-once processing guarantee was implemented in Kafka Streams
> > for use cases where correctness is of highest importance.
> >
> > Best,
> > Bruno
> >
> >
> >
> > On Mon, Aug 19, 2019 at 9:24 PM Alex Brekken  wrote:
> > >
> > > Hi all, I have a (relatively) simple streams topology that is producing
> > > some counts, and while testing this code I'm seeing some occasional
> > > incorrect aggregation results.  This seems to happen when a re-balance
> > > occurs - typically due to a timeout or communication hiccup with the
> > Kafka
> > > broker.  The topology is built with the DSL, and utilizes 2 KTables: the
> > > first is really just a de-dup table and the second is the result of the
> > > aggregation.  So at a high level the topology consumes from a source
> > topic,
> > >   groupsByKey() and then does a reduce() where we always return the
> > > newValue.  Then it does a groupBy() on a new key, and finally an
> > > aggregate() call with an adder and subtractor.  Because our source topic
> > > frequently contains duplicate messages, this seemed like a good way to
> > > handle the dupes: the subtractor gets invoked anytime we replace a value
> > in
> > > the "upstream" KTable and removes it from the count, then adds it back
> > > again in the adder.
> > >
> > > In the happy-path scenario where we never see any exceptions or
> > rebalances,
> > > this whole thing works great - the counts at the end are 100% correct.
> > But
> > > when rebalancing is triggered we sometimes get bad counts. My theory is
> > > that when a timeout or connectivity problem happens during that second
> > > aggregation, the data ends up getting saved to the state store but the
> > > offsets don't get committed and the message(s) in the repartition topic
> > > that feed the aggregation get replayed after the stream task gets
> > > rebalanced, causing the counts to get incorrectly incremented or
> > > decremented.  (depending on whether the message was triggering the adder
> > or
> > > the subtractor)  I can simulate this problem (or something similar to
> > this
> > > problem) when debugging the application in my IDE just by pausing
> > execution
> > > on a breakpoint inside the aggregation's adder or subtractor method for a
> > > few seconds.  The result of the adder or subtractor gets saved to the
> > state
> > > store which means that when the messages in the repartition topic get
> > > re-processed, the counts get doubled.  If I enable "exactly_once"
> > > processing, I'm unable to recreate the problem and the counts are always
> > > accurate.
> > >
> > > My questions are:
> > >
> > > 1.  Is this expected behavior? In a hostile application environment where
> > > connectivity problems and rebalances happen frequently, is some degree of
> > > incorrectly aggregated data just a reality of life?
> > >
> > > 2.  Is exactly_once processing the right solution if correctness is of
> > > highest importance?  Or should I be looking at different ways of writing
> > > the topology?
> > >
> > > Thanks for any advice!
> > >
> > > Alex
> >


Re: Kafka

2019-09-05 Thread Bruno Cadonna
Hi Ghullam,

Apache Kafka is open source. See license under
https://github.com/apache/kafka/blob/trunk/LICENSE

Best,
Bruno

On Thu, Sep 5, 2019 at 10:19 PM Ghullam Mohiyudin
 wrote:
>
> Hi,
> I read the information about Kafka. Now I want to create a final-year degree
> project using Kafka. Can you please tell me whether Kafka has a free edition?
> Thank you!
>
> --
>
> *Ghullam Mohiyuddin Chishtti *


Re: Values not being aggregated

2019-09-18 Thread Bruno Cadonna
Hi Alessandro,

If you want to get each update to an aggregate, you need to disable
the cache. Otherwise, an update will only be emitted when the
aggregate is evicted or flushed from the cache.

To disable the cache, you can:
- disable it with the `Materialized` object
- set cache.max.bytes.buffering to zero which disables all caches in
the topology
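As a sketch, the two options look like this (the store name and value types below are made-up examples; assumes kafka-streams on the classpath):

```java
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

// Option 1: disable all caches in the topology
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

// Option 2: disable caching only for this aggregation's store
KTable<String, Long> counts = input
    .groupByKey()
    .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store")
                       .withCachingDisabled());
```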

Best,
Bruno

On Mon, Sep 16, 2019 at 12:11 PM Alessandro Tagliapietra
 wrote:
>
> Hello everyone,
>
> I have this code:
> https://gist.github.com/alex88/719383f38541c5324caf8f47b7239e15 (I've
> omitted the store setup part) and I have this problem for a specific key:
> I can see the logs up until the "Pair Stream data" line, but the "Aggregate
> stream data" line is never logged.
>
> Since between these lines there's just the groupByKey and windowedBy, is
> there any logic in those two that could stop the flow of data? Since I
> don't have any window-closing mechanism or suppression, shouldn't it just go
> through?
>
> Thank you in advance
>
> --
> Alessandro Tagliapietra


Re: Needless group coordination overhead for GlobalKTables

2019-10-29 Thread Bruno Cadonna
Hi Chris,

What version of Streams are you referring to?

On the current trunk the group.id property is removed from the config
for the global consumer that populates the GlobalKTable.

See the following code line
https://github.com/apache/kafka/blob/065411aa2273fd393e02f0af46f015edfc9f9b55/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java#L1149

Best,
Bruno

On Tue, Oct 29, 2019 at 8:12 PM Chris Toomey  wrote:
>
> We have some simple Kafka streams apps that populate GlobalKTables to use
> as caches for topic contents. When running them with info-level logging
> enabled, I noticed activity around group coordination (joining,
> rebalancing, leaving, rejoining) that I didn't expect, given that they need
> to consume from all topic partitions rather than use the group
> load-balancing feature.
>
> I tracked this down to the way the consumer config. is generated for
> a GlobalKTable consumer -- the groupId is set to the Kafka streams
> application id instead of to null -- the consumer needlessly creates a
> ConsumerCoordinator and thus initiates all the needless associated
> messaging and overhead.
>
> I was going to file a bug for this but per the contributing page am
> bringing this up here first. Is there a reason why GlobalKTable consumers
> should bear this group coordination overhead or should I go ahead and file
> a ticket to remove it?
>
> thanks,
> Chris


Re: Needless group coordination overhead for GlobalKTables

2019-10-30 Thread Bruno Cadonna
Hi Chris,

Thank you for the clarification. Now I see what you mean. If your
topology works correctly, I would not file it as a bug but as a
possible improvement.

Best,
Bruno

On Wed, Oct 30, 2019 at 1:20 AM Chris Toomey  wrote:
>
> Bruno,
>
> I'm using a fork based off the 2.4 branch. It's not the global consumer but
> the stream thread consumer that has the group id since it's built with the
> main consumer config:
> https://github.com/apache/kafka/blob/065411aa2273fd393e02f0af46f015edfc9f9b55/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java#L1051
>  .
>
> It shouldn't be creating a regular consumer for the topic since my topology
> only has a single element, the GlobalKTable, which is populated by the
> global consumer. My scala code:
>
> val builder: StreamsBuilder = new StreamsBuilder()
> val gTable = builder.globalTable[K, V](...)
> val stream = new KafkaStreams(builder.build(), props)
> stream.start()
>
>
> I can disable the stream thread consumer by configuring num.stream.threads
> = 0, but why does it create this stream thread consumer in the first place
> if it's not been requested in the topology?
>
> thx,
> Chris
>
> On Tue, Oct 29, 2019 at 2:08 PM Bruno Cadonna  wrote:
>
> > Hi Chris,
> >
> > What version of Streams are you referring to?
> >
> > On the current trunk the group.id property is removed from the config
> > for the global consumer that populates the GlobalKTable.
> >
> > See the following code line
> >
> > https://github.com/apache/kafka/blob/065411aa2273fd393e02f0af46f015edfc9f9b55/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java#L1149
> >
> > Best,
> > Bruno
> >
> > On Tue, Oct 29, 2019 at 8:12 PM Chris Toomey  wrote:
> > >
> > > We have some simple Kafka streams apps that populate GlobalKTables to use
> > > as caches for topic contents. When running them with info-level logging
> > > enabled, I noticed unexpected activity around group coordination
> > (joining,
> > > rebalancing, leaving, rejoining) that I didn't expect given that they
> > need
> > > to consume from all topic partitions vs. use the group load balancing
> > > feature.
> > >
> > > I tracked this down to the way the consumer config. is generated for
> > > a GlobalKTable consumer -- the groupId is set to the Kafka streams
> > > application id instead of to null -- the consumer needlessly creates a
> > > ConsumerCoordinator and thus initiates all the needless associated
> > > messaging and overhead.
> > >
> > > I was going to file a bug for this but per the contributing page am
> > > bringing this up here first. Is there a reason why GlobalKTable consumers
> > > should bear this group coordination overhead or should I go ahead and
> > file
> > > a ticket to remove it?
> > >
> > > thanks,
> > > Chris
> >


Re: Needless group coordination overhead for GlobalKTables

2019-11-01 Thread Bruno Cadonna
Thank you for reaching out and filing the ticket.

Best,
Bruno

On Fri, Nov 1, 2019 at 3:19 AM Chris Toomey  wrote:
>
> Thanks Bruno, filed https://issues.apache.org/jira/browse/KAFKA-9127 .
>
> On Wed, Oct 30, 2019 at 2:06 AM Bruno Cadonna  wrote:
>
> > Hi Chris,
> >
> > Thank you for the clarification. Now I see what you mean. If your
> > topology works correctly, I would not file it as a bug but as a
> > possible improvement.
> >
> > Best,
> > Bruno
> >
> > On Wed, Oct 30, 2019 at 1:20 AM Chris Toomey  wrote:
> > >
> > > Bruno,
> > >
> > > I'm using a fork based off the 2.4 branch. It's not the global consumer
> > but
> > > the stream thread consumer that has the group id since it's built with
> > the
> > > main consumer config:
> > >
> > https://github.com/apache/kafka/blob/065411aa2273fd393e02f0af46f015edfc9f9b55/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java#L1051
> > >  .
> > >
> > > It shouldn't be creating a regular consumer for the topic since my
> > topology
> > > only has a single element, the GlobalKTable, which is populated by the
> > > global consumer. My scala code:
> > >
> > > val builder: StreamsBuilder = new StreamsBuilder()
> > > val gTable = builder.globalTable[K, V](...)
> > > val stream = new KafkaStreams(builder.build(), props)
> > > stream.start()
> > >
> > >
> > > I can disable the stream thread consumer by configuring
> > num.stream.threads
> > > = 0, but why does it create this stream thread consumer in the first
> > place
> > > if it's not been requested in the topology?
> > >
> > > thx,
> > > Chris
> > >
> > > On Tue, Oct 29, 2019 at 2:08 PM Bruno Cadonna 
> > wrote:
> > >
> > > > Hi Chris,
> > > >
> > > > What version of Streams are you referring to?
> > > >
> > > > On the current trunk the group.id property is removed from the config
> > > > for the global consumer that populates the GlobalKTable.
> > > >
> > > > See the following code line
> > > >
> > > >
> > https://github.com/apache/kafka/blob/065411aa2273fd393e02f0af46f015edfc9f9b55/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java#L1149
> > > >
> > > > Best,
> > > > Bruno
> > > >
> > > > On Tue, Oct 29, 2019 at 8:12 PM Chris Toomey 
> > wrote:
> > > > >
> > > > > We have some simple Kafka streams apps that populate GlobalKTables
> > to use
> > > > > as caches for topic contents. When running them with info-level
> > logging
> > > > > enabled, I noticed unexpected activity around group coordination
> > > > (joining,
> > > > > rebalancing, leaving, rejoining) that I didn't expect given that they
> > > > need
> > > > > to consume from all topic partitions vs. use the group load balancing
> > > > > feature.
> > > > >
> > > > > I tracked this down to the way the consumer config. is generated for
> > > > > a GlobalKTable consumer -- the groupId is set to the Kafka streams
> > > > > application id instead of to null -- the consumer needlessly creates
> > a
> > > > > ConsumerCoordinator and thus initiates all the needless associated
> > > > > messaging and overhead.
> > > > >
> > > > > I was going to file a bug for this but per the contributing page am
> > > > > bringing this up here first. Is there a reason why GlobalKTable
> > consumers
> > > > > should bear this group coordination overhead or should I go ahead and
> > > > file
> > > > > a ticket to remove it?
> > > > >
> > > > > thanks,
> > > > > Chris
> > > >
> >


Re: Can not compile kafka

2019-11-14 Thread Bruno Cadonna
Hi Miguel,

I build Kafka with Gradle 5.2.1 and at the end of the build I get the
following message:

"Deprecated Gradle features were used in this build, making it
incompatible with Gradle 6.0."

So, maybe you ran in one of those incompatibilities.

Try to compile with a 5.x version of Gradle.

Best,
Bruno

On Thu, Nov 14, 2019 at 10:59 AM Miguel Silvestre  wrote:
>
> Hi,
>
> I'm on macOS Mojave 10.14.6 but when I run gradle (I'm using version 6.0) I
> get the following error:
>
> What can I do?
>
> FAILURE: Build failed with an exception.
>
> * Where:
> Build file '/Users/miguel.silvestre/Projects/others/kafka/build.gradle'
> line: 480
>
> * What went wrong:
> A problem occurred evaluating root project 'kafka'.
> > Could not create task ':clients:spotbugsMain'.
>> Could not create task of type 'SpotBugsTask'.
>   > Could not create an instance of type
> com.github.spotbugs.internal.SpotBugsReportsImpl.
>  >
> org.gradle.api.reporting.internal.TaskReportContainer.<init>(Ljava/lang/Class;Lorg/gradle/api/Task;)V
>
> * Try:
> Run with --stacktrace option to get the stack trace. Run with --info or
> --debug option to get more log output. Run with --scan to get full insights.
>
> * Get more help at https://help.gradle.org
>
> Deprecated Gradle features were used in this build, making it incompatible
> with Gradle 7.0.
> Use '--warning-mode all' to show the individual deprecation warnings.
> See
> https://docs.gradle.org/6.0/userguide/command_line_interface.html#sec:command_line_warnings
>
> BUILD FAILED in 2s
> --
> Miguel Silvestre


Re: Merging create/delete updates for Kafka Streams aggregations

2019-11-15 Thread Bruno Cadonna
Hi Thilo,

You can influence the rate of updates of aggregations by configuring
the size of the record caches with `cache.max.bytes.buffering`.

Details can be found here:
https://kafka.apache.org/22/documentation/streams/developer-guide/dsl-api.html#aggregating
https://kafka.apache.org/22/documentation/streams/developer-guide/memory-mgmt.html#streams-developer-guide-memory-management-record-cache

If you aggregate over windows, you should have a look at `suppress()`.

Details can be found here:
 
https://kafka.apache.org/22/documentation/streams/developer-guide/dsl-api.html#window-final-results
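For a non-windowed KGroupedTable aggregation like yours, `suppress()` can also rate-limit intermediate updates. A sketch (assumes Streams 2.1+; the time limit, buffer size, and topic name are arbitrary examples, and `aggregated` stands for the KTable returned by your aggregate() call):

```java
import java.time.Duration;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.Suppressed.BufferConfig;

aggregated
    // buffer the latest value per key for up to 10 seconds before emitting
    .suppress(Suppressed.untilTimeLimit(Duration.ofSeconds(10),
                                        BufferConfig.maxRecords(1000)))
    .toStream()
    .to("output-topic");  // placeholder topic name
```

When the subtractor and adder updates for the same key arrive within the time limit, only the final value is emitted, which collapses the pair into one update.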

Best,
Bruno

On Fri, Nov 15, 2019 at 12:02 PM Thilo-Alexander Ginkel
 wrote:
>
> Hello everyone,
>
> we are using the Kafka Streams DSL (v2.2.1) to perform aggregations on a
> topic using org.apache.kafka.streams.kstream.KGroupedTable#aggregate. ATM
> we are seeing one update being published when the subtractor is being
> called and another one when the adder is called.
>
> I was under the impression that there is some setting that allows
> folding/merging these updates for most cases.
>
> As I can't seem to find this in the docs, can someone please help me out
> with a pointer?
>
> Thanks & kind regards,
> Thilo


Re: [kafka-clients] [VOTE] 2.4.0 RC0

2019-11-18 Thread Bruno Cadonna
Hi,

ClientMetricsTest.shouldAddCommitIdMetric only fails when executed
from an IDE. The test expects a file on the class path that is present
when the test is run from Gradle but missing when it is run from the
IDE. I will try to fix the test so that it can also be executed from
the IDE.

Best,
Bruno

On Mon, Nov 18, 2019 at 6:51 AM Vahid Hashemian
 wrote:
>
> Thanks Manikumar for managing this release. Looking forward to it.
>
> I built the binaries from source and was able to successfully run the 
> quickstarts.
>
> However, this streams unit test also fails for me constantly:
>
> ClientMetricsTest.shouldAddCommitIdMetric
>
> java.lang.AssertionError:
>   Unexpected method call 
> StreamsMetricsImpl.addClientLevelImmutableMetric("commit-id", "The version 
> control commit ID of the Kafka Streams client", INFO, "unknown"):
> StreamsMetricsImpl.addClientLevelImmutableMetric("commit-id", "The 
> version control commit ID of the Kafka Streams client", INFO, 
> and(not("unknown"), notNull())): expected: 1, actual: 0
> at 
> org.easymock.internal.MockInvocationHandler.invoke(MockInvocationHandler.java:44)
> at 
> org.easymock.internal.ObjectMethodsFilter.invoke(ObjectMethodsFilter.java:101)
> at 
> org.easymock.internal.ClassProxyFactory$MockMethodInterceptor.intercept(ClassProxyFactory.java:97)
> ...
>
> Thanks,
> --Vahid
>
> On Thu, Nov 14, 2019 at 10:21 AM Manikumar  wrote:
>>
>> Hello Kafka users, developers and client-developers,
>>
>> This is the first candidate for release of Apache Kafka 2.4.0.
>> There is work in progress for couple blockers PRs. I am publishing RC0 to 
>> avoid further delays in testing the release.
>>
>> This release includes many new features, including:
>> - Allow consumers to fetch from closest replica
>> - Support for incremental cooperative rebalancing to the consumer rebalance 
>> protocol
>> - MirrorMaker 2.0 (MM2), a new multi-cluster, cross-datacenter replication 
>> engine
>> - New Java authorizer Interface
>> - Support for  non-key joining in KTable
>> - Administrative API for replica reassignment
>> - Sticky partitioner
>> - Return topic metadata and configs in CreateTopics response
>> - Securing Internal connect REST endpoints
>> - API to delete consumer offsets and expose it via the AdminClient.
>>
>> Release notes for the 2.4.0 release:
>> https://home.apache.org/~manikumar/kafka-2.4.0-rc0/RELEASE_NOTES.html
>>
>> *** Please download, test  by  Thursday, November 20, 9am PT
>>
>> Kafka's KEYS file containing PGP keys we use to sign the release:
>> https://kafka.apache.org/KEYS
>>
>> * Release artifacts to be voted upon (source and binary):
>> https://home.apache.org/~manikumar/kafka-2.4.0-rc0/
>>
>> * Maven artifacts to be voted upon:
>> https://repository.apache.org/content/groups/staging/org/apache/kafka/
>>
>> * Javadoc:
>> https://home.apache.org/~manikumar/kafka-2.4.0-rc0/javadoc/
>>
>> * Tag to be voted upon (off 2.4 branch) is the 2.4.0 tag:
>> https://github.com/apache/kafka/releases/tag/2.4.0-rc0
>>
>> * Documentation:
>> https://kafka.apache.org/24/documentation.html
>>
>> * Protocol:
>> https://kafka.apache.org/24/protocol.html
>>
>> Thanks,
>> Manikumar
>>
>> --
>> You received this message because you are subscribed to the Google Groups 
>> "kafka-clients" group.
>> To unsubscribe from this group and stop receiving emails from it, send an 
>> email to kafka-clients+unsubscr...@googlegroups.com.
>> To view this discussion on the web visit 
>> https://groups.google.com/d/msgid/kafka-clients/CAMVt_Aw945uqcpisFjZHAR5m8Sidw6hW4ia%2B7%3DjxEfadmBPzcw%40mail.gmail.com.
>
>
>
> --
>
> Thanks!
> --Vahid
>
> --
> You received this message because you are subscribed to the Google Groups 
> "kafka-clients" group.
> To unsubscribe from this group and stop receiving emails from it, send an 
> email to kafka-clients+unsubscr...@googlegroups.com.
> To view this discussion on the web visit 
> https://groups.google.com/d/msgid/kafka-clients/CAHR2v2mKJtHG6S9P%3Dmw08SxbWjQCowp8cpZNpzr9acW1EcdegQ%40mail.gmail.com.


Re: Unique users per calendar month using kafka streams

2019-11-21 Thread Bruno Cadonna
Hi Chintan,

You cannot specify time windows based on a calendar object like months.

In the following, I suppose the keys of your records are user IDs. You
could extract the months from the timestamps of the events and add
them to the key of your records. Then you can group the records by key
and count them. Be aware that your state that stores the counts will
grow indefinitely and therefore you need to take care how to remove
counts you do not need anymore from your local state.

Take a look at the following example of how to deduplicate records

https://github.com/confluentinc/kafka-streams-examples/blob/5.3.1-post/src/test/java/io/confluent/examples/streams/EventDeduplicationLambdaIntegrationTest.java

It shows how to avoid indefinite growing of local store in such cases.
Try to adapt it to solve your problem by extending the key with the
month and computing the count instead of looking for duplicates.
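A rough sketch of that adaptation (everything here is an assumption: the topic name, the `Event` type, and the `yearMonth` helper that formats a year-month string such as "2019-11" from the event timestamp):

```java
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

StreamsBuilder builder = new StreamsBuilder();
KStream<String, Event> events = builder.stream("events");  // key = user id (assumption)

KTable<String, Long> uniqueUsersPerMonth = events
    // extend the key with the event's year-month, e.g. "2019-11|alice"
    .selectKey((userId, event) -> yearMonth(event.timestamp()) + "|" + userId)
    .groupByKey()
    .reduce((oldEvent, newEvent) -> newEvent)  // dedup: one entry per (month, user)
    // re-key each deduplicated entry to its month and count entries per month
    .groupBy((monthAndUser, event) ->
        KeyValue.pair(monthAndUser.substring(0, monthAndUser.indexOf('|')), event))
    .count();
```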

Best,
Bruno

On Thu, Nov 21, 2019 at 10:28 AM chintan mavawala
 wrote:
>
> Hi,
>
> We have a use case to capture number of unique users per month. We planned
> to use windowing concept for this.
>
> For example, group events from the input topic by user name and later
> sub-group them based on a time window. However, I don't see how I can
> sub-group the results based on a particular month, say January. The only
> way is to sub-group based on a time window.
>
> Any pointers would be appreciated.
>
> Regards,
> Chintan


Re: Case of joining multiple streams/tables

2019-12-05 Thread Bruno Cadonna
Hi Sachin,

I do not completely understand what you mean by one single
operation. Do you mean one call of a method in the DSL, or that the
join is processed on one processor node?

If you mean the latter, the joins in the DSL are also not processed on
one single processor node.

If you mean the former, the DSL does not have a single method call to
join multiple streams and it does not necessarily need it to process
an n-way join more efficiently, because the DSL is just the way you
declare the join. How the join is processed depends on how the
topology is built from the DSL code. Having a DSL call specific to an
n-way join would merely result in syntactic sugar (which can also make
sense).

If you have specific requirements that are not fulfilled by the DSL
you can use the Processor API to implement your own join.

See the following StackOverflow question for more details on joins.
https://stackoverflow.com/questions/53485632/kafka-streams-implementing-joining-using-the-processor-api
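For illustration, an n-way join declared as chained binary joins (topic names and value types below are made-up examples):

```java
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;

StreamsBuilder builder = new StreamsBuilder();
KTable<String, String> t1 = builder.table("topic1");
KTable<String, String> t2 = builder.table("topic2");
KTable<String, String> t3 = builder.table("topic3");

// Each join produces an intermediate KTable on the common key;
// the result is then joined with the next table.
KTable<String, String> joined = t1
    .join(t2, (v1, v2) -> v1 + "," + v2)
    .join(t3, (v12, v3) -> v12 + "," + v3);
```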

Best,
Bruno

On Thu, Dec 5, 2019 at 7:08 AM Sachin Mittal  wrote:
>
> Hi,
> I have checked the documentation and what I see that we can join two
> streams or tables at a given time.
>
> I have a case where I have multiple streams which I need to join based on
> common key.
>
> As of now I am first joining two, then joining the result of that with the
> next, and so on.
>
> Is there a way or any case implemented anywhere that joins multiple
> streams/tables in a single operation.
>
> If not then is this something that is pipelined for future releases?
> Or does something like this make sense to be part of streams functionality?
>
> Thanks
> Sachin


Re: StateStore extends UnicastRemoteObject

2020-01-06 Thread Bruno Cadonna
Hi Michelle,

Are you sure you do not pass a null instead of your custom store to
your topology by mistake?
How does the implementation of the `build()` method of your
`MyCustomStoreBuilder` look like?
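The NullPointerException in registerStateStores is typically thrown when the builder's build() returns null. For comparison, a hypothetical builder that would not cause it, sketched against the StoreBuilder interface of a recent Streams version (names and constructor are assumptions):

```java
import java.util.Collections;
import java.util.Map;
import org.apache.kafka.streams.state.StoreBuilder;

public class MyCustomStoreBuilder implements StoreBuilder<MyCustomStore> {
    private final String name;

    public MyCustomStoreBuilder(String name) { this.name = name; }

    @Override
    public MyCustomStore build() {
        return new MyCustomStore(name);  // must never return null
    }

    @Override public String name() { return name; }
    @Override public Map<String, String> logConfig() { return Collections.emptyMap(); }
    @Override public boolean loggingEnabled() { return false; }
    @Override public StoreBuilder<MyCustomStore> withCachingEnabled() { return this; }
    @Override public StoreBuilder<MyCustomStore> withCachingDisabled() { return this; }
    @Override public StoreBuilder<MyCustomStore> withLoggingEnabled(Map<String, String> config) { return this; }
    @Override public StoreBuilder<MyCustomStore> withLoggingDisabled() { return this; }
}
```

Since your store constructor throws RemoteException, check whether your build() catches it and returns null in the catch block; that would produce exactly this NullPointerException.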

Best,
Bruno

On Mon, Dec 30, 2019 at 12:06 AM Michelle Francois  wrote:
>
> Hello,
> I want to have two way communication in Apache Kafka and since the Apache
> Kafka Topology permits no cyclic topology I was suggested by my supervisor
> to use State Stores as remote objects.
>
> I created custom State Stores as described here:
> https://docs.confluent.io/current/streams/developer-guide/interactive-queries.html#querying-local-custom-state-stores
>
> (I did not use interactive queries; I just created some custom state stores
> for my Kafka processors.)
>
> If I declare the state store as:
>
> public class MyCustomStore implements StateStore,
> MyWriteableCustomStore, Serializable
>
> and I pass my custom state store to some remote method of another remote
> object, and since it implements Serializable it is passed by value, but I
> want it to be passed by reference.
>
> I think my Custom State Store in order to be passed by reference should now
> extend  UnicastRemoteObject like that:
>
> public class MyCustomStore extends UnicastRemoteObject implements
> StateStore, MyWriteableCustomStore
> ...
> public MyCustomStore(...) throws RemoteException {
> ...
> }
> but now I get this Exception at initialization
>
> ERROR stream-thread [site-client-StreamThread-1] Encountered the following
> error during processing:
> (org.apache.kafka.streams.processor.internals.StreamThread:744)
> java.lang.NullPointerException at
> org.apache.kafka.streams.processor.internals.AbstractTask.registerStateStores(AbstractTask.java:226)
>  at
> org.apache.kafka.streams.processor.internals.StreamTask.initializeStateStores(StreamTask.java:225)
>  at
> org.apache.kafka.streams.processor.internals.AssignedTasks.initializeNewTasks(AssignedTasks.java:88)
>  at
> org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:314)
>  at
> org.apache.kafka.streams.processor.StreamThread.runOnce(StreamThread.java:824)
>  at
> org.apache.kafka.streams.processor.StreamThread.runLoop(StreamThread.java:767)
>  at
> org.apache.kafka.streams.processor.StreamThread.run(StreamThread.java:736)
>
>
> I am not listing the code of my state stores since it consists of hundreds
> of lines. If needed, I will post it.
>
> My version of Apache Kafka Streams in the cluster I use is :
> kafka-streams-2.0.0.3.1.0.0-78.jar (in kafka-broker/libs folder)
>
> Whenever an instance of my custom state store is created, I use a try-catch
> clause since the constructor throws RemoteException.
>
> Thanks in advance


Re: unpacking a column from a JSON structured msg, using stream processing

2020-01-17 Thread Bruno Cadonna
Hi George,

Could the following tutorial help you?
https://kafka-tutorials.confluent.io/transform-a-stream-of-events/kstreams.html
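If the tutorial fits, the core of it for your message shape could look roughly like this sketch (assumes Jackson for JSON parsing and fixed-width columns; the column boundaries and topic names are made-up assumptions):

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.kafka.streams.StreamsBuilder;

ObjectMapper mapper = new ObjectMapper();
StreamsBuilder builder = new StreamsBuilder();

builder.<String, String>stream("source-topic")
    .mapValues(value -> {
        try {
            JsonNode msg = mapper.readTree(value);
            String line = msg.get("line_data").asText();
            ObjectNode out = mapper.createObjectNode();
            out.put("file_name", msg.get("file_name").asText());
            // positional split; the boundaries 0-10 and 10-20 are assumptions
            out.put("col1", line.substring(0, 10).trim());
            out.put("col2", line.substring(10, 20).trim());
            return out.toString();
        } catch (Exception e) {
            return null;  // in a real app, route bad records to a dead-letter topic
        }
    })
    .to("target-topic");
```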

Best,
Bruno

On Fri, Jan 17, 2020 at 7:48 AM George  wrote:
>
> I have a topic with message as per:
>
> {'file_name'   : filename,
> 'line_number'  : src_line_number,
> 'section'  : vTag,
> 'line_data': line_data
> }
>
>
> I want to unpack line_data into multiple columns based on position.
>
> Can I do this via stream processing? The output will go onto a new
> topic/stream.
>
> G
>
> --
> You have the obligation to inform one honestly of the risk, and as a person
> you are committed to educate yourself to the total risk in any activity!
>
> Once informed & totally aware of the risk,
> every fool has the right to kill or injure themselves as they see fit!


Re: "Skipping record for expired segment" in InMemoryWindowStore

2020-02-10 Thread Bruno Cadonna
Hi,

I am pretty sure this was intentional. All skipped records log
messages are on WARN level.

If a lot of your records are skipped on app restart with this log
message on WARN-level, they were also skipped with the log message on
DEBUG-level. You simply did not know about it before. With an
in-memory window store, this message is logged when a window with a
start time older than the current stream time minus the retention
period is put into the window store, i.e., the window is NOT inserted
into the window store. If you get a lot of them on app restart, you
should have a look at the timestamps of your records and the retention
of your window store. If those values do not explain the behavior,
please try to find a minimal example that shows the issue and post it
here on the mailing list.
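To make the cutoff concrete, the check reduces to this arithmetic (a simplified sketch, not the actual store code):

```java
public class ExpiryCheck {
    // A window is skipped when its start time is at or before
    // observed stream time minus the store's retention period.
    static boolean isExpired(long windowStartMs, long observedStreamTimeMs, long retentionMs) {
        return windowStartMs <= observedStreamTimeMs - retentionMs;
    }

    public static void main(String[] args) {
        long streamTime = 100_000L;
        long retention = 60_000L;
        System.out.println(isExpired(30_000L, streamTime, retention)); // true: skipped
        System.out.println(isExpired(50_000L, streamTime, retention)); // false: inserted
    }
}
```

A burst of these warnings on restart therefore usually means stream time jumped ahead (for example, newer records were replayed first) while older records were still arriving.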

On Mon, Feb 10, 2020 at 2:27 PM Samek, Jiří  wrote:
>
> Hi,
>
> in
> https://github.com/apache/kafka/commit/9f5a69a4c2d6ac812ab6134e64839602a0840b87#diff-a5cfe68a5931441eff5f00261653dd10R134
>
> log level of "Skipping record for expired segment" was changed from debug
to warn. Was this an intentional change? Should it be somehow handled by the
user? How can the user handle it? I am getting a lot of these on app restart.


Re: [HELP]

2020-02-11 Thread Bruno Cadonna
Hi Francis,

You need to sign-up to the Apache wiki at
https://cwiki.apache.org/confluence/signup.action

Best,
Bruno

On Tue, Feb 11, 2020 at 1:05 PM 萨尔卡 <1026203...@qq.com> wrote:
>
> i don't have a apache id. how can i apply one for create KIP?
>
>
>
> Have a nice dayFrancis Lee
>
>
> QQ : 1026203200
>
>
>  


Re: "Skipping record for expired segment" in InMemoryWindowStore

2020-02-24 Thread Bruno Cadonna
 > > > records by itself isn't a cause for concern, since this is exactly what
> > > Streams
> > > > is designed to do in a number of situations.
> > > >
> > > > However, during the KIP-444 discussion, the decision was reversed, and
> > we
> > > > decided to just log one "roll-up" metric for all skips and increase the
> > > log
> > > > messages to warning level for debuggability. This particularly makes
> > sense
> > > > because you otherwise would have to restart the application to change
> > the
> > > > log level if you needed to figure out why the single skipped-record
> > metric
> > > > is non-zero. And then you may not even observe it again.
> > > >
> > > > I either missed the memo on that discussion, or participated in it and
> > > then
> > > > forgot it even happened. I'm not sure I want to look back at the
> > thread to
> > > > find out.
> > > >
> > > > Anyway, I've closed the PR I opened to move it back to debug. We should
> > > > still try to help figure out the root cause of this particular email
> > > thread,
> > > > though.
> > > >
> > > > Thanks,
> > > > -John
> > > >
> > > > On Mon, Feb 10, 2020, at 12:20, Sophie Blee-Goldman wrote:
> > > > > While I agree that seems like it was probably a refactoring mistake,
> > I'm
> > > > > not
> > > > > convinced it isn't the right thing to do. John, can you reiterate the
> > > > > argument
> > > > > for setting it to debug way back when?
> > > > >
> > > > > I would actually present this exact situation as an argument for
> > > keeping it
> > > > > as
> > > > > warn, since something indeed seems fishy here that was only surfaced
> > > > > through this warning. That said, maybe the metric is the more
> > > appropriate
> > > > > way to bring attention to this: not sure if it's info or debug level
> > > > > though, or
> > > > > how likely it is that anyone really pays attention to it?
> > > > >
> > > > > On Mon, Feb 10, 2020 at 9:53 AM John Roesler 
> > wrote:
> > > > >
> > > > > > Hi,
> > > > > >
> > > > > > I’m sorry for the trouble. It looks like it was a mistake during
> > > > > >
> > > > > > https://github.com/apache/kafka/pull/6521
> > > > > >
> > > > > > Specifically, while addressing code review comments to change a
> > bunch
> > > of
> > > > > > other logs from debugs to warnings, that one seems to have been
> > > included by
> > > > > > accident:
> > > > > >
> > >
> > https://github.com/apache/kafka/commit/ac27e8578f69d60a56ba28232d7e96c76957f66c
> > > > > >
> > > > > > I’ll see if I can fix it today.
> > > > > >
> > > > > > Regarding Bruno's thoughts, there was a pretty old decision to
> > > capture the
> > > > > > "skipped records" as a metric for visibility and log it at the
> > debug
> > > level
> > > > > > for debuggability. We decided that "warning" wasn't the right level
> > > because
> > > > > > Streams is operating completely as specified.
> > > > > >
> > > > > > However, I do agree that it doesn't seem right to see more skipped
> > > records
> > > > > > during start-up; I would expect to see exactly the same records
> > > skipped
> > > > > > during start-up as during regular processing, since the skipping
> > > logic is
> > > > > > completely deterministic and based on the sequence of timestamps
> > your
> > > > > > records have in the topic.  Maybe you just notice it more during
> > > startup?
> > > > > > I.e., if there are 1000 warning logs spread over a few months, then
> > > you
> > > > > > don't notice it, but when you see them all together at start-up,
> > it's
> > > more
> > > > > > concerning?
> > > > > >
> > > > > > Thanks,
> > > > > > -John
> > > > > >
> > > > > >
> > > > > > On Mon, Feb 10, 2020, at 10:15, Bruno Cadonna wrote:
> > > > > > > Hi,
> > > > > > >
> > > > > > > I am pretty sure this was intentional. All skipped records log
> > > > > > > messages are on WARN level.
> > > > > > >
> > > > > > > If a lot of your records are skipped on app restart with this log
> > > > > > > message on WARN-level, they were also skipped with the log
> > message
> > > on
> > > > > > > DEBUG-level. You simply did not know about it before. With an
> > > > > > > in-memory window store, this message is logged when a window
> > with a
> > > > > > > start time older than the current stream time minus the retention
> > > > > > > period is put into the window store, i.e., the window is NOT
> > > inserted
> > > > > > > into the window store. If you get a lot of them on app restart,
> > you
> > > > > > > should have a look at the timestamps of your records and the
> > > retention
> > > > > > > of your window store. If those values do not explain the
> > behavior,
> > > > > > > please try to find a minimal example that shows the issue and
> > post
> > > it
> > > > > > > here on the mailing list.
> > > > > > >
> > > > > > > On Mon, Feb 10, 2020 at 2:27 PM Samek, Jiří
> >  > > >
> > > > > > wrote:
> > > > > > > >
> > > > > > > > Hi,
> > > > > > > >
> > > > > > > > in
> > > > > > > >
> > > > > >
> > >
> > https://github.com/apache/kafka/commit/9f5a69a4c2d6ac812ab6134e64839602a0840b87#diff-a5cfe68a5931441eff5f00261653dd10R134
> > > > > > > >
> > > > > > > > log level of "Skipping record for expired segment" was changed
> > > from
> > > > > > debug
> > > > > > > > to warn. Was it intentional change? Should it be somehow
> > handled
> > > by
> > > > > > user?
> > > > > > > > How can user handle it? I am getting a lot of these on app
> > > restart.
> > > > > > >
> > > > > >
> > > > >
> > >
> > >
> > >
> > > --
> > >
> > > Jiří Samek | Software Developer
> > >
> > > AVAST Software s.r.o. | Pikrtova 1737/1a | 140 00  Praha 4
> > >
> > > M +420 734 524 549 | E sa...@avast.com | W www.avast.com
> > >
> >
>
>
> --
>
> *Jiří Samek * | *Software Developer*
>
> *AVAST Software s.r.o.* | Pikrtova 1737/1a | 140 00  Praha 4
>
> *M* +420 734 524 549 | *E* sa...@avast.com | *W* www.avast.com


Re: KStream.groupByKey().aggregate() is always emitting record, even if nothing changed

2020-02-25 Thread Bruno Cadonna
Hello Guozhang and Adam,

Regarding Guozhang's proposal please see recent discussions about
`transformValues()` and returning `null` from the transformer:
https://issues.apache.org/jira/browse/KAFKA-9533?focusedCommentId=17044602&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17044602

With the current behavior, the commands should be:

`stream.transformValues(...).filter((k, v) -> v != null).groupByKey().aggregate()`
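A self-contained sketch of that chain (topic name and the "success" predicate are illustrative; with the current behavior the null returned from transformValues is forwarded downstream, hence the explicit filter):

```java
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.processor.ProcessorContext;

public class DropViaTransformValues {

    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("events")
               .transformValues(() -> new ValueTransformerWithKey<String, String, String>() {
                   @Override
                   public void init(ProcessorContext context) { }

                   @Override
                   public String transform(String key, String value) {
                       // null marks the record as "not to be forwarded"; the null
                       // itself still travels downstream until filtered out.
                       return "success".equals(value) ? value : null;
                   }

                   @Override
                   public void close() { }
               })
               .filter((k, v) -> v != null) // discard the nulls; no repartitioning here
               .groupByKey()
               .count();
        return builder.build();
    }

    public static void main(String[] args) {
        System.out.println(build().describe());
    }
}
```

In a real application the transformer would also be connected to the aggregate's state store (via the stateStoreNames varargs) to check the current value, as Guozhang described.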

Best,
Bruno

On Tue, Feb 25, 2020 at 2:58 AM Guozhang Wang  wrote:
>
> Hello Adam,
>
> It seems your intention is to not "avoid emitting if the new aggregation
> result is the same as the old aggregation" but to "avoid processing the
> aggregation at all if its state is already some certain value", right?
>
> In this case I think you can try sth. like this:
>
> *stream.transformValues().groupByKey().aggregate()*
>
> where transformValues is just used as a slightly complicated "filter"
> operation, in which you can access the state store that "aggregate" is
> connected to, and read / check if the corresponding entry is already
> `success`; if yes, let `transformValues` return `null`, which means forwarding
> nothing downstream.
>
> The reason to use transformValues instead of transform is to make sure you
> do not introduce unnecessary repartitioning here.
>
> Guozhang
>
>
>
> On Mon, Feb 24, 2020 at 2:01 PM Adam Rinehart 
> wrote:
>
> > So I am trying to process incoming events, that may or may not actually
> > update the state of my output object. Originally I was doing this with a
> > KStream/KTable join, until I saw the discussion about "KTable in Compact
> > Topic takes too long to be updated", when I switched to
> > groupByKey().aggregate().
> >
> > Some events may not result in a state change. For example, once I have an
> > incoming success event, I emit a success output and future incoming failure
> > events will be ignored.
> >
> > My intention is to only emit a record from the aggregate KTable if the
> > aggregate record actually changed. But I can't figure out how to do that
> > within the aggregator interface. I've tried returning the incoming
> > aggregate object when nothing changes, but I still get a record emitted
> > from the table.
> >
>
>
> --
> -- Guozhang


Re: KStream.groupByKey().aggregate() is always emitting record, even if nothing changed

2020-02-25 Thread Bruno Cadonna
Hi Sachin,

I am afraid I cannot follow your point.

You can still use a filter if you do not want to emit records
downstream w/o triggering any repartitioning.

Best,
Bruno

On Tue, Feb 25, 2020 at 6:43 PM Sachin Mittal  wrote:
>
> Hi,
> This is really getting interesting.
> Now if we don't want a record to be emitted downstream, the only way we can
> do it is via transform (or flatTransform).
>
> Since we are now reverting the fix for null records in transformValues and
> changing the docs instead, doesn't this add a bit of confusion for users?
> Confluent docs says that:
> transformValues is preferable to transform because it will not cause data
> re-partitioning.
>
> So in many cases if just the record's value structure is sufficient to
> determine whether we should emit it downstream or not, we would still be
> forced to use transform and unnecessarily cause data re-partitioning.
> Won't this be inefficient?
>
> Thanks
> Sachin
>
>
>
> On Tue, Feb 25, 2020 at 10:52 PM Bruno Cadonna  wrote:
>
> > Hello Guozhang and Adam,
> >
> > Regarding Guozhang's proposal please see recent discussions about
> > `transformValues()` and returning `null` from the transformer:
> >
> > https://issues.apache.org/jira/browse/KAFKA-9533?focusedCommentId=17044602&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17044602
> > .
> >
> > With the current behavior, the commands should be:
> >
> > `stream.transformValues(...).filter((k, v) -> v != null).groupByKey().aggregate()`
> >
> > Best,
> > Bruno
> >
> > On Tue, Feb 25, 2020 at 2:58 AM Guozhang Wang  wrote:
> > >
> > > Hello Adam,
> > >
> > > It seems your intention is to not "avoid emitting if the new aggregation
> > > result is the same as the old aggregation" but to "avoid processing the
> > > aggregation at all if its state is already some certain value", right?
> > >
> > > In this case I think you can try sth. like this:
> > >
> > > *stream.transformValues().groupByKey().aggregate()*
> > >
> > > where transformValues is just used as a slightly complicated "filter"
> > > operation, in which you can access the state store that "aggregate" is
> > > connected to, and read / check if the corresponding entry is already
> > > `success`, if yes let `transformValue` to return `null` which means
> > forward
> > > nothing to the downstream.
> > >
> > > The reason to use transformValues instead of transform is to make sure
> > you
> > > do not introduce unnecessary repartitioning here.
> > >
> > > Guozhang
> > >
> > >
> > >
> > > On Mon, Feb 24, 2020 at 2:01 PM Adam Rinehart 
> > > wrote:
> > >
> > > > So I am trying to process incoming events, that may or may not actually
> > > > update the state of my output object. Originally I was doing this with
> > a
> > > > KStream/KTable join, until I saw the discussion about "KTable in
> > Compact
> > > > Topic takes too long to be updated", when I switched to
> > > > groupByKey().aggregate().
> > > >
> > > > Some events may not result in a state change. For example, once I have
> > an
> > > > incoming success event, I emit a success output and future incoming
> > failure
> > > > events will be ignored.
> > > >
> > > > My intention is to only emit a record from the aggregate KTable if the
> > > > aggregate record actually changed. But I can't figure out how to do
> > that
> > > > within the aggregator interface. I've tried returning the incoming
> > > > aggregate object when nothing changes, but I still get a record emitted
> > > > from the table.
> > > >
> > >
> > >
> > > --
> > > -- Guozhang
> >


Re: Error handling guarantees in Kafka Streams

2020-02-26 Thread Bruno Cadonna
Hi Magnus,

with exactly-once, the producer commits the consumer offsets. Thus, if
the producer is not able to successfully commit a transaction, no
consumer offsets will be successfully committed, too.
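For reference, a hedged sketch of the configuration that enables this behavior (application id and bootstrap servers are placeholders):

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class EosConfig {

    public static Properties props() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");            // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        // Under exactly-once, consumer offsets are committed through the
        // producer's transaction, so input offsets and output records are
        // committed atomically or not at all.
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
        return props;
    }
}
```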

Best,
Bruno

On Wed, Feb 26, 2020 at 1:51 PM Reftel, Magnus
 wrote:
>
> Hi,
>
> From my understanding, it is guaranteed that when a Kafka Streams application 
> running with the exactly_once processing guarantee receives a record, it will 
> either finish processing the record (including flushing any records generated 
> as a direct result of processing the message and committing the transaction), 
> invoke either the DeserializationExceptionHandler or the 
> ProductionExceptionHandler exception handler, or retry processing the 
> message. Is that correct, or are there cases where a record can be consumed 
> (and the consumption committed) without the Kafka Streams application being 
> able to either produce any output or handle an exception?
>
> Best Regards
> Magnus Reftel
>
> 
> Denne e-posten og eventuelle vedlegg er beregnet utelukkende for den 
> institusjon eller person den er rettet til og kan vaere belagt med lovbestemt 
> taushetsplikt. Dersom e-posten er feilsendt, vennligst slett den og kontakt 
> Skatteetaten.
> The contents of this email message and any attachments are intended solely 
> for the addressee(s) and may contain confidential information and may be 
> legally protected from disclosure. If you are not the intended recipient of 
> this message, please immediately delete the message and alert the Norwegian 
> Tax Administration.


Re: KStream.groupByKey().aggregate() is always emitting record, even if nothing changed

2020-02-27 Thread Bruno Cadonna
Hi,

If there is an operation downstream that needs key co-location (e.g.
aggregation), stream.transformValues(/* return null for values that
don't need to be forwarded downstream */).filter((k, v) -> v != null)
would be more efficient, because the stream.transform(/* return null
for records that don't need to be forwarded downstream */) approach
inserts a repartition step.

Without an operation that needs key co-location downstream, it is hard
to say. Experiments would be needed.
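Short of experiments, printing the topology shows whether a repartition hop exists. A sketch using map/mapValues as stand-ins for transform/transformValues (both pairs toggle the repartition flag the same way; topic names are illustrative):

```java
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;

public class RepartitionCheck {

    public static Topology valuesOnly() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("in")
               .mapValues(v -> v)    // value-only, like transformValues: key untouched
               .groupByKey()
               .count();
        return builder.build();
    }

    public static Topology keyChanging() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.<String, String>stream("in")
               .map(KeyValue::pair)  // key-changing, like transform: repartition needed
               .groupByKey()
               .count();
        return builder.build();
    }

    public static void main(String[] args) {
        // Only the key-changing topology contains a "-repartition" topic.
        System.out.println(valuesOnly().describe());
        System.out.println(keyChanging().describe());
    }
}
```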

Best,
Bruno

On Thu, Feb 27, 2020 at 7:28 AM Sachin Mittal  wrote:
>
> Hi,
> Yes using filter with transformValues would also work.
> I have a question out of curiosity. which one would be more efficient?
> stream.transform(/*return null for records that don't need to be forwarded
> downstream*/)
> or
> stream.transformValues(/*return null for values that don't need to be
> forwarded downstream*/).filter((k, v) -> v != null)
>
> Thanks
> Sachin
>
>
> On Tue, Feb 25, 2020 at 11:48 PM Bruno Cadonna  wrote:
>
> > Hi Sachin,
> >
> > I am afraid I cannot follow your point.
> >
> > You can still use a filter if you do not want to emit records
> > downstream w/o triggering any repartitioning.
> >
> > Best,
> > Bruno
> >
> > On Tue, Feb 25, 2020 at 6:43 PM Sachin Mittal  wrote:
> > >
> > > Hi,
> > > This is really getting interesting.
> > > Now if we don't want a record to be emitted downstream only way we can do
> > > is via transform or (flatTransform).
> > >
> > > Since we are now reverting the fix for null record in transformValues and
> > > rather change the docs, doesn't this add bit of confusion for users.
> > > Confluent docs says that:
> > > transformValues is preferable to transform because it will not cause data
> > > re-partitioning.
> > >
> > > So in many cases if just the record's value structure is sufficient to
> > > determine whether we should emit it downstream or not, we would still be
> > > forced to
> > > use transform and unnecessarily cause data re-partitioning. Won't this be
> > > in-efficient.
> > >
> > > Thanks
> > > Sachin
> > >
> > >
> > >
> > > On Tue, Feb 25, 2020 at 10:52 PM Bruno Cadonna 
> > wrote:
> > >
> > > > Hello Guozhang and Adam,
> > > >
> > > > Regarding Guozhang's proposal please see recent discussions about
> > > > `transformValues()` and returning `null` from the transformer:
> > > >
> > > >
> > https://issues.apache.org/jira/browse/KAFKA-9533?focusedCommentId=17044602&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17044602
> > > > .
> > > >
> > > > With the current behavior, the commands should be:
> > > >
> > > > `stream.transformValues(...).filter((k, v) -> v != null).groupByKey().aggregate()`
> > > >
> > > > Best,
> > > > Bruno
> > > >
> > > > On Tue, Feb 25, 2020 at 2:58 AM Guozhang Wang 
> > wrote:
> > > > >
> > > > > Hello Adam,
> > > > >
> > > > > It seems your intention is to not "avoid emitting if the new
> > aggregation
> > > > > result is the same as the old aggregation" but to "avoid processing
> > the
> > > > > aggregation at all if its state is already some certain value", right?
> > > > >
> > > > > In this case I think you can try sth. like this:
> > > > >
> > > > > *stream.transformValues().groupByKey().aggregate()*
> > > > >
> > > > > where transformValues is just used as a slightly complicated "filter"
> > > > > operation, in which you can access the state store that "aggregate"
> > is
> > > > > connected to, and read / check if the corresponding entry is already
> > > > > `success`, if yes let `transformValue` to return `null` which means
> > > > forward
> > > > > nothing to the downstream.
> > > > >
> > > > > The reason to use transformValues instead of transform is to make
> > sure
> > > > you
> > > > > do not introduce unnecessary repartitioning here.
> > > > >
> > > > > Guozhang
> > > > >
> > > > >
> > > > >
> > > > > On Mon, Feb 24, 2020 at 2:0

Re: Statestore restoration - Error while range compacting during restoring

2020-04-16 Thread Bruno Cadonna
Hi Nicolas,

Thank you for reporting this issue.

As far as I understand, the issue is that bulk loading as done in Kafka
Streams does work as expected if FIFO compaction is used.

I would propose that you open a bug ticket. Please make sure to include
steps to reproduce the issue in the ticket. Additionally, since you already
have some code that solves the issue, you could assign the ticket to
yourself and open a PR for it. Would you be interested in contributing?

If you cannot assign the ticket to yourself, ask to be added to the list of
contributors on the dev mailing list.
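For other readers of this thread: FIFO options like the ones shown in the screenshots are usually wired in through a RocksDBConfigSetter. A sketch, assuming Streams 2.4 and an illustrative size cap — note that, per this thread, restoration's range compaction then fails, so this configuration reproduces the issue rather than fixing it:

```java
import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.CompactionOptionsFIFO;
import org.rocksdb.CompactionStyle;
import org.rocksdb.Options;

public class FifoCompactionSetter implements RocksDBConfigSetter {

    @Override
    public void setConfig(String storeName, Options options, Map<String, Object> configs) {
        CompactionOptionsFIFO fifo = new CompactionOptionsFIFO();
        fifo.setMaxTableFilesSize(512 * 1024 * 1024L); // illustrative cap
        fifo.setAllowCompaction(true);
        options.setCompactionOptionsFIFO(fifo);
        options.setCompactionStyle(CompactionStyle.FIFO);
    }
}
```

The setter would be registered via props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, FifoCompactionSetter.class).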

Best,
Bruno

On Thu, Apr 16, 2020 at 11:06 AM Nicolas Carlot <
nicolas.car...@chronopost.fr> wrote:

> # Bump #
> So is this a bug ? Should I file a ticket ?
> Any idea ? I don't like the idea of having to patch Kafka libraries...
>
> Le mer. 1 avr. 2020 à 16:33, Nicolas Carlot 
> a écrit :
>
>> Added some nasty code in kafka 2.4.1. Seems to work fine for now... From
>> my understanding, the compaction process when restoring a store is only
>> done to speed up things.
>> So I guess this kind of "hack" isn't such a big deal ?
>>
>> [image: image.png]
>>
>> Le mer. 1 avr. 2020 à 10:44, Nicolas Carlot 
>> a écrit :
>>
>>> Here is the full configuration of Rocks.
>>>
>>> [image: image.png]
>>>
>>> Le mer. 1 avr. 2020 à 10:41, Nicolas Carlot <
>>> nicolas.car...@chronopost.fr> a écrit :
>>>
 It's not that I cannot turn on compaction.
 Compaction works fine.
 The issue is with the restoration process of the state store, which
 tries to compact the store to a single level: db.compactRange(columnFamily,
 true, 1, 0) before bulk loading the data.
 It automatically fails when I'm using FIFO compaction.


 Le mer. 1 avr. 2020 à 10:26, Nicolas Carlot <
 nicolas.car...@chronopost.fr> a écrit :

> My current workaround is to completely delete the state store and
> rebuild it from scratch.
>
> Le mar. 31 mars 2020 à 21:39, Boyang Chen 
> a écrit :
>
>> Thanks Nicolas for the report, so are you suggesting that you
>> couldn't turn
>> on compactions for the state store? Is there a workaround?
>>
>> On Tue, Mar 31, 2020 at 9:54 AM Nicolas Carlot <
>> nicolas.car...@chronopost.fr>
>> wrote:
>>
>> > After some more testing and debugging, it seems that it is caused
>> by the
>> > compaction option I've configured for RocksDB. When removed
>> everything is
>> > fine...
>> > The option is as follow:
>> >
>> > CompactionOptionsFIFO fifoOptions = new CompactionOptionsFIFO();
>> > fifoOptions.setMaxTableFilesSize(maxSize);
>> > fifoOptions.setAllowCompaction(true);
>> > options.setCompactionOptionsFIFO(fifoOptions);
>> > options.setCompactionStyle(CompactionStyle.FIFO);
>> >
>> > Le mar. 31 mars 2020 à 16:27, Nicolas Carlot <
>> nicolas.car...@chronopost.fr
>> > >
>> > a écrit :
>> >
>> > > Hello everyone,
>> > >
>> > > I'm currently facing an issue with RocksDb internal compaction
>> process,
>> > > which occurs when the local state store of several of my
>> KafkaStream
>> > > applications are being restored. This is sadly a huge concern as
>> it
>> > > completely discard resiliency over node failure as those often
>> lead to a
>> > > state store restoration. The only workaround I currently have is
>> to
>> > delete
>> > > the local store to restore it from scratch. I'm using version
>> 2.4.1 of
>> > the
>> > > Java libraries.
>> > >
>> > > The exception thrown by the KStream process is:
>> > > org.apache.kafka.streams.errors.ProcessorStateException: Error
>> while
>> > range
>> > > compacting during restoring  store merge_store
>> > > at
>> > >
>> >
>> org.apache.kafka.streams.state.internals.RocksDBStore$SingleColumnFamilyAccessor.toggleDbForBulkLoading(RocksDBStore.java:615)
>> > > ~[kafka-stream-router.jar:?]
>> > > at
>> > >
>> >
>> org.apache.kafka.streams.state.internals.RocksDBStore.toggleDbForBulkLoading(RocksDBStore.java:398)
>> > > ~[kafka-stream-router.jar:?]
>> > > at
>> > >
>> >
>> org.apache.kafka.streams.state.internals.RocksDBStore$RocksDBBatchingRestoreCallback.onRestoreStart(RocksDBStore.java:644)
>> > > ~[kafka-stream-router.jar:?]
>> > > at
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.CompositeRestoreListener.onRestoreStart(CompositeRestoreListener.java:59)
>> > > ~[kafka-stream-router.jar:?]
>> > > at
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.StateRestorer.restoreStarted(StateRestorer.java:76)
>> > > ~[kafka-stream-router.jar:?]
>> > > at
>> > >
>> >
>> org.apache.kafka.streams.processor.internals.StoreChangelogReader.startRestoration(StoreChangelogReader.java:211)
>> > > ~[kafk

Re: Statestore restoration - Error while range compacting during restoring

2020-04-16 Thread Bruno Cadonna
Hi Nicolas,

Yeah, I meant "doesn't". Sorry for that!

Best,
Bruno

On Thu, Apr 16, 2020 at 1:02 PM Nicolas Carlot
 wrote:
>
> Hi Bruno,
>
> "As far as I understand, the issue is that bulk loading as done in Kafka
> Streams does work as expected if FIFO compaction is used."
>
> You meant "doesn't" right ?
>
> Ok, I will open a ticket, but I don't think my "fix" is the correct one.
> Just ignoring the issue doesn't seem to be a correct solution :)
>
> Le jeu. 16 avr. 2020 à 11:49, Bruno Cadonna  a écrit :
>
> > Hi Nicolas,
> >
> > Thank you for reporting this issue.
> >
> > As far as I understand, the issue is that bulk loading as done in Kafka
> > Streams does work as expected if FIFO compaction is used.
> >
> > I would propose that you open a bug ticket. Please make sure to include
> > steps to reproduce the issue in the ticket. Additionally, since you already
> > have some code that solves the issue, you could assign the ticket to
> > yourself and open a PR for it. Would you be interested in contributing?
> >
> > If you cannot assign the ticket to yourself, ask to be added to the list of
> > contributors on the dev mailing list.
> >
> > Best,
> > Bruno
> >
> > On Thu, Apr 16, 2020 at 11:06 AM Nicolas Carlot <
> > nicolas.car...@chronopost.fr> wrote:
> >
> > > # Bump #
> > > So is this a bug ? Should I file a ticket ?
> > > Any idea ? I don't like the idea of having to patch Kafka libraries...
> > >
> > > Le mer. 1 avr. 2020 à 16:33, Nicolas Carlot <
> > nicolas.car...@chronopost.fr>
> > > a écrit :
> > >
> > >> Added some nasty code in kafka 2.4.1. Seems to work fine for now... From
> > >> my understanding, the compaction process when restoring a store is only
> > >> done to speed up things.
> > >> So I guess this kind of "hack" isn't such a big deal ?
> > >>
> > >> [image: image.png]
> > >>
> > >> Le mer. 1 avr. 2020 à 10:44, Nicolas Carlot <
> > nicolas.car...@chronopost.fr>
> > >> a écrit :
> > >>
> > >>> Here is the full configuration of Rocks.
> > >>>
> > >>> [image: image.png]
> > >>>
> > >>> Le mer. 1 avr. 2020 à 10:41, Nicolas Carlot <
> > >>> nicolas.car...@chronopost.fr> a écrit :
> > >>>
> > >>>> It's not that I cannot turn on compaction.
> > >>>> Compaction works fine.
> > >>>> The issue is with the restoration process of the state store, which
> > >>>> tries to compact the store to a single level:
> > db.compactRange(columnFamily,
> > >>>> true, 1, 0) before bulk loading the data.
> > >>>> It automatically fails when I'm using FIFO compaction.
> > >>>>
> > >>>>
> > >>>> Le mer. 1 avr. 2020 à 10:26, Nicolas Carlot <
> > >>>> nicolas.car...@chronopost.fr> a écrit :
> > >>>>
> > >>>>> My current workaround is to completely delete the state store and
> > >>>>> rebuild it from scratch.
> > >>>>>
> > >>>>> Le mar. 31 mars 2020 à 21:39, Boyang Chen <
> > reluctanthero...@gmail.com>
> > >>>>> a écrit :
> > >>>>>
> > >>>>>> Thanks Nicolas for the report, so are you suggesting that you
> > >>>>>> couldn't turn
> > >>>>>> on compactions for the state store? Is there a workaround?
> > >>>>>>
> > >>>>>> On Tue, Mar 31, 2020 at 9:54 AM Nicolas Carlot <
> > >>>>>> nicolas.car...@chronopost.fr>
> > >>>>>> wrote:
> > >>>>>>
> > >>>>>> > After some more testing and debugging, it seems that it is caused
> > >>>>>> by the
> > >>>>>> > compaction option I've configured for RocksDB. When removed
> > >>>>>> everything is
> > >>>>>> > fine...
> > >>>>>> > The option is as follow:
> > >>>>>> >
> > >>>>>> > CompactionOptionsFIFO fifoOptions = new CompactionOptionsFIFO();
> > >>>>>> > fifoOptions.setMaxTableFilesSize(maxSize);
> > >>>>>>

Re: Statestore restoration - Error while range compacting during restoring

2020-04-16 Thread Bruno Cadonna
Thank you, Nicolas!

Bruno

On Thu, Apr 16, 2020 at 2:24 PM Nicolas Carlot
 wrote:
>
> I've opened a Jira issue on the subject
> https://issues.apache.org/jira/browse/KAFKA-9880
>
>
> Le jeu. 16 avr. 2020 à 13:14, Bruno Cadonna  a écrit :
>
> > Hi Nicolas,
> >
> > Yeah, I meant "doesn't". Sorry for that!
> >
> > Best,
> > Bruno
> >
> > On Thu, Apr 16, 2020 at 1:02 PM Nicolas Carlot
> >  wrote:
> > >
> > > Hi Bruno,
> > >
> > > "As far as I understand, the issue is that bulk loading as done in Kafka
> > > Streams does work as expected if FIFO compaction is used."
> > >
> > > You meant "doesn't" right ?
> > >
> > > Ok, I will open a ticket, but I don't think my "fix" is the correct one.
> > > Just ignoring the issue doesn't seem to be a correct solution :)
> > >
> > > Le jeu. 16 avr. 2020 à 11:49, Bruno Cadonna  a
> > écrit :
> > >
> > > > Hi Nicolas,
> > > >
> > > > Thank you for reporting this issue.
> > > >
> > > > As far as I understand, the issue is that bulk loading as done in Kafka
> > > > Streams does work as expected if FIFO compaction is used.
> > > >
> > > > I would propose that you open a bug ticket. Please make sure to include
> > > > steps to reproduce the issue in the ticket. Additionally, since you
> > already
> > > > have some code that solves the issue, you could assign the ticket to
> > > > yourself and open a PR for it. Would you be interested in contributing?
> > > >
> > > > If you cannot assign the ticket to yourself, ask to be added to the
> > list of
> > > > contributors on the dev mailing list.
> > > >
> > > > Best,
> > > > Bruno
> > > >
> > > > On Thu, Apr 16, 2020 at 11:06 AM Nicolas Carlot <
> > > > nicolas.car...@chronopost.fr> wrote:
> > > >
> > > > > # Bump #
> > > > > So is this a bug ? Should I file a ticket ?
> > > > > Any idea ? I don't like the idea of having to patch Kafka
> > libraries...
> > > > >
> > > > > Le mer. 1 avr. 2020 à 16:33, Nicolas Carlot <
> > > > nicolas.car...@chronopost.fr>
> > > > > a écrit :
> > > > >
> > > > >> Added some nasty code in kafka 2.4.1. Seems to work fine for now...
> > From
> > > > >> my understanding, the compaction process when restoring a store is
> > only
> > > > >> done to speed up things.
> > > > >> So I guess this kind of "hack" isn't such a big deal ?
> > > > >>
> > > > >> [image: image.png]
> > > > >>
> > > > >> Le mer. 1 avr. 2020 à 10:44, Nicolas Carlot <
> > > > nicolas.car...@chronopost.fr>
> > > > >> a écrit :
> > > > >>
> > > > >>> Here is the full configuration of Rocks.
> > > > >>>
> > > > >>> [image: image.png]
> > > > >>>
> > > > >>> Le mer. 1 avr. 2020 à 10:41, Nicolas Carlot <
> > > > >>> nicolas.car...@chronopost.fr> a écrit :
> > > > >>>
> > > > >>>> It's not that I cannot turn on compaction.
> > > > >>>> Compaction works fine.
> > > > >>>> The issue is with the restoration process of the state store,
> > which
> > > > >>>> tries to compact the store to a single level:
> > > > db.compactRange(columnFamily,
> > > > >>>> true, 1, 0) before bulk loading the data.
> > > > >>>> It automatically fails when I'm using FIFO compaction.
> > > > >>>>
> > > > >>>>
> > > > >>>> Le mer. 1 avr. 2020 à 10:26, Nicolas Carlot <
> > > > >>>> nicolas.car...@chronopost.fr> a écrit :
> > > > >>>>
> > > > >>>>> My current workaround is to completely delete the state store and
> > > > >>>>> rebuild it from scratch.
> > > > >>>>>
> > > > >>>>> Le mar. 31 mars 2020 à 21:39, Boyang Chen <
> > > > reluctanthero...@gmail.com>
> > > > >>>>> a écrit :
> > >

Re: KafkaStream groupBy + count on KTable behaviour

2020-05-14 Thread Bruno Cadonna
Hi Raffaele,

In your example, Kafka Streams would send the new and the old value
downstream. More specifically, the groupBy() would send (as you also
observed)

London, (old value: London, new value: null)
Berlin, (old value: null, new value: Berlin)

At the count() record London, (old value: London, new value: null)
would detract 1 from key London and record Berlin, (old value: null,
new value: Berlin) would add 1 to Berlin.

The record structure key, (oldValue, newValue) is called Change in
Kafka Streams and it is used where updates are emitted downstream.
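The topology from the question can be sketched end to end as follows (topic and store names come from the question; serdes are left to the configured defaults):

```java
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KTable;

public class FavouriteCities {

    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();
        KTable<String, String> usersAndCities = builder.table("user-keys-and-cities");
        // On an update such as Mike: London -> Berlin, groupBy forwards a Change
        // pair downstream: (London, old value) to subtract 1 from London's count
        // and (Berlin, new value) to add 1 to Berlin's count.
        KTable<String, Long> favouriteCities = usersAndCities
                .groupBy((user, city) -> KeyValue.pair(city, city))
                .count();
        favouriteCities.toStream().to("counts-by-city");
        return builder.build();
    }

    public static void main(String[] args) {
        System.out.println(build().describe());
    }
}
```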

Best,
Bruno

On Thu, May 14, 2020 at 12:17 PM Raffaele Esposito
 wrote:
>
> I'm trying to better understand KTable and I have encountered a behaviour I
> cannot wrap my mind around.
>
> So *groupByKey()* can only be applied to KStream and not to KTable; that's
> because of the nature of KTable and its UPSERT logic.
> What I don't understand, and therefore ask your help with, is how
> *groupBy()* can actually be applied to a KTable. The documentation says
> that:
>
> groupBy() is a shorthand for selectKey(...).groupByKey()
>
> But both these operations can only be applied to KStreams.
>
> The documentation also says:
>
> Because a new key is selected, an internal repartitioning topic will be
> created in Kafka ... All data of this KTable will be redistributed through
> the repartitioning topic by writing all update records to and rereading all
> update records from it, such that the resulting KGroupedTable is
> partitioned on the new key.
>
> Now assume we want to count the favourite cities of users:
>
> We have a key table like:
>
> Mike,London
> Alice,Paris
> Fred,Rome
> Mike,Berlin (changed his mind)
>
> I would use:
>
> KTable<String, String> usersAndCitiesTable =
> builder.table("user-keys-and-cities");
>
>  KTable<String, Long> favouriteCities =
> usersAndCitiesTable.groupBy((user, city) -> new KeyValue<>(city, city))
>  .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("CountsByCity"));
>
> I took a look at the repartitioning topic created because of the groupBy,
> and can see the record mapped using the KeyValueMapper provided
>
> I noticed that the table generates two entries when Mike changes his mind, one
> for London (the old city) and one for Berlin (the new city)
>
> Are these entries marked somehow? If yes, how?
>
> How does Kafka make sure that a -1 is applied to the London count and a +1
> to the Berlin count when the record with Mike's new favourite city arrives?
>
>
> Any help or suggestion is highly appreciated !
>
> Thanks


Re: KafkaStream groupBy + count on KTable behaviour

2020-05-14 Thread Bruno Cadonna
Hi Raffaele,

Change is an internal class in Streams and also its SerDes are
internal. To consume the repartition topic you mention outside of
Streams you would need to use those internal classes (note: I've never
tried this). Those classes can change at any time. So consuming from
repartition topics for other than educational purposes is not a good
idea.

toStream() only emits the new value of the Change.

Regarding docs, since these are internals, the code is the best
source. For example:

The Change class:
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/Change.java

Here the Change class is used to first remove the old value from the
aggregate and then to add the new value to the aggregate:
https://github.com/apache/kafka/blob/873e9446ef8426061e2f1b6cd21815b270e27f03/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java#L90

Best,
Bruno

On Thu, May 14, 2020 at 8:50 PM Raffaele Esposito
 wrote:
>
> Hi Bruno,
> Also when you mention:
>
> The record structure key, (oldValue, newValue) is called Change in
> Kafka Streams and it is used where updates are emitted downstream
>
> Does it also mean the same happen when we convert a KTable to a KStream ?
> Do you know any docs or article about this topics?
>
> Thanks again,
> Raffaele
>
>
>
> On Thu, May 14, 2020 at 8:39 PM Raffaele Esposito 
> wrote:
>
> > Hi Bruno,
> > Thanks,
> > One more thing, As I told you I was consuming the repartitioning topic
> > created by group by
> > and I just saw the old and new value, as you are telling me now they are
> > indeed marked as old and new,
> > is this mark visible somehow consuming the repartitioning topic ?
> > Raffaele
> >
> > On Thu, May 14, 2020 at 7:48 PM Bruno Cadonna  wrote:
> >
> >> Hi Raffaele,
> >>
> >> In your example, Kafka Streams would send the new and the old value
> >> downstream. More specifically, the groupBy() would send (as you also
> >> observed)
> >>
> >> London, (old value: London, new value: null)
> >> Berlin, (old value: null, new value: Berlin)
> >>
> >> At the count() record London, (old value: London, new value: null)
> >> would detract 1 from key London and record Berlin, (old value: null,
> >> new value: Berlin) would add 1 to Berlin.
> >>
> >> The record structure key, (oldValue, newValue) is called Change in
> >> Kafka Streams and it is used where updates are emitted downstream.
> >>
> >> Best,
> >> Bruno
> >>
> >> On Thu, May 14, 2020 at 12:17 PM Raffaele Esposito
> >>  wrote:
> >> >
> >> > I m trying to better understand KTable and I have encountered a
> >> behaviour I
> >> > cannot wrap my mind around it.
> >> >
> >> > So* groupByKey()* can only be applied to KStream and not to KTable,
> >> that's
> >> > because of the nature of KTable and its UPSERT logic.
> >> > What I don't understand correctly and therefore ask your help for that
> >> is
> >> > how *groupBy()* can actually be applied on KTable, the documentation
> >> says
> >> > that:
> >> >
> >> > groupBy() is a shorthand for selectKey(...).groupByKey()
> >> >
> >> > But both these operations can only be applied to KStreams.
> >> >
> >> > The documentation also says:
> >> >
> >> > Because a new key is selected, an internal repartitioning topic will be
> >> > created in Kafka ... All data of this KTable will be redistributed
> >> through
> >> > the repartitioning topic by writing all update records to and rereading
> >> all
> >> > update records from it, such that the resulting KGroupedTable is
> >> > partitioned on the new key.
> >> >
> >> > Now assume we want to count the favourite cities of users:
> >> >
> >> > We have a key table like:
> >> >
> >> > Mike,London
> >> > Alice,Paris
> >> > Fred,Rome
> >> > Mike,Berlin (changed his mind)
> >> >
> >> > I would use:
> >> >
> >> > KTable<String, String> usersAndCitiesTable =
> >> > builder.table("user-keys-and-cities");
> >> >
> >> > KTable<String, Long> favouriteCities =
> >> > usersAndCitiesTable.groupBy((user, city) -> new KeyValue<>(city, city))
> >> >  .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("CountsByCity"));
> >> >
> >> > I took a look at the repartitioning topic created because of the
> >> groupBy,
> >> > and can see the record mapped using the KeyValueMapper provided
> >> >
> >> > I noticed that table generates two entries when Mike changes his mind,
> >> one
> >> > for London (the old city) and one for Berlin (the new city)
> >> >
> >> > Are these entries marked somehow? If yes, how?
> >> >
> >> > How does Kafka make sure that on London count is applied a -1 and the
> >> > Berlin count a +1 when the new record with Mike's new favorite city
> >> arrives.
> >> >
> >> >
> >> > Any help or suggestion is highly appreciated !
> >> >
> >> > Thanks
> >>
> >


Re: KafkaStream groupBy + count on KTable behaviour

2020-05-15 Thread Bruno Cadonna
Hi Raffaele,

I would put it differently. Repartition is a concept, but the
repartition topics are an implementation detail. Streams could
repartition data using network shuffling instead of Kafka topics, for
example. Since it is an implementation detail it should not be exposed
publicly, similarly to a private method in a Java class.

Best,
Bruno

On Thu, May 14, 2020 at 9:41 PM Raffaele Esposito
 wrote:
>
> Thanks a lot Bruno, much clearer now.
> It's only my opinion but since the Topology is a concept of the API as
> well as the repartitioning logic, for me also this mechanism should be a
> bit more transparent, but it also may be that I'm plain wrong here :)
>
> Thanks !
>
> On Thu, May 14, 2020 at 9:24 PM Bruno Cadonna  wrote:
>
> > Hi Raffaele,
> >
> > Change is an internal class in Streams and also its SerDes are
> > internal. To consume the repartition topic you mention outside of
> > Streams you would need to use those internal classes (note: I've never
> > tried this). Those classes can change at any time. So consuming from
> > repartition topics for other than educational purposes is not a good
> > idea.
> >
> > toStream() only emits the new value of the Change.
> >
> > Regarding docs, since these are internals, the code is the best
> > source. For example:
> >
> > The Change class:
> >
> > https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/Change.java
> >
> > Here the Change class is used to first remove the old value from the
> > aggregate and then to add the new value to the aggregate:
> >
> > https://github.com/apache/kafka/blob/873e9446ef8426061e2f1b6cd21815b270e27f03/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java#L90
> >
> > Best,
> > Bruno
> >
> > On Thu, May 14, 2020 at 8:50 PM Raffaele Esposito
> >  wrote:
> > >
> > > Hi Bruno,
> > > Also when you mention:
> > >
> > > The record structure key, (oldValue, newValue) is called Change in
> > > Kafka Streams and it is used where updates are emitted downstream
> > >
> > > Does it also mean the same happen when we convert a KTable to a KStream ?
> > > Do you know any docs or article about this topics?
> > >
> > > Thanks again,
> > > Raffaele
> > >
> > >
> > >
> > > On Thu, May 14, 2020 at 8:39 PM Raffaele Esposito <
> > rafaelral...@gmail.com>
> > > wrote:
> > >
> > > > Hi Bruno,
> > > > Thanks,
> > > > One more thing, As I told you I was consuming the repartitioning topic
> > > > created by group by
> > > > and I just saw the old and new value, as you are telling me now they
> > are
> > > > indeed marked as old and new,
> > > > is this mark visible somehow consuming the repartitioning topic ?
> > > > Raffaele
> > > >
> > > > On Thu, May 14, 2020 at 7:48 PM Bruno Cadonna 
> > wrote:
> > > >
> > > >> Hi Raffaele,
> > > >>
> > > >> In your example, Kafka Streams would send the new and the old value
> > > >> downstream. More specifically, the groupBy() would send (as you also
> > > >> observed)
> > > >>
> > > >> London, (old value: London, new value: null)
> > > >> Berlin, (old value: null, new value: Berlin)
> > > >>
> > > >> At the count() record London, (old value: London, new value: null)
> > > >> would detract 1 from key London and record Berlin, (old value: null,
> > > >> new value: Berlin) would add 1 to Berlin.
> > > >>
> > > >> The record structure key, (oldValue, newValue) is called Change in
> > > >> Kafka Streams and it is used where updates are emitted downstream.
> > > >>
> > > >> Best,
> > > >> Bruno
> > > >>
> > > >> On Thu, May 14, 2020 at 12:17 PM Raffaele Esposito
> > > >>  wrote:
> > > >> >
> > > >> > I m trying to better understand KTable and I have encountered a
> > > >> behaviour I
> > > >> > cannot wrap my mind around it.
> > > >> >
> > > >> > So* groupByKey()* can only be applied to KStream and not to KTable,
> > > >> that's
> > > >> > because of the nature of KTable and its UPSERT logic.
> > > >> > What I don't unde

Re: About Kafka Stream parallelism

2020-05-15 Thread Bruno Cadonna
Hi Rapeepat,

1. The parallelism of Kafka Streams does not only depend on the number
of partitions of the input topic. It also depends on the structure of
your topology. Your example topology  topicA => transform1 => topicB
=> transform2 => topicC would be subdivided in two subtopologies:
- subtopology 1: topicA => transform1 => topicB
- subtopology 2: topicB => transform2 => topicC
For each combination of a subtopology and partition a task is created.
Tasks are distributed across Kafka Streams instances, more precisely
across stream threads on the instances (see also
https://kafka.apache.org/25/documentation/streams/architecture).

If we assume one partition, your topology would result in two tasks
and assuming that you have one stream thread per instance your
topology should run on two Kafka Streams instances.

2. Yes, your example would result in three subtopologies and they
should run on three Kafka Streams instances with a stream thread
each.
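The task count in both answers follows from one rule — one task per (subtopology, input-partition) pair — which can be sketched in plain Java (illustrative names, no Streams dependency; task ids follow the "<subtopology>_<partition>" format used by Streams):

```java
import java.util.ArrayList;
import java.util.List;

public class TaskSketch {
    // Streams creates one task per (subtopology, input-partition) pair;
    // task ids use the "<subtopology>_<partition>" format, e.g. "1_0".
    static List<String> tasks(int subtopologies, int partitions) {
        List<String> result = new ArrayList<>();
        for (int s = 0; s < subtopologies; s++) {
            for (int p = 0; p < partitions; p++) {
                result.add(s + "_" + p);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // topicA => transform1 => topicB => transform2 => topicC
        // splits into 2 subtopologies; with 1 partition that is 2 tasks
        System.out.println(tasks(2, 1)); // prints "[0_0, 1_0]"
    }
}
```

With one stream thread per instance, those two tasks can be spread over two instances; three subtopologies and one partition each give three tasks for scenario 2.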

One drawback of having multiple pipelines in the same Kafka Streams
application is that you cannot configure and scale them independently.

Best,
Bruno

On Fri, May 15, 2020 at 1:46 PM Rapeepat (Lookmoo) Sriwichai
 wrote:
>
> Dear Kafka,
>
> Hi there. I have a question about Kafka Stream parallelism.
> I know that Kafka Stream parallelism is based on consumer group.
> For example, if your source topic has 3 partitions you can have at most 3 consumer
> instances (or 3 Kafka Streams instances) that will work concurrently.
> I have 2 scenarios about it and would like to know how it will work?
>
>   1.  Assume I have a stream pipeline that read from topicA process save to 
> topicB read it back (using through method) process with another logic and 
> save to topicC
> topicA => transform1 => topicB => transform2 => topicC
> Suppose that all topics have only 1 partition (for simplicity). Can I have 2
> instances of Kafka Streams and expect each of them to do transform1
> and transform2 in a balanced way?
>   2.  Is it a good idea to have multiple streaming pipelines in same Kafka 
> Stream application ?
> Like
> topicA1 =>  transformA => topicA2
> topicB1 =>  transformB => topicB2
> topicC1 =>  transformC => topicC2
> and like previous one. assume each topic have only 1 partition, can I have 3 
> Kafka Stream instances and expect each one will take 1 task of each stream 
> pipeline ?
>
>
> Regards,
> Rapeepat
>


Re: Question regarding Kafka Streams Global State Store

2020-05-19 Thread Bruno Cadonna
Hi Georg,

From your description, I do not see why you need to use a global state
instead of a local one. Are there any specific reasons for that? With
a local state store you would have the previous record immediately
available.

Best,
Bruno

On Tue, May 19, 2020 at 10:23 AM Schmidt-Dumont Georg (BCI/ESW17)
 wrote:
>
> Good morning,
>
> I have setup a Kafka Streams application with the following logic. The 
> incoming messages are validated and transformed. The transformed messages are 
> then published to a global state store via topic A as well as to an 
> additional topic A for consumption by other applications further down the 
> processing pipeline.
>
> As part of the transformation I access the global state store in order to get 
> the values from the previous message and use them in the transformation of 
> the current message. The messages only contain changed values and these 
> changes are merged with the complete data set before being sent on, hence I 
> always hold the latest state in the global store in order to merge it with 
> the incoming changed values.
>
> Unfortunately, when I access the store in the transformation I do not get the 
> latest state. The update of the store takes too long so when I access it in 
> the transformation I either get no values or values which do not represent 
> the latest state.
>
> The following shows the build-up of my streams app:
>
> //setup global state store
> final KeyValueBytesStoreSupplier storeSupplier =
> Stores.persistentKeyValueStore( "global-store" );
> final StoreBuilder<KeyValueStore<String, JSONObject>> storeBuilder =
> Stores.keyValueStoreBuilder( storeSupplier, Serdes.String(), new
> JSONObjectSerde() );
> builder.addGlobalStore( storeBuilder, "global-store-topic", Consumed.with(
> Serdes.String(), new JSONObjectSerde() ), StoreProcessor::new );
>
> //store processor
>
> private KeyValueStore<String, JSONObject> stateStore;
>
> @Override
> public void init( final ProcessorContext context ) {
>    stateStore = (KeyValueStore<String, JSONObject>) context.getStateStore(
> "global-store" );
> }
>
>
>
> @Override
> public void process( final String key, final JSONObject state ) {
>log.info( "Update state store for {}: {}.", key, state );
>    stateStore.put( key, state );
> }
>
>
> //streams setup
>
> final JSONObjectSerde jsonObjectSerde = new JSONObjectSerde();
>
> final KStream<String, JSONObject> stream = builder.stream( "input-topic",
> Consumed.with( Serdes.String(), jsonObjectSerde ) )
>
>    .transformValues( ValueTransformer::new );
>
>
>
> stream.to( "global-store-topic", Produced.valueSerde( jsonObjectSerde ) );
>
> stream.to( "output-topic", Produced.valueSerde( jsonObjectSerde ) );
>
> //global state store access in ValueTransformer
>
> JSONObject previousState = Optional.ofNullable( stateStore.get( key ) )
>.orElse( new JSONObject() );
>
>
> I have set the acknowledge property for the producers to "all".
>
> I have tried to disable the caching by setting "cache.max.bytes.buffering" to 
> 0 and by disabling the cache on the store using ".withCachingDisabled()". I 
> also tried setting the commit interval to 0. All without success.
>
> How can I setup a global state which meets the requirements as describe in 
> the scenario above?
>
> Thank you!
>
> Best regards / Mit freundlichen Grüßen / Üdvözlettel / 致以诚挚的问候
>
> Mr. Georg Schmidt-Dumont
> Bosch Connected Industry – BCI/ESW17
> Robert Bosch GmbH | Postfach 10 60 50 | 70049 Stuttgart | GERMANY | 
> www.bosch.com
> Phone +49 711 811-49893  | 
> georg.schmidt-dum...@bosch.com
>
> Sitz: Stuttgart, Registergericht: Amtsgericht Stuttgart, HRB 14000;
> Aufsichtsratsvorsitzender: Franz Fehrenbach; Geschäftsführung: Dr. Volkmar 
> Denner,
> Prof. Dr. Stefan Asenkerschbaumer, Dr. Rolf Bulander, Dr. Stefan Hartung, Dr. 
> Markus Heyn, Dr. Dirk Hoheisel,
> Christoph Kübel, Uwe Raschke, Peter Tyroller
>


Re: Question regarding Kafka Streams Global State Store

2020-05-19 Thread Bruno Cadonna
Hi Georg,

local state stores in Kafka Streams are backed by a Kafka topic by
default. So, if the instance crashes the local state store is restored
from the local state directory. If the local state directory is empty
or does not exist the local state store is restored from the Kafka
topic. Local state stores are as resilient as global state stores.

As far as I understand, you only look up previous records with the
same key. You do not need to have the global state available at each
instance to do this. Having available all records with the same key is
sufficient. If your input topic are partitioned by key then records
with the same key will land on the same instance. That means, your
local state store contains all records with the same key.
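The reason records with the same key land on the same instance can be illustrated with a simplified stand-in for the default partitioner (an assumption for illustration: the real partitioner hashes the serialized key with murmur2, but like this sketch it is a pure function of the key):

```java
public class PartitionSketch {
    // Simplified stand-in for the default partitioner: the real one uses
    // murmur2 on the serialized key bytes, but is equally deterministic.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        // Every record with key "user-42" lands on the same partition,
        // hence on the same task and the same local state store.
        int p1 = partitionFor("user-42", 3);
        int p2 = partitionFor("user-42", 3);
        System.out.println(p1 == p2); // prints "true"
    }
}
```

Because the mapping is deterministic, the local store of the task owning that partition always sees every record for the key.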

Best,
Bruno

On Tue, May 19, 2020 at 11:05 AM Schmidt-Dumont Georg (BCI/ESW17)
 wrote:
>
> Hi Bruno,
>
> Thanks for your quick reply!
>
> I decided to use a global state store for two reasons. If the application 
> crashes, the store is populated properly once the reason for the crash has 
> been fixed and the app starts again, i.e. I feel that it gives me a certain 
> resiliency. Second we will be running multiple instances of the application 
> and using a global state store provides the state across all instances.
>
> I am fairly new to Kafka and Kafka Streams, I am very much open to 
> suggestions on better ways to handle the flow I need.
>
> Mit freundlichen Grüßen / Best regards
>
> Georg Schmidt-Dumont
> BCI/ESW17
> Bosch Connected Industry
>
> Tel. +49 711 811-49893
>
> ► Take a look: https://bgn.bosch.com/alias/bci
>
>
>
> -Ursprüngliche Nachricht-
> Von: Bruno Cadonna 
> Gesendet: Dienstag, 19. Mai 2020 10:52
> An: Users 
> Betreff: Re: Question regarding Kafka Streams Global State Store
>
> Hi Georg,
>
> From your description, I do not see why you need to use a global state 
> instead of a local one. Are there any specific reasons for that? With a local 
> state store you would have the previous record immediately available.
>
> Best,
> Bruno
>
> On Tue, May 19, 2020 at 10:23 AM Schmidt-Dumont Georg (BCI/ESW17) 
>  wrote:
> >
> > Good morning,
> >
> > I have setup a Kafka Streams application with the following logic. The 
> > incoming messages are validated and transformed. The transformed messages 
> > are then published to a global state store via topic A as well as to an 
> > additional topic A for consumption by other applications further down the 
> > processing pipeline.
> >
> > As part of the transformation I access the global state store in order to 
> > get the values from the previous message and use them in the transformation 
> > of the current message. The messages only contain changed values and these 
> > changes are merged with the complete data set before being sent on, hence I 
> > always hold the latest state in the global store in order to merge it with 
> > the incoming changed values.
> >
> > Unfortunately, when I access the store in the transformation I do not get 
> > the latest state. The update of the store takes too long so when I access 
> > it in the transformation I either get no values or values which do not 
> > represent the latest state.
> >
> > The following shows the build-up of my streams app:
> >
> > //setup global state store
> > final KeyValueBytesStoreSupplier storeSupplier =
> > Stores.persistentKeyValueStore( "global-store" );
> > final StoreBuilder<KeyValueStore<String, JSONObject>> storeBuilder =
> > Stores.keyValueStoreBuilder( storeSupplier, Serdes.String(), new
> > JSONObjectSerde() );
> > builder.addGlobalStore( storeBuilder, "global-store-topic", Consumed.with(
> > Serdes.String(), new JSONObjectSerde() ), StoreProcessor::new );
> >
> > //store processor
> >
> > private KeyValueStore<String, JSONObject> stateStore;
> >
> > @Override
> > public void init( final ProcessorContext context ) {
> >    stateStore = (KeyValueStore<String, JSONObject>)
> > context.getStateStore( "global-store" );
> > }
> >
> > @Override
> > public void process( final String key, final JSONObject state ) {
> >    log.info( "Update state store for {}: {}.", key, state );
> >    stateStore.put( key, state );
> > }
> >
> >
> > //streams setup
> >
> > final JSONObjectSerde jsonObjectSerde = new JSONObjectSerde();
> >
> > final KStream<String, JSONObject> stream = builder.stream( "input-topic",
> > Consumed.with( Serdes.String(), jsonObjectSerde ) )
> >
> >.transformValues( ValueTransformer::new )
> >
> >
> >
> > stream.to( “global-store-topic”, Produced.valueSerde( jsonObje

Re: Question regarding Kafka Streams Global State Store

2020-05-19 Thread Bruno Cadonna
Hi Georg,

Great that you could answer your own question and I am glad that I could help.

I was just writing you a similar answer. Yes, the global state store
will eventually reflect your write but you do not know when. That is
the main issue for your use case. A local state store will immediately
contain your previous write, because it is local to your processing.

For more information on the global state store see
https://kafka.apache.org/25/documentation/streams/developer-guide/dsl-api.html#streams_concepts_globalktable

Best,
Bruno

On Tue, May 19, 2020 at 3:04 PM Schmidt-Dumont Georg (BCI/ESW17)
 wrote:
>
> Hi Bruno,
>
> I just had a discussion with a colleague of mine regarding this and I wanted 
> to give you a quick contextual update. With regards to the global state, I 
> realize that having this state consistent in a distributed system is very 
> difficult. My expectation was that since it is a global state, Kafka takes 
> care of the consistency and I can just access the data. I think my 
> expectation was a bit naïve. The state will probably be eventually 
> consistent. But this does not fit with what I am trying to do. As you said I 
> should use a local store.
>
> With regards to the question in my previous mail with the amount of 
> partitions. I think I have answered my own question. Ensuring that the 
> messages have the correct and consistent keys will see to it that all the 
> data for a specific key ends up in a single partition. It does not mean that 
> a partition per key is required (which I first thought).
>
> Thanks again for your help!
>
> Mit freundlichen Grüßen / Best regards
>
> Georg Schmidt-Dumont
> BCI/ESW17
> Bosch Connected Industry
>
> Tel. +49 711 811-49893
>
> ► Take a look: https://bgn.bosch.com/alias/bci
>
>
>
> -Ursprüngliche Nachricht-
> Von: Bruno Cadonna 
> Gesendet: Dienstag, 19. Mai 2020 11:42
> An: Users 
> Betreff: Re: Question regarding Kafka Streams Global State Store
>
> Hi Georg,
>
> local state stores in Kafka Streams are backed by a Kafka topic by default. 
> So, if the instance crashes the local state store is restored from the local 
> state directory. If the local state directory is empty or does not exist the 
> local state store is restored from the Kafka topic. Local state stores are as 
> resilient as global state stores.
>
> As far as I understand, you only look up previous records with the same key. 
> You do not need to have the global state available at each instance to do 
> this. Having available all records with the same key is sufficient. If your 
> input topic are partitioned by key then records with the same key will land 
> on the same instance. That means, your local state store contains all records 
> with the same key.
>
> Best,
> Bruno
>
> On Tue, May 19, 2020 at 11:05 AM Schmidt-Dumont Georg (BCI/ESW17) 
>  wrote:
> >
> > Hi Bruno,
> >
> > Thanks for your quick reply!
> >
> > I decided to use a global state store for two reasons. If the application 
> > crashes, the store is populated properly once the reason for the crash has 
> > been fixed and the app starts again, i.e. I feel that it gives me a certain 
> > resiliency. Second we will be running multiple instances of the application 
> > and using a global state store provides the state across all instances.
> >
> > I am fairly new to Kafka and Kafka Streams, I am very much open to 
> > suggestions on better ways to handle the flow I need.
> >
> > Mit freundlichen Grüßen / Best regards
> >
> > Georg Schmidt-Dumont
> > BCI/ESW17
> > Bosch Connected Industry
> >
> > Tel. +49 711 811-49893
> >
> > ► Take a look: https://bgn.bosch.com/alias/bci
> >
> >
> >
> > -Ursprüngliche Nachricht-
> > Von: Bruno Cadonna 
> > Gesendet: Dienstag, 19. Mai 2020 10:52
> > An: Users 
> > Betreff: Re: Question regarding Kafka Streams Global State Store
> >
> > Hi Georg,
> >
> > From your description, I do not see why you need to use a global state 
> > instead of a local one. Are there any specific reasons for that? With a 
> > local state store you would have the previous record immediately available.
> >
> > Best,
> > Bruno
> >
> > On Tue, May 19, 2020 at 10:23 AM Schmidt-Dumont Georg (BCI/ESW17) 
> >  wrote:
> > >
> > > Good morning,
> > >
> > > I have setup a Kafka Streams application with the following logic. The 
> > > incoming messages are validated and transformed. The transformed messages 
> > > are then published to a global state store via topic A as well as to an 
> > > additional topi

Re: Sharing of State Stores

2020-07-30 Thread Bruno Cadonna

Hi Charles,

Two transformers that share the same state store should end up in the
same sub-topology. A sub-topology is executed by as many tasks as the
number of partitions of the input topics. Each task processes the
records from one input partition group (i.e. the same partition from 
both input topics in your case). A task is assigned to one single stream 
thread on a Kafka Streams client. Each stream thread is a member of the 
consumer group.


1) As far as I understand your setup, same keys are produced to one 
partition. A given key will end up in the same partition in both of your 
input topics. Hence, the key will be processed by the same task that 
executes the sub-topology that contains both transformers.


2) Since the execution of a task is single threaded, the transformers 
will access the state consecutively and see the updated state store. 
Streams tries to process records from both partitions in time order, but 
this is best-effort and not guaranteed (see max.task.idle.ms). The time 
order depends on the timestamp extractor you use.


To check if both transformers are in the same sub-topology you can call 
topology.describe().toString(). To visualize the topology you can use 
https://zz85.github.io/kafka-streams-viz/.
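The answer to question 2 boils down to single-threaded task execution, which a minimal plain-Java sketch can illustrate (the Map stands in for the shared state store; all names are illustrative, not Streams API):

```java
import java.util.HashMap;
import java.util.Map;

public class SharedStoreSketch {
    // Stand-in for the persistent store both transformers share
    final Map<String, Integer> store = new HashMap<>();

    // Transformer A: updates the shared store for a key
    void transformA(String key, int value) {
        store.put(key, value);
    }

    // Transformer B: executed by the same (single-threaded) task,
    // so it sees A's update as soon as A is done processing
    int transformB(String key) {
        return store.getOrDefault(key, -1);
    }

    public static void main(String[] args) {
        SharedStoreSketch task = new SharedStoreSketch();
        task.transformA("key-1", 7);
        System.out.println(task.transformB("key-1")); // prints "7"
    }
}
```

Since both transformers live in one task and one task runs on one thread, there is no window in which B could observe a stale value written by A.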


Best,
Bruno

On 30.07.20 00:00, Charles Devilliers wrote:

Hello,

I have some rudimentary questions on state stores. My service is planned to
have two transformers, each listening to a different topic. Both topics
have the same number of partitions and the upstream producers to those
topics are consistent with respect to key schema. My question centers
around the fact that both transformers need to consult and update the same
persistent state store in order to make decisions with respect to record
processing. I am not implementing a custom key partitioner, I'm using the
default. Also, there is no re-keying done by either transformer.

Given the above scenario, I have the following questions:

1) Will a given key always hash to the same kstream consumer group member
for both transformers? You can imagine why this is important given that
they share a state store. My concern is that rebalancing may occur, and
somehow the key space for one of the transformers is moved to another pod,
but not both.

2) If transformer A processes a record R for a given key K, and the shared
state store is updated at key K as a result of that processing, does the
second transformer B have access to the updated state store value as soon
as transformer A is done processing the record? (Assume the record is
updated with a state store put()).

I have been told that in order to ensure that the partition assignments are
consistent across pods, for both input topics, I have to do some exotic
merging of the kstreams that process the input topics, which feels strange
and wrong.

Are there any other constraints or considerations that involve sharing a
state store across transformers that I should be thinking about in my
architecture for this service, but didn't mention?

Thanks for clarifying.



Re: Kafka streams sink outputs weird records

2020-08-20 Thread Bruno Cadonna

Hi Pirow,

hard to have an idea without seeing the code that is executed in the 
processors.


Could you please post a minimal example that reproduces the issue?

Best,
Bruno

On 20.08.20 14:53, Pirow Engelbrecht wrote:

Hello,

I’ve got Kafka Streams up and running with the following topology:

Sub-topology: 0
    Source: TopicInput (topics: [inputTopic])
      --> InputProcessor
    Processor: InputProcessor (stores: [KvStore])
      --> TopicOutput
      <-- TopicInput
    Source: KvInput (topics: [kvStoreTopic])
      --> KvProcessor
    Processor: KvProcessor (stores: [KvStore])
      --> none
      <-- KvInput
    Sink: TopicOutput (topic: outputTopic)
      <-- InputProcessor

There is only one context().forward(key,value) call from the 
InputProcessor to the TopicOutput sink. For some reason I get one 
additional Kafka record for each record the TopicOutput sink puts into 
the output topic. They look like this:


ConsumerRecord(topic='outputTopic', partition=0, offset=1, 
timestamp=1597926832492, timestamp_type=0, key=b'\x00\x00\x00\x01', 
value=b'\x00\x00\x00\x00\x00\x00', headers=[], checksum=None, 
serialized_key_size=4, serialized_value_size=6, serialized_header_size=-1)


With a binary key and value (key and value seem to always be the same). 
Any ideas?


Thanks

*Pirow Engelbrecht*
System Engineer

*E.*pirow.engelbre...@etion.co.za 


*T.* +27 12 678 9740 (ext. 9879)
*M.*+27 63 148 3376

76 Regency Drive | Irene | Centurion | 0157 


*www.etion.co.za *







Re: Handle exception in kafka stream

2020-09-01 Thread Bruno Cadonna

Hi Deepak,

Do you return DeserializationHandlerResponse.CONTINUE or 
DeserializationHandlerResponse.FAIL in your CustomExceptionHandler?


With DeserializationHandlerResponse.CONTINUE, the processing of records 
should not stop and after the next offset commit the bad records should 
not be read anymore from the input if your application restarts (except 
if you reset your application). That does not guarantee that duplicate 
bad records get appended to your file, though. But it might reduce the 
duplicates.


See also the following link for an example of returning 
DeserializationHandlerResponse.CONTINUE:

https://kafka.apache.org/26/documentation/streams/developer-guide/config-streams.html#default-deserialization-exception-handler
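For reference, the built-in handler that logs the bad record and continues is wired via the following Streams configuration (shown as a properties fragment; this key and class exist in Kafka Streams):

```properties
# Log the bad record and move on instead of failing the stream thread
default.deserialization.exception.handler=org.apache.kafka.streams.errors.LogAndContinueExceptionHandler
```

A custom handler configured the same way should return DeserializationHandlerResponse.CONTINUE for the skip-and-log behavior described above.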

Best,
Bruno

On 01.09.20 10:14, Deepak Raghav wrote:

Hi Team

I have created a CustomExceptionHandler class by
implementing DeserializationExceptionHandler interface to handle the
exception during deserialization time.

But the problem with this approach is that if an exception is
raised for some record and after that the stream is stopped and
restarted, it reads those bad messages again.

I am storing those bad messages in a file in the filesystem and with
this approach, duplicate messages get appended to the file when the
stream is restarted since the offsets of those bad messages are not
committed.

Please let me know if I missed anything.

Regards and Thanks
Deepak Raghav



Re: Kafka streams - how to handle application level exception in event processing

2020-09-21 Thread Bruno Cadonna

Hi Pushkar,

Is the error you are talking about, one that is thrown by Kafka Streams 
or by your application? If it is thrown by Kafka Streams, could you 
please post the error?


I do not completely understand what you are trying to achieve, but maybe 
max.task.idle.ms [1] is the configuration you are looking for.


I can assure you that enable.auto.commit is false in Kafka Streams. What 
you probably mean is that Kafka Streams periodically commits the 
offsets. The commit interval can be controlled with commit.interval.ms 
[2].



Best,
Bruno


[1] https://kafka.apache.org/documentation/#max.task.idle.ms
[2] https://kafka.apache.org/documentation/#commit.interval.ms

On 21.09.20 12:38, Pushkar Deole wrote:

Hi,

I would like to know how to handle following scenarios while processing
events in a kafka streams application:

1. the streams application needs data from a globalKtable which loads it
from a topic that is populated by some other service/application. So, if
the streams application starts getting events from input source topic
however it doesn't find required data in GlobalKTable since that other
application/service hasn't yet loaded that data then the Kafka Streams
application gets an error while processing the event; the application handles
the exception by logging an error and goes on to processing other
events. Since auto.commit is true, polling will go on fetching the next
batch and will probably commit the offsets of the previous batch, causing loss
of events that had an exception while processing.

I want to halt the processing here if an error occurs while processing the
event, so instead of going on to the next event, the processing should keep
trying previous event until application level error is resolved. How can I
achieve this?



Re: Kafka streams - how to handle application level exception in event processing

2020-09-21 Thread Bruno Cadonna

Thank you for clarifying! Now, I think I understand.

You could put events for which required data in the global table is not 
available into a state store and each time an event from the input topic 
is processed, you could lookup all events in your state store and see if 
required data is now available for them.


However, be aware that this can mix up the original order of the events 
in your input topic if required data of later events is available before 
required data of earlier events. Furthermore, you need to consider the 
case when you have a huge amount of events in the state store and 
suddenly all required data in the global table is available, because 
processing all those events at once might lead to exceeding 
max.poll.interval.ms and the stream thread might be kicked out of the 
consumer group. To solve that, you may want to limit the number of 
events processed at once. You also need to avoid the state store growing 
indefinitely if required data in the global table is not available for a 
long time or not available at all. Maybe none of these caveats apply
to your use case.
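A plain-Java sketch of that idea — here HashMaps stand in for the Kafka Streams state store and the global table, and all names are illustrative rather than actual Streams APIs:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

public class BufferSketch {
    // stand-in for the state store: buffered events keyed by event id
    static final Map<String, String> buffered = new HashMap<>();
    // stand-in for the global table: reference data keyed by event id
    static final Map<String, String> referenceData = new HashMap<>();

    // called per incoming event; returns the buffered events that can now be processed
    static List<String> onEvent(String id, String value) {
        buffered.put(id, value);
        List<String> processed = new ArrayList<>();
        Iterator<Map.Entry<String, String>> it = buffered.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, String> e = it.next();
            String ref = referenceData.get(e.getKey());
            if (ref != null) {              // required data is now available
                processed.add(e.getValue() + "/" + ref);
                it.remove();
            }
        }
        return processed;
    }
}
```

In a real processor, `buffered` would be a `KeyValueStore` attached to the processor and `referenceData` a lookup into the global table.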


Best,
Bruno


On 21.09.20 13:45, Pushkar Deole wrote:

Say the application level exception is named as :
MeasureDefinitionNotAvaialbleException

What I am trying to achieve is: in the above case, when the event processing
fails because required data is not available, the streams application should
not proceed to the next event; instead it should wait for the processing of
the current event to complete. If I just catch the
MeasureDefinitionNotAvaialbleException in the processor and log it, then the
stream will proceed to the next event, considering the current event
successfully processed, right?

On Mon, Sep 21, 2020 at 5:11 PM Pushkar Deole  wrote:


It is not a Kafka Streams error, it is an application-level error, e.g.,
some data required for processing an input event is not available in
the GlobalKTable since it is not yet synced with the global topic

On Mon, Sep 21, 2020 at 4:54 PM Bruno Cadonna  wrote:


Hi Pushkar,

Is the error you are talking about, one that is thrown by Kafka Streams
or by your application? If it is thrown by Kafka Streams, could you
please post the error?

I do not completely understand what you are trying to achieve, but maybe
max.task.idle.ms [1] is the configuration you are looking for.

I can assure you that enable.auto.commit is false in Kafka Streams. What
you probably mean is that Kafka Streams periodically commits the
offsets. The commit interval can be controlled with commit.interval.ms
[2].


Best,
Bruno


[1] https://kafka.apache.org/documentation/#max.task.idle.ms
[2] https://kafka.apache.org/documentation/#commit.interval.ms

On 21.09.20 12:38, Pushkar Deole wrote:

Hi,

I would like to know how to handle following scenarios while processing
events in a kafka streams application:

1. the streams application needs data from a globalKtable which loads it
from a topic that is populated by some other service/application. So, if
the streams application starts getting events from input source topic
however it doesn't find required data in GlobalKTable since that other
application/service hasn't yet loaded that data then the Kafka streams
application gets error while processing the event and application

handles

the exception by logging  an error and it goes onto processing other
events. Since auto.commit is true, the polling will go on fetching next
batch and probably it will set the offset of previous batch, causing

loss

of events that had an exception while processing.

I want to halt the processing here if an error occurs while processing

the

event, so instead of going on to the next event, the processing should

keep

trying previous event until application level error is resolved. How

can I

achieve this?









Re: kafka schema registry - some queries and questions

2020-09-21 Thread Bruno Cadonna

Hi Pushkar,

This question is better suited for 
https://groups.google.com/g/confluent-platform since the Schema Registry 
is part of the Confluent Platform but not of Apache Kafka.


Best,
Bruno

On 21.09.20 16:58, Pushkar Deole wrote:

Hi All,

Wanted to understand a bit more on the schema registry provided by
confluent.
Following are the queries:
1. Is the schema registry provided by confluent over the top of Apache
Kafka?
2. If a managed Kafka service in the cloud is used, e.g. Aiven Kafka, is
the schema registry implementation different for different vendors, i.e.
does Aiven have its own implementation, or has Confluent open-sourced the
schema registry implementation?
3. Will the Confluent Avro client libraries work with the schema registry
of managed services from other vendors like Aiven?



Re: Kafka streams - how to handle application level exception in event processing

2020-09-21 Thread Bruno Cadonna

Hi Pushkar,

If you want to keep the order, you could still use the state store I 
suggested in my previous e-mail and implement a queue on top of it. For 
that you need to put the events into the store with a key that 
represents the arrival order of the events. Each time a record is 
received from the input topic, the events are read in arrival order from 
the state store and the data in the global table is probed. If an event 
matches data from the global table the event is removed from the state 
store and emitted. If an event does not match data from the global table 
the processing is stopped and nothing is emitted.
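A plain-Java sketch of this queue variant — a TreeMap keyed by an arrival sequence number stands in for the state store, and `lookup` stands in for probing the global table (the names are illustrative, not actual Streams APIs):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

public class OrderedBufferSketch {
    // stand-in for the state store: events keyed by arrival order
    final TreeMap<Long, String> queue = new TreeMap<>();
    long nextSeq = 0;

    // lookup stands in for probing the global table; returns null when data is missing
    List<String> onEvent(String value, Function<String, String> lookup) {
        queue.put(nextSeq++, value);
        List<String> emitted = new ArrayList<>();
        // drain in arrival order; stop at the first event that cannot be processed yet
        while (!queue.isEmpty()) {
            Map.Entry<Long, String> head = queue.firstEntry();
            String ref = lookup.apply(head.getValue());
            if (ref == null) break;          // keep order: do not skip ahead
            emitted.add(head.getValue() + "/" + ref);
            queue.pollFirstEntry();
        }
        return emitted;
    }
}
```

In a real processor the TreeMap would be a `KeyValueStore` whose serialized keys preserve the arrival order, so a range scan returns the events first-in first-out.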


Best,
Bruno

On 21.09.20 14:21, Pushkar Deole wrote:

Bruno,

1. the loading of topic mapped to GlobalKTable is done by some other
service/application so when my application starts up, it will just sync a
GlobalKTable against that topic and if that other service/application is
still starting up then it may not have loaded that data on that topic and
that's the reason it is not available to my application through the
GlobalKTable.

2. I don't want out-of-order processing to happen; that's the reason I want
my streams application to keep retrying the same event until the other
application starts up and the required data becomes available in the GlobalKTable


On Mon, Sep 21, 2020 at 5:42 PM Bruno Cadonna  wrote:


Thank you for clarifying! Now, I think I understand.

You could put events for which required data in the global table is not
available into a state store and each time an event from the input topic
is processed, you could lookup all events in your state store and see if
required data is now available for them.

However, be aware that this can mix up the original order of the events
in your input topic if required data of later events is available before
required data of earlier events. Furthermore, you need to consider the
case when you have a huge amount of events in the state store and
suddenly all required data in the global table is available, because
processing all those events at once might lead to exceeding
max.poll.interval.ms and the stream thread might be kicked out of the
consumer group. To solve that, you may want to limit the number of
events processed at once. You also need to avoid the state store growing
indefinitely if required data in the global table is not available for a
long time or not available at all. Maybe none of these caveats apply
to your use case.

Best,
Bruno


On 21.09.20 13:45, Pushkar Deole wrote:

Say the application level exception is named as :
MeasureDefinitionNotAvaialbleException

What I am trying to achieve is: in above case when the event processing
fails due to required data not available, the streams should not proceed

on

to next event, however it should wait for the processing of current event
to complete. If I just catch the MeasureDefinitionNotAvaialbleException

in

processor and log it then the stream will proceed onto next event
considering the current event processing got successful right?

On Mon, Sep 21, 2020 at 5:11 PM Pushkar Deole 

wrote:



It is not a kafka streams error, it is an application level error e.g.
say, some data required for processing an input event is not available

in

the GlobalKTable since it is not yet synced with the global topic

On Mon, Sep 21, 2020 at 4:54 PM Bruno Cadonna 

wrote:



Hi Pushkar,

Is the error you are talking about, one that is thrown by Kafka Streams
or by your application? If it is thrown by Kafka Streams, could you
please post the error?

I do not completely understand what you are trying to achieve, but

maybe

max.task.idle.ms [1] is the configuration you are looking for.

I can assure you that enable.auto.commit is false in Kafka Streams.

What

you probably mean is that Kafka Streams periodically commits the
offsets. The commit interval can be controlled with commit.interval.ms
[2].


Best,
Bruno


[1] https://kafka.apache.org/documentation/#max.task.idle.ms
[2] https://kafka.apache.org/documentation/#commit.interval.ms

On 21.09.20 12:38, Pushkar Deole wrote:

Hi,

I would like to know how to handle following scenarios while

processing

events in a kafka streams application:

1. the streams application needs data from a globalKtable which loads

it

from a topic that is populated by some other service/application. So,

if

the streams application starts getting events from input source topic
however it doesn't find required data in GlobalKTable since that other
application/service hasn't yet loaded that data then the Kafka streams
application gets error while processing the event and application

handles

the exception by logging  an error and it goes onto processing other
events. Since auto.commit is true, the polling will go on fetching

next

batch and probably it will set the offset of previous batch, causing

loss

of events that had an exception while processing.

I want to halt the processing here if an error occurs while processing

the

event, 

Re: Kafka streams - how to handle application level exception in event processing

2020-09-22 Thread Bruno Cadonna

Hi Pushkar,

I think there is a misunderstanding. If a consumer polls from a
partition, it will always poll the next event, independently of whether the
offset was committed or not. Committed offsets are used for fault
tolerance, i.e., when a consumer crashes, the consumer that takes over
the work of the crashed consumer will start polling records from the
offset the crashed consumer committed last. This is not only true for
Kafka Streams, but for all applications that use a Kafka consumer with
subscription.


To be clear, my proposal is not a workaround. It is one approach to
solve your problem in Kafka Streams. You could have a look into
stream-stream joins if you can use a stream instead of a global table.
Another approach would be to use a plain Kafka consumer instead of Kafka
Streams, with which you would have finer-grained control over polls
and commits. In any case, be aware that blocking processing on an event
indefinitely may result in your lag and/or your state growing
indefinitely.
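A rough sketch of what the plain-consumer approach could look like — auto-commit disabled, the offset committed only after successful processing, and the partition rewound on failure so the same record is polled again (the topic name, group id, and process() call are placeholders):

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");                // placeholder
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Collections.singletonList("input-topic"));     // placeholder
    while (true) {
        for (ConsumerRecord<String, String> rec : consumer.poll(Duration.ofMillis(500))) {
            TopicPartition tp = new TopicPartition(rec.topic(), rec.partition());
            try {
                process(rec);                                         // placeholder
                // commit only after the record was processed successfully
                consumer.commitSync(Map.of(tp, new OffsetAndMetadata(rec.offset() + 1)));
            } catch (Exception e) {
                // rewind so the same record is polled again instead of moving on
                consumer.seek(tp, rec.offset());
                break;
            }
        }
    }
}
```

Note the caveat from above still applies: retrying one record forever blocks the whole partition and lets the lag grow.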


If you think there is something missing in Kafka Streams, you are very 
welcome to search through the tickets in 
https://issues.apache.org/jira/projects/KAFKA/issues and comment on 
tickets that would solve your issue or create a new one if you cannot 
find any.


Best,
Bruno

On 22.09.20 05:09, Pushkar Deole wrote:

Bruno,

So, essentially, we are just waiting on the processing of the first event
that got an error before going ahead to the next one.

Second, if the application handles storing the events in a state store for
retry, Kafka Streams would essentially commit the offsets of those events,
so the next event will be polled by the consumer, correct?

Instead of this workaround, is there any provision in Kafka Streams for
this scenario? E.g., in case the application registers application-level
exceptions, Kafka Streams would take care of it and do all this
internally, and would not commit the offset of that event and hence would
keep polling the same event again?
Since this is a common scenario, could users achieve this in Kafka Streams
internally with a particular configuration?


On Mon, Sep 21, 2020 at 9:01 PM Bruno Cadonna  wrote:


Hi Pushkar,

If you want to keep the order, you could still use the state store I
suggested in my previous e-mail and implement a queue on top of it. For
that you need to put the events into the store with a key that
represents the arrival order of the events. Each time a record is
received from the input topic, the events are read in arrival order from
the state store and the data in the global table is probed. If an event
matches data from the global table the event is removed from the state
store and emitted. If an event does not match data from the global table
the processing is stopped and nothing is emitted.

Best,
Bruno

On 21.09.20 14:21, Pushkar Deole wrote:

Bruno,

1. the loading of topic mapped to GlobalKTable is done by some other
service/application so when my application starts up, it will just sync a
GlobalKTable against that topic and if that other service/application is
still starting up then it may not have loaded that data on that topic and
that's the reason it is not available to my application through the
GlobalKTable.

2. I don't want out of order processing to happen, that's the reason I

want

my streams application to keep trying same event until the other
application starts up and required data becomes available in globalKtable


On Mon, Sep 21, 2020 at 5:42 PM Bruno Cadonna 

wrote:



Thank you for clarifying! Now, I think I understand.

You could put events for which required data in the global table is not
available into a state store and each time an event from the input topic
is processed, you could lookup all events in your state store and see if
required data is now available for them.

However, be aware that this can mix up the original order of the events
in your input topic if required data of later events is available before
required data of earlier events. Furthermore, you need to consider the
case when you have a huge amount of events in the state store and
suddenly all required data in the global table is available, because
processing all those events at once might lead to exceeding
max.poll.interval.ms and the stream thread might be kicked out of the
consumer group. To solve that, you may want to limit the number of
events processed at once. You also need to avoid the state store growing
indefinitely if required data in the global table is not available for a
long time or not available at all. Maybe none of these caveats apply
to your use case.

Best,
Bruno


On 21.09.20 13:45, Pushkar Deole wrote:

Say the application level exception is named as :
MeasureDefinitionNotAvaialbleException

What I am trying to achieve is: in above case when the event processing
fails due to required data not available, the streams should not

proceed

on

to next event, however it should wait for the processing of current

event

to

Re: Kafka streams - how to handle application level exception in event processing

2020-09-23 Thread Bruno Cadonna

Hi Pushkar,

If you do not want to lose any event, you should cache the events 
somewhere (e.g. a state store) in case there is an issue with an 
external system you connect to (e.g. a database issue). If the order of 
the events is important, you must ensure that the events in your cache 
are processed in the order they were written to the cache (i.e. 
first-in first-out).


Maybe you can find some good hints in the links Gilles posted.

Best,
Bruno

On 22.09.20 10:51, Pushkar Deole wrote:

Thank you Gilles..will take a look..

Bruno, thanks for your elaborate explanation as well... however it
basically exposes my application to certain issues..

e.g. the application deals with agent states of a call center, where the
order of processing is important. While an agent is logged in, he keeps
rotating between Ready and Not Ready states, and at the end of the day he
becomes Logged Out... If, while processing the Ready event, there is some
temporary issue with the database/network and the event processing gets an
exception, the application does a few retries but with no luck.
As per Kafka polling, it will go ahead and poll the next record from the
partition for the same agent (the agent id being the key) and will process
the Logged Out event. So, does this mean I lost the Ready event in between
due to the database issue? Even if I store this event somewhere to process
it later, processing the Ready event after Logged Out doesn't make sense,
since the order of states is important? Is my u

On Tue, Sep 22, 2020 at 1:32 PM Gilles Philippart
 wrote:


Hi Pushkar,

Uber has written about how they deal with failures and reprocessing here,
it might help you achieve what you describe:
https://eng.uber.com/reliable-reprocessing/.

Unfortunately, there isn't much written documentation about those patterns.
There's also a good talk from Confluent's Antony Stubbs on how you can do
certain things with the Processor API that you can't do with the Kafka
Streams DSL:

https://www.confluent.io/kafka-summit-lon19/beyond-dsl-unlocking-power-kafka-streams-processor-api
.

Gilles Philippart
Funding Circle Engineering

On Tue, 22 Sep 2020 at 08:12, Bruno Cadonna  wrote:


Hi Pushkar,

I think there is a misunderstanding. If a consumer polls from a
partition, it will always poll the next event independently whether the
offset was committed or not. Committed offsets are used for fault
tolerance, i.e., when a consumer crashes, the consumer that takes over
the work of the crashed consumer will start polling record from the
offset the crashed consumer committed last. This is not only true for
Kafka Streams, but for all applications that use a Kafka consumer with
subscription.

To be clear, my proposal is not a workaround. This is one approach to
solve your problem in Kafka Streams. You could have a look into
stream-stream joins if you can use a stream instead of a global table.
Another approach would be to use a plain Kafka consumer instead of Kafka
Stream with which you would have a more fine-grained control about polls
and commits. In any case, be aware that blocking processing on an event
indefinitely may result in your lag and/or your state growing
indefinitely.

If you think there is something missing in Kafka Streams, you are very
welcome to search through the tickets in
https://issues.apache.org/jira/projects/KAFKA/issues and comment on
tickets that would solve your issue or create a new one if you cannot
find any.

Best,
Bruno

On 22.09.20 05:09, Pushkar Deole wrote:

Bruno,

So, essentially, we are just waiting on the processing of first event

that

got an error before going ahead on to the next one.

Second, if application handles storing the events in state store for

retry,

Kafka stream would essentially commit the offset of those events, so

next

event will be polled by consumer, correct?

Instead of this work around, is there any provision in kafka streams

for

this scenario? e.g. in case application registers application level
exceptions then kafka streams will take care of it and do all this
internally, and will not commit the offset of that event and hence will
keep polling the same event again?
Since this is a common scenario, using a particular configuration for

users

can achieve this in Kafka streams internally?


On Mon, Sep 21, 2020 at 9:01 PM Bruno Cadonna 

wrote:



Hi Pushkar,

If you want to keep the order, you could still use the state store I
suggested in my previous e-mail and implement a queue on top of it.

For

that you need to put the events into the store with a key that
represents the arrival order of the events. Each time a record is
received from the input topic, the events are read in arrival order

from

the state store and the data in the global table is probed. If an

event

matches data from the global table the event is removed from the state
store and emitted. If an event does not match data from the global

table

the processing is stopped and nothing is emitted.

Best,
Bruno

On 21.09.20 1

Re: kafka stream processor's process method

2021-01-11 Thread Bruno Cadonna

Hi Sathya,

MyProcessor does not have access to MySource, because in MySource you 
just build the topology that is then executed by Kafka Streams. So you 
cannot send anything to MySource, because MyProcessor does not know 
anything about MySource.


If you want to stop consumption upon an exception from your service, you 
throw that exception in process(). That would stop the stream thread on 
which the processor is executed. Other running stream threads on the 
same client and other Streams clients in your Streams application are 
not affected by this exception. If you want to shut down the Streams 
client on which the stream thread that throws the exception runs, you 
need to pass a reference to your Streams client (i.e., a reference to the 
KafkaStreams object) to the uncaught exception handler that you can set 
with KafkaStreams#setUncaughtExceptionHandler(), and in the uncaught 
exception handler you need to call KafkaStreams#close(Duration.ZERO). 
Make sure you call close() with Duration.ZERO, since otherwise you might 
run into a deadlock.
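A sketch of that wiring with the pre-KIP-663 handler API; the topology and props are assumed to exist already:

```java
import java.time.Duration;
import org.apache.kafka.streams.KafkaStreams;

final KafkaStreams streams = new KafkaStreams(topology, props); // topology/props assumed
streams.setUncaughtExceptionHandler((Thread thread, Throwable throwable) -> {
    // close with Duration.ZERO to avoid a deadlock: a longer timeout would
    // make close() wait for the very stream thread running this handler
    streams.close(Duration.ZERO);
});
streams.start();
```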


We are currently developing a more sophisticated way to react on 
exceptions that would also allow you to shutdown your whole Streams 
application (i.e. close all KafkaStreams objects) upon an exception. See 
more details under https://cwiki.apache.org/confluence/x/lkN4CQ


Best,
Bruno



On 09.01.21 10:41, Sathya Murthy wrote:

Hi there,
I'm Sathya.
I have the below requirements in my project; please let me know how to
achieve them.


These are my two Kafka Streams classes:

1. MySource

2. MyProcessor

The MySource class sends a continuous stream of data that is received in the
process method of the MyProcessor class.

My requirements are

1) When each message is processed inside the process method, I need to send
a response back to the MySource class (either SUCCESS/FAILED).

2) When it is unsuccessful, e.g. any exception is thrown while invoking the
service call (newApplication.service(value)),

the process method should stop consuming any further messages to prevent
data loss.

could you please help me on this.

1) MySource class

streamsBuilder.build()
    .addSource(READ_FROM_TOPIC, Serdes.String().deserializer(),
               Serdes.String().deserializer(), messages)
    .addProcessor(TRAN_PROCESSOR, () -> new MyProcessor(), READ_FROM_TOPIC);

2) MyProcessor class

public class MyProcessor implements Processor<String, String> {

    public void process(String key, String value) {
        try {
            newApplication.service(value);
        } catch (Exception e) {
            // the exception is swallowed here, so processing continues
        }
    }
}



Re: RocksDB state store disk space estimation

2021-02-18 Thread Bruno Cadonna

Hi Chris,

your estimation looks correct to me.

I do not know how big M might be. Maybe the following link can help you 
with the estimation:


https://github.com/facebook/rocksdb/wiki/Rocksdb-BlockBasedTable-Format

There are also some additional files that RocksDB keeps in its 
directory. I guess the best way to estimate the space is experimentally.


Also take into account that, you will have one state store per partition.

If you want to save disk space, you should try to use Leveled compaction 
(https://github.com/facebook/rocksdb/wiki/Leveled-Compaction) instead 
since it has a space amplification of 10% instead of 100% with Universal 
compaction. That is, you can replace the 2 in your formula with 1.1.
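As a worked example under purely assumed numbers — K = 36-byte keys, V = 1 KiB values, R = 10 million records, and a guessed multiplier M = 1.2 — a small sketch:

```java
public class SpaceEstimate {

    // max. disk space = (K + V) * R * M * C, where C is the compaction
    // factor: 2.0 for Universal compaction (100 % space amplification)
    // or 1.1 for Leveled compaction (10 % space amplification)
    static long estimateBytes(long keyBytes, long valueBytes, long records,
                              double m, double compactionFactor) {
        return (long) ((keyBytes + valueBytes) * records * m * compactionFactor);
    }

    public static void main(String[] args) {
        long universal = estimateBytes(36, 1024, 10_000_000L, 1.2, 2.0);
        long leveled   = estimateBytes(36, 1024, 10_000_000L, 1.2, 1.1);
        System.out.printf("universal: %.1f GiB, leveled: %.1f GiB%n",
                universal / Math.pow(1024, 3), leveled / Math.pow(1024, 3));
    }
}
```

Remember that this estimate is per partition, and that M itself is best measured experimentally as suggested above.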


Since AK 2.7, you can also monitor the sizes of your RocksDB state 
stores with the metric total-sst-files-size 
(https://kafka.apache.org/documentation/#kafka_streams_rocksdb_monitoring)


Best,
Bruno

On 18.02.21 17:43, Chris Toomey wrote:

We're using RocksDB as a persistent Kafka state store for compacted topics
and need to be able to estimate the maximum disk space required.

We're using the default config. settings provided by Kafka, which include
Universal compaction, no compression, and 4k block size.

Given these settings and a topic w/ key size K, value size V, and number of
records R, I'd assume a rough disk space estimation would be of the form

max. disk space = (K+V)*R*M*2

where M is an unknown DB size -> disk size multiplier and *2 is to allow
for full compaction as per here.

Does this look right, and can anyone provide a ballpark range for the
multiplier M and/or some guidelines for how to estimate it?

much thanks,
Chris



Re: Streaming Data

2019-04-09 Thread Bruno Cadonna
Hi Nick,

You could give Kafka Streams a try for your use case. It is included in
Kafka, so probably you already have it!

For details, see https://kafka.apache.org/documentation/streams/ .

KSQL, which has already been mentioned by another member of the mailing
list uses Kafka Streams under the hood.

Best,
Bruno


On Tue, Apr 9, 2019 at 1:26 PM Nick Torenvliet 
wrote:

> Hi all,
>
> Just looking for some general guidance.
>
> We have a kafka -> druid pipeline we intend to use in an industrial setting
> to monitor process data.
>
> Our kafka system receives messages on a single topic.
>
> The messages are {"timestamp": yy:mm:ddThh:mm:ss.mmm, "plant_equipment_id":
> "id_string", "sensorvalue": float}
>
> For our POC there are about 2000 unique plant_equipment ids, this will
> quickly grow to 20,000.
>
> The kafka topic streams into druid
>
> We are building some node.js/react browser based apps for analytics and
> real time stream monitoring.
>
> We are thinking that for visualizing historical data sets we will hit druid
> for data.
>
> For real time streaming we are wondering what our best option is.
>
> One option is to just hit druid semi regularly and update the on screen
> visualization as data arrives from there.
>
> Another option is to stream subset of the topics (somehow) from kafka using
> some streams interface.
>
> With all the stock ticker apps out there, I have to imagine this is a
> really common use case.
>
> Anyone have any thoughts as to what we are best to do?
>
> Nick
>


-- 
Bruno Cadonna
Software Engineer at Confluent


Re: Using processor API via DSL

2019-04-12 Thread Bruno Cadonna
Hi Alessandro,

Have you considered using `transform()` (actually in your case you should
use `transformValues()`) instead of `.process()`? `transform()` and
`transformValues()` are stateful operations similar to `.process` but they
return a `KStream`. On a `KStream` you can then apply a windowed
aggregation.

Hope that helps.

Best,
Bruno


On Fri, Apr 12, 2019 at 4:31 PM Alessandro Tagliapietra <
tagliapietra.alessan...@gmail.com> wrote:

> Hi there,
>
> I'm just starting with Kafka and I'm trying to create a stream processor
> that in multiple stages:
>  - filters messages using a kv store so that only messages with higher
> timestamp gets processed
>  - aggregates the message metrics by minute giving e.g. the avg of those
> metrics in that minute
>
> The message is simple, the key is the sensor ID and the value is e.g. {
> timestamp: UNIX_TIMESTAMP, speed: INT }.
>
> I've started by creating a processor to use the kv store and filter old
> messages:
>
>
> https://gist.github.com/alex88/7d229698546971452c3efc862fb4d3fd#file-timestampfilter-java
>
> Then I was trying to implement windowing, I saw very nice windowing
> examples for the DSL but none for the Processor API (only a small reference
> to the windowed store), can someone point me in the right direction?
>
> Now, since I wasn't able to find any example I tried to use the DSL but
> haven't found a way to use my processor with it, I saw this
>
> https://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#applying-processors-and-transformers-processor-api-integration
> but
> it explains mostly transformers not processors. I also saw after that the
> example usage of the processor but `.process(...)` returns void, so I
> cannot have a KStream from a processor?
>
> Thank you all in advance
>
> --
> Alessandro Tagliapietra
>


Re: Using processor API via DSL

2019-04-14 Thread Bruno Cadonna
Hi Alessandro,

the `TransformSupplier` is internally wrapped with a `ProcessorSupplier`,
so the statement

`transform` is essentially equivalent to adding the Transformer via
Topology#addProcessor() to your processor topology

is correct.

If you do not change the key, you should definitely use one of the
overloads of `transformValues` to avoid internal data redistribution. In
your case the overload with `ValueTransformerWithKeySupplier` as suggested
by Matthias would fit.

Best,
Bruno

On Sat, Apr 13, 2019 at 12:51 PM Matthias J. Sax 
wrote:

> There is also `ValueTransformerWithKey` that gives you read-only acess
> to the key.
>
> -Matthias
>
> On 4/12/19 5:34 PM, Alessandro Tagliapietra wrote:
> > Hi Bruno,
> >
> > Thank you for the quick answer.
> >
> > I'm actually trying to do that since it seems there is really no way to
> > have it use `Processor`.
> > I just wanted (if that would've made any sense) to use the Processor in
> > both DSL and non-DSL pipelines.
> >
> > Anyway, regarding `transformValues()` I don't think I can use it as I
> need
> > the message key since that is the discriminating value for the filter (I
> > want to exclude old values per sensor ID so per message key)
> >
> > Right now I've this
> >
> https://gist.github.com/alex88/7d229698546971452c3efc862fb4d3fd#file-timestampfiltertransformer-java
> > and
> > i'm using it with `transform()` .
> >
> > One thing I've found confusing is this
> >
> https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#streams-developer-guide-dsl-process
> >
> > transform is essentially equivalent to adding the Transformer via
> >> Topology#addProcessor() to yourprocessor topology
> >> <
> https://docs.confluent.io/current/streams/concepts.html#streams-concepts-processor-topology
> >
> >> .
> >
> >
> > is it? Doesn't `transform` need a TransformSupplier while `addProcessor`
> > uses a ProcessorSupplier?
> >
> > Thank you again for your help
> >
> > --
> > Alessandro Tagliapietra
> >
> >
> > On Fri, Apr 12, 2019 at 5:04 PM Bruno Cadonna 
> wrote:
> >
> >> Hi Alessandro,
> >>
> >> Have you considered using `transform()` (actually in your case you
> should
> >> use `transformValues()`) instead of `.process()`? `transform()` and
> >> `transformValues()` are stateful operations similar to `.process` but
> they
> >> return a `KStream`. On a `KStream` you can then apply a windowed
> >> aggregation.
> >>
> >> Hope that helps.
> >>
> >> Best,
> >> Bruno
> >>
> >>
> >> On Fri, Apr 12, 2019 at 4:31 PM Alessandro Tagliapietra <
> >> tagliapietra.alessan...@gmail.com> wrote:
> >>
> >>> Hi there,
> >>>
> >>> I'm just starting with Kafka and I'm trying to create a stream
> processor
> >>> that in multiple stages:
> >>>  - filters messages using a kv store so that only messages with higher
> >>> timestamp gets processed
> >>>  - aggregates the message metrics by minute giving e.g. the avg of
> those
> >>> metrics in that minute
> >>>
> >>> The message is simple, the key is the sensor ID and the value is e.g. {
> >>> timestamp: UNIX_TIMESTAMP, speed: INT }.
> >>>
> >>> I've started by creating a processor to use the kv store and filter old
> >>> messages:
> >>>
> >>>
> >>>
> >>
> https://gist.github.com/alex88/7d229698546971452c3efc862fb4d3fd#file-timestampfilter-java
> >>>
> >>> Then I was trying to implement windowing, I saw very nice windowing
> >>> examples for the DSL but none for the Processor API (only a small
> >> reference
> >>> to the windowed store), can someone point me in the right direction?
> >>>
> >>> Now, since I wasn't able to find any example I tried to use the DSL but
> >>> haven't found a way to use my processor with it, I saw this
> >>>
> >>>
> >>
> https://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#applying-processors-and-transformers-processor-api-integration
> >>> but
> >>> it explains mostly transformers not processors. I also saw after that
> the
> >>> example usage of the processor but `.process(...)` returns void, so I
> >>> cannot have a KStream from a processor?
> >>>
> >>> Thank you all in advance
> >>>
> >>> --
> >>> Alessandro Tagliapietra
> >>>
> >>
> >
>
>


Re: Using processor API via DSL

2019-04-15 Thread Bruno Cadonna
Hi Alessandro,

Have a look at this Kafka Usage Pattern for computing averages without
using an ArrayList.

https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Stream+Usage+Patterns#KafkaStreamUsagePatterns-Howtocomputean(windowed)average?

The advantage of this pattern over the ArrayList approach is the reduced
space needed to compute the aggregate. Note that you will still need to
implement a SerDe. However, the SerDe should be a bit easier to implement
than a SerDe for an ArrayList.
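The core of that usage pattern is to aggregate a running count and sum instead of a list of values, and derive the average on demand. A plain-Java sketch of such an aggregate type (the `CountAndSum` name and methods are illustrative, not taken from the wiki page):

```java
// Sketch of the aggregate type behind the "windowed average" pattern:
// instead of keeping every value in an ArrayList, keep only a running
// count and sum, and compute the average when needed.
class CountAndSum {
    private long count;
    private double sum;

    // Called once per record by the aggregator (e.g. inside aggregate()).
    CountAndSum add(double value) {
        count++;
        sum += value;
        return this;
    }

    double average() {
        return count == 0 ? 0.0 : sum / count;
    }

    public static void main(String[] args) {
        CountAndSum agg = new CountAndSum();
        for (double v : new double[] {1.0, 2.0, 3.0}) {
            agg.add(v);
        }
        System.out.println(agg.average()); // prints 2.0
    }
}
```

In a Streams topology an instance of this type would be the aggregate value built up by `aggregate()`, and the SerDe only has to write two numbers instead of a whole list.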

Hope that helps.

Best,
Bruno

On Mon, Apr 15, 2019 at 4:57 PM Alessandro Tagliapietra <
tagliapietra.alessan...@gmail.com> wrote:

> Sorry but it seemed harder than I thought,
>
> to have the custom aggregation working I need to get an ArrayList of all
> the values in the window, so far my aggregate DSL method creates an
> ArrayList on the initializer and adds each value to the list in the
> aggregator.
> Then I think I'll have to provide a serde to change the output type of
> that method.
> I was looking at
>
> https://stackoverflow.com/questions/46365884/issue-with-arraylist-serde-in-kafka-streams-api
> but
> that seems more towards a list of longs and already uses longSerde.
> I'm currently trying to implement another avro model that has a field of
> type array so I can use the regular avro serializer to implement this.
> Should I create my own serdes instead or is this the right way?
>
> Thank you in advance
>
> --
> Alessandro Tagliapietra
>
> On Mon, Apr 15, 2019 at 3:42 PM Alessandro Tagliapietra <
> tagliapietra.alessan...@gmail.com> wrote:
>
> > Thank you Bruno and Matthias,
> >
> > I've modified the transformer to implement the ValueTransformerWithKey
> > interface and everything is working fine.
> > I've now to window the data and manually aggregate each window data since
> > I've to do some averages and sum of differences.
> > So far I've just been having some issues with message types since I'm changing
> > the data type when aggregating the window but I think it's an easy
> problem.
> >
> > Thank you again
> > Best
> >
> > --
> > Alessandro Tagliapietra
> >
> > On Sun, Apr 14, 2019 at 11:26 AM Bruno Cadonna 
> wrote:
> >
> >> Hi Alessandro,
> >>
> >> the `TransformSupplier` is internally wrapped with a
> `ProcessorSupplier`,
> >> so the statement
> >>
> >> `transform` is essentially equivalent to adding the Transformer via
> >> Topology#addProcessor() to your processor topology
> >>
> >> is correct.
> >>
> >> If you do not change the key, you should definitely use one of the
> >> overloads of `transformValues` to avoid internal data redistribution. In
> >> your case the overload with `ValueTransformerWithKeySupplier` as
> suggested
> >> by Matthias would fit.
> >>
> >> Best,
> >> Bruno
> >>
> >> On Sat, Apr 13, 2019 at 12:51 PM Matthias J. Sax  >
> >> wrote:
> >>
> >> > There is also `ValueTransformerWithKey` that gives you read-only acess
> >> > to the key.
> >> >
> >> > -Matthias
> >> >
> >> > On 4/12/19 5:34 PM, Alessandro Tagliapietra wrote:
> >> > > Hi Bruno,
> >> > >
> >> > > Thank you for the quick answer.
> >> > >
> >> > > I'm actually trying to do that since it seems there is really no way
> >> to
> >> > > have it use `Processor`.
> >> > > I just wanted (if that would've made any sense) to use the Processor
> >> in
> >> > > both DSL and non-DSL pipelines.
> >> > >
> >> > > Anyway, regarding `transformValues()` I don't think I can use it as
> I
> >> > need
> >> > > the message key since that is the discriminating value for the
> filter
> >> (I
> >> > > want to exclude old values per sensor ID so per message key)
> >> > >
> >> > > Right now I've this
> >> > >
> >> >
> >>
> https://gist.github.com/alex88/7d229698546971452c3efc862fb4d3fd#file-timestampfiltertransformer-java
> >> > > and
> >> > > i'm using it with `transform()` .
> >> > >
> >> > > One thing I've found confusing is this
> >> > >
> >> >
> >>
> https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#streams-developer-guide-dsl-process
> >> > >
> >> > > transform is essentially equivalent to adding the Transformer via
> >> > > Topology#addProcessor() to your processor topology

Re: Using processor API via DSL

2019-04-16 Thread Bruno Cadonna
Hi Alessandro,

What version of Kafka do you use?

Could you please give a more detailed example for the issues with the two
keys you see?

Could the following bug be related to the duplicates you see?

https://issues.apache.org/jira/browse/KAFKA-7895?jql=project%20%3D%20KAFKA%20AND%20issuetype%20%3D%20Bug%20AND%20status%20in%20(Open%2C%20%22In%20Progress%22%2C%20Reopened)%20AND%20component%20%3D%20streams%20AND%20text%20~%20%22duplicate%22

How do you restart the processor?

Best,
Bruno

On Mon, Apr 15, 2019 at 11:02 PM Alessandro Tagliapietra <
tagliapietra.alessan...@gmail.com> wrote:

> Thank you Bruno,
>
> I'll look into those, however average is just a simple thing I'm trying
> right now just to get an initial windowing flow working.
> In the future I'll probably still need the actual values for other
> calculations. We won't have more than 60 elements per window for sure.
>
> So far to not manually serialize/deserialize the array list I've created an
> Avro model with an array field containing the values.
> I had issues with suppress as explained here
>
>
> https://stackoverflow.com/questions/55699096/kafka-aggregate-with-materialized-with-specific-avro-serve-gives-nullpointerexce/55699198#55699198
>
> but I got that working.
> So far everything seems to be working, except a couple things:
>  - if I generate data with 1 key, I correctly get a value each 10 seconds,
> if I later start generating data with another key (while key 1 is still
> generating) the windowing emits a value only after the timestamp of key 2
> reaches the last generated window
>  - while generating data, if I restart the processor as soon as it starts
> it sometimes generates 2 aggregates for the same window even if I'm using
> the suppress
>
> Anyway, I'll look into your link and try to find out the cause of these
> issues, probably starting from scratch with a simpler example
>
> Thank you for your help!
>
> --
> Alessandro Tagliapietra
>
> On Mon, Apr 15, 2019 at 10:08 PM Bruno Cadonna  wrote:
>
> > Hi Alessandro,
> >
> > Have a look at this Kafka Usage Pattern for computing averages without
> > using an ArrayList.
> >
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Stream+Usage+Patterns#KafkaStreamUsagePatterns-Howtocomputean(windowed)average
> > ?
> >
> > The advantage of this pattern over the ArrayList approach is the reduced
> > space needed to compute the aggregate. Note that you will still need to
> > implement a SerDe. However, the SerDe should be a bit easier to implement
> > than a SerDe for an ArrayList.
> >
> > Hope that helps.
> >
> > Best,
> > Bruno
> >
> > On Mon, Apr 15, 2019 at 4:57 PM Alessandro Tagliapietra <
> > tagliapietra.alessan...@gmail.com> wrote:
> >
> > > Sorry but it seemed harder than I thought,
> > >
> > > to have the custom aggregation working I need to get an ArrayList of
> all
> > > the values in the window, so far my aggregate DSL method creates an
> > > ArrayList on the initializer and adds each value to the list in the
> > > aggregator.
> > > Then I think I'll have to provide a serde to change the output type of
> > > that method.
> > > I was looking at
> > >
> > >
> >
> https://stackoverflow.com/questions/46365884/issue-with-arraylist-serde-in-kafka-streams-api
> > > but
> > > that seems more towards a list of longs and already uses longSerde.
> > > I'm currently trying to implement another avro model that has a field
> of
> > > type array so I can use the regular avro serializer to implement this.
> > > Should I create my own serdes instead or is this the right way?
> > >
> > > Thank you in advance
> > >
> > > --
> > > Alessandro Tagliapietra
> > >
> > > On Mon, Apr 15, 2019 at 3:42 PM Alessandro Tagliapietra <
> > > tagliapietra.alessan...@gmail.com> wrote:
> > >
> > > > Thank you Bruno and Matthias,
> > > >
> > > > I've modified the transformer to implement the
> ValueTransformerWithKey
> > > > interface and everything is working fine.
> > > > I've now to window the data and manually aggregate each window data
> > since
> > > > I've to do some averages and sum of differences.
> > > > So far I've just been having some issues with message types since I'm
> > changing
> > > > the data type when aggregating the window but I think it's an easy
> > > problem.
> > > >
>

Re: Using processor API via DSL

2019-04-19 Thread Bruno Cadonna
ed metric{"timestamp": 16, "production": 1}
> S1 with filtered metric{"timestamp": 161000, "production": 1}
> S1 with computed metric {"timestamp": 16, "production": 10}
> S1 with filtered metric{"timestamp": 162000, "production": 1}
>
> as you can see, window for timestamp 16 is duplicated
>
> Is this because the window state isn't persisted across restarts?
> My ultimate goal is to have the window part emit only once and resume
> processing across restarts, while avoiding processing out of order data
> (that's the purpose of the TimestampIncrementalFilter)
>
> Thank you in advance
>
> --
> Alessandro Tagliapietra
>
>
> On Tue, Apr 16, 2019 at 9:48 PM Alessandro Tagliapietra <
> tagliapietra.alessan...@gmail.com> wrote:
>
> > Hi Bruno,
> >
> > I'm using the confluent docker images 5.2.1, so kafka 2.2.
> > Anyway I'll try to make a small reproduction repo with all the different
> > cases soon.
> >
> > Thank you
> >
> > --
> > Alessandro Tagliapietra
> >
> >
> > On Tue, Apr 16, 2019 at 1:02 PM Bruno Cadonna 
> wrote:
> >
> >> Hi Alessandro,
> >>
> >> What version of Kafka do you use?
> >>
> >> Could you please give a more detailed example for the issues with the
> two
> >> keys you see?
> >>
> >> Could the following bug be related to the duplicates you see?
> >>
> >>
> >>
> https://issues.apache.org/jira/browse/KAFKA-7895?jql=project%20%3D%20KAFKA%20AND%20issuetype%20%3D%20Bug%20AND%20status%20in%20(Open%2C%20%22In%20Progress%22%2C%20Reopened)%20AND%20component%20%3D%20streams%20AND%20text%20~%20%22duplicate%22
> >>
> >> How do you restart the processor?
> >>
> >> Best,
> >> Bruno
> >>
> >> On Mon, Apr 15, 2019 at 11:02 PM Alessandro Tagliapietra <
> >> tagliapietra.alessan...@gmail.com> wrote:
> >>
> >> > Thank you Bruno,
> >> >
> >> > I'll look into those, however average is just a simple thing I'm
> trying
> >> > right now just to get an initial windowing flow working.
> >> > In the future I'll probably still need the actual values for other
> >> > calculations. We won't have more than 60 elements per window for sure.
> >> >
> >> > So far to not manually serialize/deserialize the array list I've
> >> created an
> >> > Avro model with an array field containing the values.
> >> > I had issues with suppress as explained here
> >> >
> >> >
> >> >
> >>
> https://stackoverflow.com/questions/55699096/kafka-aggregate-with-materialized-with-specific-avro-serve-gives-nullpointerexce/55699198#55699198
> >> >
> >> > but I got that working.
> >> > So far everything seems to be working, except a couple things:
> >> >  - if I generate data with 1 key, I correctly get a value each 10
> >> seconds,
> >> > if I later start generating data with another key (while key 1 is
> still
> >> > generating) the windowing emits a value only after the timestamp of
> key
> >> 2
> >> > reaches the last generated window
> >> >  - while generating data, if I restart the processor as soon as it
> >> starts
> >> > it sometimes generates 2 aggregates for the same window even if I'm
> >> using
> >> > the suppress
> >> >
> >> > Anyway, I'll look into your link and try to find out the cause of
> these
> >> > issues, probably starting from scratch with a simpler example
> >> >
> >> > Thank you for your help!
> >> >
> >> > --
> >> > Alessandro Tagliapietra
> >> >
> >> > On Mon, Apr 15, 2019 at 10:08 PM Bruno Cadonna 
> >> wrote:
> >> >
> >> > > Hi Alessandro,
> >> > >
> >> > > Have a look at this Kafka Usage Pattern for computing averages
> without
> >> > > using an ArrayList.
> >> > >
> >> > >
> >> > >
> >> >
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Stream+Usage+Patterns#KafkaStreamUsagePatterns-Howtocomputean(windowed)average
> >> > > ?
> >> > >
> >> > > The advantage of this pattern over the ArrayList approach is the
> >> > > reduced space needed to compute the aggregate.

Re: Using processor API via DSL

2019-04-23 Thread Bruno Cadonna
Hi Alessandro,

It seems that the behaviour you described regarding the window aggregation
is due to bugs. The good news is that the bugs have been already fixed.

The relevant bug reports are
https://issues.apache.org/jira/browse/KAFKA-7895
https://issues.apache.org/jira/browse/KAFKA-8204

The fixes for both bugs have been already merged to the 2.2 branch.

Could you please build from the 2.2 branch and confirm that the fixes solve
your problem?

Best,
Bruno


On Sat, Apr 20, 2019 at 2:16 PM Alessandro Tagliapietra <
tagliapietra.alessan...@gmail.com> wrote:

> Thanks Matthias, one less thing to worry about in the future :)
>
> --
> Alessandro Tagliapietra
>
>
> On Sat, Apr 20, 2019 at 11:23 AM Matthias J. Sax 
> wrote:
>
> > Just a side note. There is currently work in progress on
> > https://issues.apache.org/jira/browse/KAFKA-3729 that should fix the
> > configuration problem for Serdes.
> >
> > -Matthias
> >
> > On 4/19/19 9:12 PM, Alessandro Tagliapietra wrote:
> > > Hi Bruno,
> > > thanks a lot for checking the code, regarding the SpecificAvroSerde
> I've
> > > found that using
> > >
> > > final Serde valueSpecificAvroSerde = new
> > SpecificAvroSerde<>();
> > > final Map serdeConfig =
> > > Collections.singletonMap("schema.registry.url", "http://localhost:8081
> > ");
> > > valueSpecificAvroSerde.configure(serdeConfig, false);
> > >
> > > and then in aggregate()
> > >
> > > Materialized.with(Serdes.String(), valueSpecificAvroSerde)
> > >
> > > fixed the issue.
> > >
> > > Thanks in advance for the windowing help, very appreciated.
> > > In the meantime I'll try to make some progress on the rest.
> > >
> > > Have a great weekend
> > >
> > > --
> > > Alessandro Tagliapietra
> > >
> > >
> > > On Fri, Apr 19, 2019 at 2:09 PM Bruno Cadonna 
> > wrote:
> > >
> > >> Hi Alessandro,
> > >>
> > >> I had a look at your code. Regarding your question whether you use the
> > >> SpecificAvroSerde correctly, take a look at the following
> documentation:
> > >>
> > >>
> > https://docs.confluent.io/current/streams/developer-guide/datatypes.html
> > >>
> > >> I haven't had the time yet to take a closer look at your problems with
> > the
> > >> aggregation. I will have a look next week.
> > >>
> > >> Have a nice weekend,
> > >> Bruno
> > >>
> > >> On Wed, Apr 17, 2019 at 4:43 PM Alessandro Tagliapietra <
> > >> tagliapietra.alessan...@gmail.com> wrote:
> > >>
> > >>> So I've started with a new app with the archetype:generate as in
> > >>> https://kafka.apache.org/22/documentation/streams/tutorial
> > >>>
> > >>> I've pushed a sample repo here: https://github.com/alex88/kafka-test
> > >>> The avro schemas are a Metric with 2 fields: timestamp and production
> > >> and a
> > >>> MetricList with a list of records (Metric) to be able to manually do
> > the
> > >>> aggregation.
> > >>> Right now the aggregation is simple just for the purpose of the
> sample
> > >> repo
> > >>> and to easily see if we're getting wrong values.
> > >>>
> > >>> What I wanted to achieve is:
> > >>>  - have a custom generator that generates 1 message per second with
> > >>> production = 1 with 1 ore more separate message keys which in my case
> > are
> > >>> the sensor IDs generating the data
> > >>>  - a filter that removes out of order messages by having a state that
> > >>> stores key (sensorID) -> last timestamp
> > >>>  - a window operation that for this example just sums the values in
> > each
> > >> 10
> > >>> seconds windows
> > >>>
> > >>> To show where I'm having issues I've setup multiple branches for the
> > >> repo:
> > >>>  - *issue-01 <https://github.com/alex88/kafka-test/tree/issue-01>*
> is
> > >> the
> > >>> one I had initially "Failed to flush state store
> > >>> KSTREAM-AGGREGATE-STATE-STORE-03" that I tried to solve using
> > >>>
> > >>>
> > >>
> >
> https://stackoverflow.com/questions/55186727/kafka-streams-2-1-1-clas

Re: Kafka question on Stream Processing

2019-04-24 Thread Bruno Cadonna
Hi Gagan,

If you want to read a message, you need to poll the message from the
broker. The brokers have only very limited notion of message content. They
only know that a message has a key, a value, and some metadata, but they
are not able to interpret the contents of those message components. The
clients are responsible to read and process the messages. For reading and
processing, the clients need to poll the messages from the brokers.

For the processing you want to do, you could use Kafka Streams. See
https://kafka.apache.org/documentation/streams/ for more information. Have
a look at the branch DSL operation there.
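As a rough plain-Java illustration of the predicate a branch operation would split on for the scheduled-task example in the question (no Kafka dependency; all names here are made up for the example):

```java
import java.util.function.Predicate;

// Sketch of the routing logic behind the suggested branch() DSL operation:
// split records into "due" and "not yet due" by comparing the task time
// with the current time. In a topology this predicate would be handed to
// KStream#branch; times are minutes-of-day, e.g. 1530 for 15:30.
class DueRouter {
    static boolean isDue(int taskTime, int now) {
        return now >= taskTime;
    }

    public static void main(String[] args) {
        Predicate<Integer> dueAt1600 = taskTime -> isDue(taskTime, 1600);
        System.out.println(dueAt1600.test(1530)); // prints true: execute the task
        System.out.println(dueAt1600.test(1700)); // prints false: not yet due
    }
}
```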

Best regards,
Bruno

On Wed, Apr 24, 2019 at 1:54 AM Gagan Sabharwal  wrote:

> Hi team,
>
> Say we have a client which has pushed a message to a topic. The message has
> a simple structure
>
> Task - Time of task
> Send an email - 1530
>
> Now say that this message is consumed by a consumer subscribed to this
> topic.
> Since the topic already has storage, what I intend to do is just read the
> message (not poll it) and, if it is before 1530, send it to the tail
> of the partition of that topic. Does Kafka provide such an API? Next time
> the consumer reads the message and sees that the current time is after
> 1530, it will poll the message and execute the task.
>
> Regards
> Gagan
>


Re: duplicate packets in kafka topic

2019-04-26 Thread Bruno Cadonna
Hi,

What are duplicate messages in your use case?
1) different messages with the same content
2) the same message that is sent multiple times to the broker due to
retries in the producer
3) something else

What do you mean with "identify those duplicates"? What do you want to do
with them?

For case 1), you could write all messages in a topic and then identify the
duplicates with a Kafka Streams application, process them and write the
results again to a topic. Be aware that identifying duplicate messages lets
the state in the Kafka Streams application grow to the sum of the sizes of
all unique messages, because you have to store all messages in your state
to be able to find future duplicates. That is not feasible in most
cases. To limit your state in the Streams application you can restrict the
identification of duplicates to a time window. For example, identify all
duplicate messages of the last hour. Within a window of one hour, you would
only process unique messages, but you would have duplicates across windows.
If you want a fail-safe identification of duplicates, you also need to
switch on exactly-once semantics in the Streams application.
See https://kafka.apache.org/documentation/streams/ and the Streams
configuration `processing.guarantee` under
https://kafka.apache.org/22/documentation/streams/developer-guide/config-streams.html#id6
for
more information on Kafka Streams and exactly-once semantics.
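A plain-Java sketch of the windowed de-duplication idea for case 1) (in a real Streams application the map would be a persistent window store accessed from a processor, and the fingerprint could be a hash of the message; all names here are illustrative):

```java
import java.util.HashMap;
import java.util.Map;

// Remembers a fingerprint of each message for up to windowMs milliseconds
// and flags later arrivals with the same fingerprint as duplicates.
// Entries older than the window are expired, which bounds the state size,
// at the cost of missing duplicates that span two windows.
class WindowedDeduplicator {
    private final long windowMs;
    private final Map<String, Long> seen = new HashMap<>();

    WindowedDeduplicator(long windowMs) {
        this.windowMs = windowMs;
    }

    // Returns true if an equal message was already seen within the window.
    boolean isDuplicate(String fingerprint, long timestampMs) {
        // Expire entries that fell out of the window.
        seen.values().removeIf(ts -> timestampMs - ts > windowMs);
        Long previous = seen.put(fingerprint, timestampMs);
        return previous != null && timestampMs - previous <= windowMs;
    }

    public static void main(String[] args) {
        WindowedDeduplicator dedup = new WindowedDeduplicator(3_600_000L); // 1 hour
        System.out.println(dedup.isDuplicate("msg-A", 0L));         // prints false
        System.out.println(dedup.isDuplicate("msg-A", 1_000L));     // prints true
        System.out.println(dedup.isDuplicate("msg-A", 4_000_000L)); // prints false
    }
}
```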

For case 2) and if you want to ensure that the same message is only written
once to the log you should look into idempotent producers.
See https://kafka.apache.org/documentation/#semantics and the producer
configuration `enable.idempotence` under
https://kafka.apache.org/documentation/#producerconfigs .
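For case 2), the producer-side setting could be sketched like this (string keys are used instead of the `ProducerConfig` constants so the snippet has no Kafka dependency, and the broker address is a placeholder):

```java
import java.util.Properties;

// With enable.idempotence=true the broker de-duplicates retried sends,
// so a producer retry does not create a second copy in the log.
class IdempotentProducerConfig {
    static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("enable.idempotence", "true");
        // acks=all is required by (and implied when enabling) idempotence.
        props.put("acks", "all");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("localhost:9092");
        System.out.println(props.getProperty("enable.idempotence")); // prints true
    }
}
```

These properties would then be passed to the `KafkaProducer` constructor as usual.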

Hope that helps.

Best regards,
Bruno

On Fri, Apr 26, 2019 at 9:02 AM saching1...@gmail.com 
wrote:

> I have multiple clients who can send duplicate packets multiple times to
> the same Kafka topic. Is there a way to identify those duplicate packets?
>


Re: Using processor API via DSL

2019-05-08 Thread Bruno Cadonna
Hi Alessandro,

Apologies for the late reply.

I tried the code from your repository under
https://github.com/alex88/kafka-test/tree/master and I run into a
`ClassCastException`. I think this is a bug that is described here
https://issues.apache.org/jira/browse/KAFKA-8317 .

Should I have tried one of the other branches?

Best regards,
Bruno

On Fri, May 3, 2019 at 9:33 AM Alessandro Tagliapietra <
tagliapietra.alessan...@gmail.com> wrote:

> Ok so I'm not sure if I did this correctly,
>
> I've upgraded both the server (by replacing the JARs in the confluent
> docker image with those built from kafka source) and the client (by using
> the built JARs as local file dependencies).
> I've used this as source: https://github.com/apache/kafka/archive/2.2.zip
> When the server runs it prints:
>
> INFO Kafka version: 2.2.1-SNAPSHOT
> (org.apache.kafka.common.utils.AppInfoParser).
>
> and regarding the client I don't see any kafka jars in the "External
> libraries" of the IntelliJ project tab so I think it's using the local JARs
> (2.2.1-SNAPSHOT).
>
> The problem is that the window isn't keeping the old values and still emits
> values with partially processed intervals.
>
> Just to summarize:
> https://gist.github.com/alex88/43b72e23bda9e15657b008855e1904db
>
>  - consumer emits one message per second with production = 1
>  - windowing stream should emit one message each 10 seconds with the sum of
> productions (so production = 10)
>
> If I restart the stream processor, it emits window functions with partial
> data (production < 10) as you can see from the logs.
> I've checked the JAR file and it seems to include changes from
> https://github.com/apache/kafka/pull/6623 (it has the newly
> added FixedOrderMap class)
>
> Even after removing the suppress() the error seems to persist (look at
> consumer_nosuppress), here it seems it loses track of the contents of the
> window:
>
> S1 with computed metric {"timestamp": 5, "production": 10}
> S1 with computed metric {"timestamp": 6, "production": 1}
> S1 with computed metric {"timestamp": 6, "production": 2}
> S1 with computed metric {"timestamp": 6, "production": 3}
> S1 with computed metric {"timestamp": 6, "production": 4}
> -- RESTART --
> S1 with computed metric {"timestamp": 6, "production": 1}
> S1 with computed metric {"timestamp": 6, "production": 2}
> S1 with computed metric {"timestamp": 6, "production": 3}
> S1 with computed metric {"timestamp": 6, "production": 4}
> S1 with computed metric {"timestamp": 6, "production": 5}
> S1 with computed metric {"timestamp": 6, "production": 6}
> S1 with computed metric {"timestamp": 7, "production": 1}
>
> after restart during the 60 seconds window the sum restarts.
>
> Is it something wrong with my implementation?
>
> --
> Alessandro Tagliapietra
>
> On Thu, May 2, 2019 at 7:58 PM Alessandro Tagliapietra <
> tagliapietra.alessan...@gmail.com> wrote:
>
> > Hi Bruno,
> >
> > thank you for your help, glad to hear that those are only bugs and not a
> > problem on my implementation,
> > I'm currently using confluent docker images, I've checked their master
> > branch which seems to use the SNAPSHOT version however those
> > images/packages aren't publicly available. Are there any snapshot builds
> > available?
> > In the meantime I'm trying to create a custom docker image from kafka
> > source.
> >
> > Thanks
> >
> > --
> > Alessandro Tagliapietra
> >
> > On Tue, Apr 23, 2019 at 8:52 AM Bruno Cadonna 
> wrote:
> >
> >> Hi Alessandro,
> >>
> >> It seems that the behaviour you described regarding the window
> aggregation
> >> is due to bugs. The good news is that the bugs have been already fixed.
> >>
> >> The relevant bug reports are
> >> https://issues.apache.org/jira/browse/KAFKA-7895
> >> https://issues.apache.org/jira/browse/KAFKA-8204
> >>
> >> The fixes for both bugs have been already merged to the 2.2 branch.
> >>
> >> Could you please build from the 2.2 branch and confirm that the fixes
> >> solve
> >> your problem?
> >>
> >> Best,
> >> Bruno
> >>
> >>
> >> On Sat, Apr 20, 2019 at 2:16 PM Alessandro Tagliapietra <
> >> tagliapietra.alessan...@gmail.com> wrote:

Re: Microservices?

2019-05-28 Thread Bruno Cadonna
Hi,

If you want to know how Kafka is designed and implemented, please see
the documentation under

https://kafka.apache.org/documentation/

Especially sections "Getting Started", "Design", and "Implementation".

Best,
Bruno

On Mon, May 27, 2019 at 6:03 AM V1  wrote:
>
> Hi team Kafka,
>  I'm looking forward to contributing to Apache Kafka.
> May I know if Kafka is built on microservices architecture?


Re: Error upgrading KafkaStreams

2021-03-15 Thread Bruno Cadonna

Hi Murilo,

No, you do not need any special procedure to upgrade from 2.4 to 2.7.

What you see in the logs is not an error but a warning. It should not 
block you on startup forever. The warning says that the local states of 
task 7_17 are corrupted because the offset you want to fetch of the 
state changelog topic partition 
my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17 is larger 
or smaller than the offsets that exist on the brokers for that 
partition. If Streams runs into such an exception it will recreate the 
state from scratch which might take a while depending on the size of the 
state.


The root cause of this warning is not clear from the information you 
gave. Did you maybe reset the application but not wipe out the local 
state stores?


Best,
Bruno

On 12.03.21 19:11, Murilo Tavares wrote:

Hi
I have Kafka brokers running on version 2.4.1, with a KafkaStreams app on
2.4.0.
I'm trying to upgrade my KafkaStreams to v2.7.0, but I got my instances
stuck on startup.
In my understanding, I don't need any special procedure to upgrade from
KStreams 2.4.0 to 2.7.0, right?

The following error stands out for me:

2021-03-12 16:23:52.005 [...] WARN
  org.apache.kafka.streams.processor.internals.StreamThread - stream-thread
[...] Detected the states of tasks
{7_17=[my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17]} are
corrupted. Will close the task as dirty and re-create and bootstrap from
scratch.
org.apache.kafka.streams.errors.TaskCorruptedException: Tasks with
changelogs
{7_17=[my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17]} are
corrupted and hence needs to be re-initialized
at
org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:446)
~[app.jar:?]
at
org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:744)
~[app.jar:?]
at
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:625)
~[app.jar:?]
at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
[app.jar:?]
at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:512)
[app.jar:?]
Caused by: org.apache.kafka.clients.consumer.OffsetOutOfRangeException:
Fetch position FetchPosition{offset=738, offsetEpoch=Optional.empty,
currentLeader=LeaderAndEpoch{leader=Optional[...:9092 (id: 3 rack:
euw1-az1)], epoch=27}} is out of range for partition
my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17
at
org.apache.kafka.clients.consumer.internals.Fetcher.handleOffsetOutOfRange(Fetcher.java:1366)
~[app.jar:?]
at
org.apache.kafka.clients.consumer.internals.Fetcher.initializeCompletedFetch(Fetcher.java:1318)
~[app.jar:?]
at
org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:614)
~[app.jar:?]
at
org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1272)
~[app.jar:?]
at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1233)
~[app.jar:?]
at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206)
~[app.jar:?]
at
org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:433)
~[app.jar:?]
... 4 more

Any suggestions on how to upgrade?
Thanks
Murilo



Re: Redis as state store

2021-03-15 Thread Bruno Cadonna

Hi Alex,

You are right! There is no "exactly once magic" baked into the RocksDB 
store code. The point is local vs remote. When a Kafka Streams client 
closes dirty under EOS, the state (i.e., the content of the state store) 
needs to be wiped out and to be re-created from scratch from the 
changelog topic on the brokers. To wipe out the state the state 
directory is deleted.


For a remote state store, the wiping out of the state directory would 
not delete the contents of the remote state store before Kafka Streams 
re-creates the content from scratch.


Wiping out the state directory, happens outside of the API implemented 
by a state store.


Best,
Bruno

On 15.03.21 13:59, Alex Craig wrote:

" Another issue with 3rd party state stores could be violation of
exactly-once guarantee provided by kafka streams in the event of a failure
of streams application instance"

I've heard this before but would love to know more about how a custom state
store would be at any greater risk than RocksDB as far as exactly-once
guarantees are concerned.  They all implement the same interface, so as
long as you're correctly implementing get(), put(), delete(), flush(), etc,
you should be fine right?  In other words, I don't think there is any
special "exactly once magic" that is baked into the RocksDB store code.  I
could be wrong though so I'd love to hear people's thoughts, thanks,

Alex C

On Sun, Mar 14, 2021 at 4:58 PM Parthasarathy, Mohan 
wrote:


Thanks for the responses. In the worst case, I might have to keep both
rocksdb for local store and keep an external store like Redis.

-mohan


On 3/13/21, 8:53 PM, "Pushkar Deole"  wrote:

 Another issue with 3rd party state stores could be violation of
 exactly-once guarantee provided by kafka streams in the event of a
failure
 of streams application instance.
 Kafka provides exactly once guarantee for consumer -> process ->
produce
 through transactions and with the use of state store like redis, this
 guarantee is weaker

 On Sat, Mar 13, 2021 at 5:28 AM Guozhang Wang 
wrote:

 > Hello Mohan,
 >
 > I think what you had in mind works with Redis, since it is a remote
state
 > store engine, it does not have the co-partitioning requirements as
local
 > state stores.
 >
 > One thing you'd need to tune KS though is that with remote stores,
the
 > processing latency may be larger, and since Kafka Streams process all
 > records of a single partition in order, synchronously, you may need
to tune
 > the poll interval configs etc to make sure KS would stay in the
consumer
 > group and not trigger unnecessary rebalances.
 >
 > Guozhang
 >
 > On Thu, Mar 11, 2021 at 6:41 PM Parthasarathy, Mohan <
mpart...@hpe.com>
 > wrote:
 >
 > > Hi,
 > >
 > > I have a use case where messages come in with some key gets
assigned some
 > > partition and the state gets created. Later, key changes (but still
 > > contains the old key in the message) and gets sent to a different
 > > partition. I want to be able to grab the old state using the old
key
 > before
 > > creating the new state on this instance. Redis as a  state store
makes it
 > > easy to implement this where I can simply do a lookup before
creating the
 > > state. I see an implementation here :
 > >
 >
https://github.com/andreas-schroeder/redisks/tree/master/src/main/java/com/github/andreas_schroeder/redisks
 > >
 > > Has anyone tried this ? Any caveats.
 > >
 > > Thanks
 > > Mohan
 > >
 > >
 >
 > --
 > -- Guozhang
 >







Re: Error upgrading KafkaStreams

2021-03-15 Thread Bruno Cadonna

Hi Murilo,

Did you retry to upgrade again after you reset the application? Did it work?

Best,
Bruno

On 15.03.21 14:26, Murilo Tavares wrote:

Hi Bruno
Thanks for your response.
No, I did not reset the application prior to upgrading. That was simply
upgrading KafkaStreams from 2.4 to 2.7.

I was able to reproduce it on a smaller environment, and it does indeed
recover.
In a large environment, though, it stays like that for hours. In this same
large environment, I had to downgrade the application, and when doing that
I did reset the application, which just took a few minutes.

Thanks
Murilo

On Mon, 15 Mar 2021 at 06:21, Bruno Cadonna 
wrote:


Hi Murilo,

No, you do not need any special procedure to upgrade from 2.4 to 2.7.

What you see in the logs is not an error but a warning. It should not
block you on startup forever. The warning says that the local states of
task 7_17 are corrupted because the offset you want to fetch of the
state changelog topic partition
my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17 is larger
or smaller than the offsets that exist on the brokers for that
partition. If Streams runs into such an exception it will recreate the
state from scratch which might take a while depending on the size of the
state.
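The range check described here (a checkpointed changelog offset that no longer lies within the offsets the broker still has) can be sketched in plain Java. This is only an illustration of the idea; the class and method names are invented and this is not the actual StoreChangelogReader logic.

```java
public class OffsetRangeCheck {

    // A checkpointed changelog offset is only usable if it still lies
    // within the offsets the broker has for that partition; otherwise
    // the task's local state must be rebuilt from scratch.
    static boolean isCorrupted(long checkpointed, long brokerEarliest, long brokerLatest) {
        return checkpointed < brokerEarliest || checkpointed > brokerLatest;
    }

    public static void main(String[] args) {
        // The reported fetch position (offset 738), against a broker that
        // has already deleted everything below offset 800: out of range,
        // so the task is treated as corrupted.
        System.out.println(OffsetRangeCheck.isCorrupted(738, 800, 1200));  // true
        System.out.println(OffsetRangeCheck.isCorrupted(1000, 800, 1200)); // false
    }
}
```

The broker-side numbers here are made up for the example; the point is only that a checkpoint below the broker's earliest retained offset (for instance after old changelog segments were deleted) triggers exactly this kind of warning.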

The root cause of this warning is not clear from the information you
gave. Did you maybe reset the application but not wipe out the local
state stores?

Best,
Bruno

On 12.03.21 19:11, Murilo Tavares wrote:

Hi
I have Kafka brokers running on version 2.4.1, with a KafkaStreams app on
2.4.0.
I'm trying to upgrade my KafkaStreams to v2.7.0, but I got my instances
stuck on startup.
In my understanding, I don't need any special procedure to upgrade from
KStreams 2.4.0 to 2.7.0, right?

The following error stands out for me:

2021-03-12 16:23:52.005 [...] WARN
org.apache.kafka.streams.processor.internals.StreamThread - stream-thread
[...] Detected the states of tasks
{7_17=[my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17]} are
corrupted. Will close the task as dirty and re-create and bootstrap from
scratch.
org.apache.kafka.streams.errors.TaskCorruptedException: Tasks with changelogs
{7_17=[my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17]} are
corrupted and hence needs to be re-initialized
	at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:446) ~[app.jar:?]
	at org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:744) ~[app.jar:?]
	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:625) ~[app.jar:?]
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553) [app.jar:?]
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:512) [app.jar:?]
Caused by: org.apache.kafka.clients.consumer.OffsetOutOfRangeException:
Fetch position FetchPosition{offset=738, offsetEpoch=Optional.empty,
currentLeader=LeaderAndEpoch{leader=Optional[...:9092 (id: 3 rack:
euw1-az1)], epoch=27}} is out of range for partition
my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17
	at org.apache.kafka.clients.consumer.internals.Fetcher.handleOffsetOutOfRange(Fetcher.java:1366) ~[app.jar:?]
	at org.apache.kafka.clients.consumer.internals.Fetcher.initializeCompletedFetch(Fetcher.java:1318) ~[app.jar:?]
	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:614) ~[app.jar:?]
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1272) ~[app.jar:?]
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1233) ~[app.jar:?]
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206) ~[app.jar:?]
	at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:433) ~[app.jar:?]
	... 4 more

Any suggestions on how to upgrade?
Thanks
Murilo







Re: Error upgrading KafkaStreams

2021-03-15 Thread Bruno Cadonna

Hi Murilo,

A couple of questions:

1. What do you mean exactly with clean up?
2. Do you have a cleanup policy specified on the changelog topics?

Best,
Bruno

On 15.03.21 15:03, Murilo Tavares wrote:

Hi Bruno
No, I haven't tested resetting the application before upgrading on my large
environment. But I was able to reproduce it in my dev environment, which is
way smaller.
This is what I did:
- Clean up and downgrade to 2.4.
- Let it catch up;
- upgrade to 2.7; Same errors, but it caught up after a while;

Then I tried these steps:
- Clean up and downgrade to 2.4.
- Let it catch up;
- Clean up and upgrade to 2.7. No error this time.

Thanks
Murilo

On Mon, 15 Mar 2021 at 09:53, Bruno Cadonna 
wrote:


Hi Murilo,

Did you retry to upgrade again after you reset the application? Did it
work?

Best,
Bruno

On 15.03.21 14:26, Murilo Tavares wrote:

Hi Bruno
Thanks for your response.
No, I did not reset the application prior to upgrading. That was simply
upgrading KafkaStreams from 2.4 to 2.7.

I was able to reproduce it on a smaller environment, and it does indeed
recover.
In a large environment, though, it keeps like that for hours. In this same
large environment, I had to downgrade the application, and when doing that
I did reset the application, which just took a few minutes.

Thanks
Murilo

On Mon, 15 Mar 2021 at 06:21, Bruno Cadonna 
wrote:


Hi Murilo,

No, you do not need any special procedure to upgrade from 2.4 to 2.7.

What you see in the logs is not an error but a warning. It should not
block you on startup forever. The warning says that the local states of
task 7_17 are corrupted because the offset you want to fetch of the
state changelog topic partition
my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17 is larger
or smaller than the offsets that exist on the brokers for that
partition. If Streams runs into such an exception it will recreate the
state from scratch which might take a while depending on the size of the
state.

The root cause of this warning is not clear from the information you
gave. Did you maybe reset the application but not wipe out the local
state stores?

Best,
Bruno

On 12.03.21 19:11, Murilo Tavares wrote:

Hi
I have Kafka brokers running on version 2.4.1, with a KafkaStreams app on
2.4.0.
I'm trying to upgrade my KafkaStreams to v2.7.0, but I got my instances
stuck on startup.
In my understanding, I don't need any special procedure to upgrade from
KStreams 2.4.0 to 2.7.0, right?

The following error stands out for me:

[quoted stack trace trimmed; the full trace appears earlier in this thread]

Any suggestions on how to upgrade?
Thanks
Murilo











Re: Redis as state store

2021-03-15 Thread Bruno Cadonna

Hi Alex,

I guess wiping out the state directory is easier code-wise, faster, 
and/or at the time of development the developers did not design for 
remote state stores. But I actually do not know the exact reason.


Off the top of my head, I do not know how to solve this for remote state 
stores. Using the uncaught exception handler is not good, because a 
computing node could fail without giving the JVM the opportunity to 
throw an exception.


In your tests, try to increase the commit interval to a high value and 
see if you get inconsistencies. You should get an inconsistency if the 
state store maintains counts for keys and after the last commit before 
the failure, the Streams app puts an event with a new key K with value 1 
into the state store. After failover, Streams would put the same event 
with key K again into the state store. If the state store deleted all of 
its data, Streams would put again value 1, but if the state store did 
not delete all data, Streams would put value 2 which is wrong because it 
would count the same event twice.
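The double-count scenario just described can be sketched in a few lines of plain Java. The HashMap stands in for a remote store (such as Redis) whose contents survive a crash of the Streams instance; this is an illustration of the failure mode, not Kafka Streams code.

```java
import java.util.HashMap;
import java.util.Map;

public class DoubleCountDemo {

    // Process the same event twice against a store whose data survives a
    // crash, as a remote store's would. Returns the final count for key "K".
    static long countAfterFailover() {
        Map<String, Long> remoteStore = new HashMap<>();
        // After the last commit, the app processes an event with new key "K".
        remoteStore.merge("K", 1L, Long::sum);
        // The instance crashes BEFORE the offset of that event is committed.
        // A remote store is NOT wiped on failover, so the value above survives.
        // After restart, the same uncommitted event is re-processed:
        remoteStore.merge("K", 1L, Long::sum);
        return remoteStore.get("K");
    }

    public static void main(String[] args) {
        // A store wiped on failure would end at 1; the surviving store shows 2.
        System.out.println(countAfterFailover()); // prints 2
    }
}
```

This is why wiping the local state and replaying the changelog, as described above, restores consistency, and why a remote store that keeps its data across failures can end up counting an event twice.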


Best,
Bruno


On 15.03.21 15:20, Alex Craig wrote:

Bruno,
Thanks for the info!  that makes sense.  Of course now I have more
questions.  :)  Do you know why this is being done outside of the state
store API?  I assume there are reasons why a "deleteAll()" type of function
wouldn't work, thereby allowing a state store to purge itself?  And maybe
more importantly, is there a way to achieve a similar behavior with a 3rd
party store?  I'm not sure if hooking into the uncaught exception handler
might be a good way to purge/drop a state store in the event of a fatal
error?  I did setup a MongoDB state store recently as part of a POC and was
testing it with EOS enabled.  (forcing crashes to occur and checking that
the result of my aggregation was still accurate)  I was unable to cause
inconsistent data in the mongo store (which is good!), though of course I
may just have been getting lucky.  Thanks again for your help,

Alex

On Mon, Mar 15, 2021 at 8:59 AM Pushkar Deole  wrote:


Bruno,

i tried to explain this in 'kafka user's language' through the above
mentioned scenario, hope i put it properly :-) and correct me if i am wrong

On Mon, Mar 15, 2021 at 7:23 PM Pushkar Deole 
wrote:


This is what I understand could be the issue with external state store:

kafka stream application consumes source topic, does processing, stores
state to kafka state store (this is backed by topic) and before producing
event on destination topic, the application fails with some issue. In
this case, the transaction has failed, so kafka guarantees either all or
none, means offset written to source topic, state written to state store
topic, output produced on destination topic... all of these happen or none
of these, and in this failure scenario it is none of these.

Assume you have a redis state store, and you updated the state into redis
and the stream application failed. Now, you have source topic and destination
topic consistent, i.e. the offset is not committed to the source topic and
the output is not produced on the destination topic, but your redis state
store is inconsistent with that, since it is an external state store and
kafka can't guarantee rollback of state written there

On Mon, Mar 15, 2021 at 6:30 PM Alex Craig 

wrote:



" Another issue with 3rd party state stores could be violation of
exactly-once guarantee provided by kafka streams in the event of a failure
of streams application instance"

I've heard this before but would love to know more about how a custom state
store would be at any greater risk than RocksDB as far as exactly-once
guarantees are concerned.  They all implement the same interface, so as
long as you're correctly implementing get(), put(), delete(), flush(), etc,
you should be fine right?  In other words, I don't think there is any
special "exactly once magic" that is baked into the RocksDB store code.  I
could be wrong though so I'd love to hear people's thoughts, thanks,

Alex C

On Sun, Mar 14, 2021 at 4:58 PM Parthasarathy, Mohan 
wrote:


Thanks for the responses. In the worst case, I might have to keep both
rocksdb for local store and keep an external store like Redis.

-mohan


On 3/13/21, 8:53 PM, "Pushkar Deole"  wrote:

 Another issue with 3rd party state stores could be violation of
 exactly-once guarantee provided by kafka streams in the event of a
 failure of streams application instance.
 Kafka provides exactly once guarantee for consumer -> process -> produce
 through transactions and with the use of state store like redis, this
 guarantee is weaker

 On Sat, Mar 13, 2021 at 5:28 AM Guozhang Wang 
 wrote:

 > Hello Mohan,
 >
 > I think what you had in mind works with Redis, since it is a remote
 > state store engine, it does not have the co-partitioning requirements
 > as local state stores.
 >
 > One thing you'd need to tune KS though is that with remote stores,
 > the processing latency may 

Re: Error upgrading KafkaStreams

2021-03-15 Thread Bruno Cadonna

Hi Murilo,

OK, now I see why you do not get an error in the second case in your 
small environment where you cleaned up before upgrading. You would 
restore from the earliest offset anyway and that is defined by the 
earliest offset at the broker and that always exists. Hence, no out of 
range exception is thrown.


I am wondering why you get an out of range exception after upgrading 
without clean up, though.


A solution would be to clean up before upgrading in your large 
environment. I do not know if this is a viable solution for you.


Best,
Bruno

On 15.03.21 16:01, Murilo Tavares wrote:

Hi Bruno
We have an environment variable that, when set, will call
KafkaStreams.cleanUp() and sleep.
The changelog topic is an internal KafkaStreams topic, for which I'm not
changing any policies.
It should be some default policy for a KTable in my understanding.
Thanks
Murilo



On Mon, 15 Mar 2021 at 10:20, Bruno Cadonna 
wrote:


Hi Murilo,

A couple of questions:

1. What do you mean exactly with clean up?
2. Do you have a cleanup policy specified on the changelog topics?

Best,
Bruno

On 15.03.21 15:03, Murilo Tavares wrote:

Hi Bruno
No, I haven't tested resetting the application before upgrading on my large
environment. But I was able to reproduce it in my dev environment, which is
way smaller.
This is what I did:
- Clean up and downgrade to 2.4.
- Let it catch up;
- upgrade to 2.7; Same errors, but it caught up after a while;

Then I tried these steps:
- Clean up and downgrade to 2.4.
- Let it catch up;
- Clean up and upgrade to 2.7. No error this time.

Thanks
Murilo

On Mon, 15 Mar 2021 at 09:53, Bruno Cadonna 
wrote:


Hi Murilo,

Did you retry to upgrade again after you reset the application? Did it
work?

Best,
Bruno

On 15.03.21 14:26, Murilo Tavares wrote:

Hi Bruno
Thanks for your response.
No, I did not reset the application prior to upgrading. That was simply
upgrading KafkaStreams from 2.4 to 2.7.

I was able to reproduce it on a smaller environment, and it does indeed
recover.
In a large environment, though, it keeps like that for hours. In this same
large environment, I had to downgrade the application, and when doing that
I did reset the application, which just took a few minutes.

Thanks
Murilo

On Mon, 15 Mar 2021 at 06:21, Bruno Cadonna 


wrote:


Hi Murilo,

No, you do not need any special procedure to upgrade from 2.4 to 2.7.

What you see in the logs is not an error but a warning. It should not
block you on startup forever. The warning says that the local states of
task 7_17 are corrupted because the offset you want to fetch of the
state changelog topic partition
my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17 is larger
or smaller than the offsets that exist on the brokers for that
partition. If Streams runs into such an exception it will recreate the
state from scratch which might take a while depending on the size of the
state.

The root cause of this warning is not clear from the information you
gave. Did you maybe reset the application but not wipe out the local
state stores?

Best,
Bruno

On 12.03.21 19:11, Murilo Tavares wrote:

Hi
I have Kafka brokers running on version 2.4.1, with a KafkaStreams app on
2.4.0.
I'm trying to upgrade my KafkaStreams to v2.7.0, but I got my instances
stuck on startup.
In my understanding, I don't need any special procedure to upgrade from
KStreams 2.4.0 to 2.7.0, right?

The following error stands out for me:

[quoted stack trace trimmed; the full trace appears earlier in this thread]

Re: [ANNOUNCE] New committer: Tom Bentley

2021-03-15 Thread Bruno Cadonna

Congrats, Tom!

Best,
Bruno

On 15.03.21 18:59, Mickael Maison wrote:

Hi all,

The PMC for Apache Kafka has invited Tom Bentley as a committer, and
we are excited to announce that he accepted!

Tom first contributed to Apache Kafka in June 2017 and has been
actively contributing since February 2020.
He has accumulated 52 commits and worked on a number of KIPs. Here are
some of the most significant ones:
KIP-183: Change PreferredReplicaLeaderElectionCommand to use AdminClient
KIP-195: AdminClient.createPartitions
KIP-585: Filter and Conditional SMTs
KIP-621: Deprecate and replace DescribeLogDirsResult.all() and .values()
KIP-707: The future of KafkaFuture (still in discussion)

In addition, he is very active on the mailing list and has helped
review many KIPs.

Congratulations Tom and thanks for all the contributions!



Re: [kafka-clients] [VOTE] 2.7.1 RC0

2021-03-19 Thread Bruno Cadonna

Hi Mickael,

Please have a look at the following bug report:

https://issues.apache.org/jira/browse/KAFKA-12508

I set its priority to blocker since the bug might break at-least-once 
and exactly-once processing guarantees.


Feel free to set it back to major, if you think that it is not a blocker.

Best,
Bruno


On 19.03.21 12:26, Mickael Maison wrote:

Hello Kafka users, developers and client-developers,

This is the first candidate for release of Apache Kafka 2.7.1.

Apache Kafka 2.7.1 is a bugfix release and 40 issues have been fixed
since 2.7.0.

Release notes for the 2.7.1 release:
https://home.apache.org/~mimaison/kafka-2.7.1-rc0/RELEASE_NOTES.html

*** Please download, test and vote by Friday, March 26, 5pm PST

Kafka's KEYS file containing PGP keys we use to sign the release:
https://kafka.apache.org/KEYS

* Release artifacts to be voted upon (source and binary):
https://home.apache.org/~mimaison/kafka-2.7.1-rc0/

* Maven artifacts to be voted upon:
https://repository.apache.org/content/groups/staging/org/apache/kafka/

* Javadoc:
https://home.apache.org/~mimaison/kafka-2.7.1-rc0/javadoc/

* Tag to be voted upon (off 2.7 branch) is the 2.7.1 tag:
https://github.com/apache/kafka/releases/tag/2.7.1-rc0

* Documentation:
https://kafka.apache.org/27/documentation.html

* Protocol:
https://kafka.apache.org/27/protocol.html

* Successful Jenkins builds for the 2.7 branch:
Unit/integration tests:
https://ci-builds.apache.org/job/Kafka/job/kafka-2.7-jdk8/135/

/**

Thanks,
Mickael



Re: [kafka-clients] [VOTE] 2.6.2 RC0

2021-03-19 Thread Bruno Cadonna

Hi Sophie,

Please have a look at the following bug report:

https://issues.apache.org/jira/browse/KAFKA-12508

I set its priority to blocker since the bug might break at-least-once 
and exactly-once processing guarantees.


Feel free to set it back to major, if you think that it is not a blocker.

Best,
Bruno

On 12.03.21 19:47, 'Sophie Blee-Goldman' via kafka-clients wrote:

Hello Kafka users, developers and client-developers,

This is the first candidate for release of Apache Kafka 2.6.2.

Apache Kafka 2.6.2 is a bugfix release and fixes 30 issues since the 
2.6.1 release. Please see the release notes for more information.


Release notes for the 2.6.2 release:
https://home.apache.org/~ableegoldman/kafka-2.6.2-rc0/RELEASE_NOTES.html 



*** Please download, test and vote by Friday, March 19th, 9am PST

Kafka's KEYS file containing PGP keys we use to sign the release:
https://kafka.apache.org/KEYS 

* Release artifacts to be voted upon (source and binary):
https://home.apache.org/~ableegoldman/kafka-2.6.2-rc0/ 



* Maven artifacts to be voted upon:
https://repository.apache.org/content/groups/staging/org/apache/kafka/ 



* Javadoc:
https://home.apache.org/~ableegoldman/kafka-2.6.2-rc0/javadoc/ 



* Tag to be voted upon (off 2.6 branch) is the 2.6.2 tag:
https://github.com/apache/kafka/releases/tag/2.6.2-rc0 



* Documentation:
https://kafka.apache.org/26/documentation.html 



* Protocol:
https://kafka.apache.org/26/protocol.html 



* Successful Jenkins builds for the 2.6 branch:
Unit/integration tests: 
https://ci-builds.apache.org/job/Kafka/job/kafka-2.6-jdk8/105/ 

System tests: 
https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4435/ 



/**

Thanks,
Sophie

--
You received this message because you are subscribed to the Google 
Groups "kafka-clients" group.
To unsubscribe from this group and stop receiving emails from it, send 
an email to kafka-clients+unsubscr...@googlegroups.com 
.
To view this discussion on the web visit 
https://groups.google.com/d/msgid/kafka-clients/CAFLS_9jkHGsj42DT7Og3%3Dov9RbO%3DbEAQX55h0L6YKHJQR9qJpw%40mail.gmail.com 
.


Re: [kafka-clients] [VOTE] 2.6.2 RC0

2021-03-19 Thread Bruno Cadonna

Hi Sophie,

Correction to my last e-mail: The bug does not break eos, but it breaks 
at-least-once.


Bruno

On 19.03.21 14:54, Bruno Cadonna wrote:

Hi Sophie,

Please have a look at the following bug report:

https://issues.apache.org/jira/browse/KAFKA-12508

I set its priority to blocker since the bug might break at-least-once 
and exactly-once processing guarantees.


Feel free to set it back to major, if you think that it is not a blocker.

Best,
Bruno

On 12.03.21 19:47, 'Sophie Blee-Goldman' via kafka-clients wrote:

Hello Kafka users, developers and client-developers,

This is the first candidate for release of Apache Kafka 2.6.2.

Apache Kafka 2.6.2 is a bugfix release and fixes 30 issues since the 
2.6.1 release. Please see the release notes for more information.


Release notes for the 2.6.2 release:
https://home.apache.org/~ableegoldman/kafka-2.6.2-rc0/RELEASE_NOTES.html

*** Please download, test and vote by Friday, March 19th, 9am PST

Kafka's KEYS file containing PGP keys we use to sign the release:
https://kafka.apache.org/KEYS

* Release artifacts to be voted upon (source and binary):
https://home.apache.org/~ableegoldman/kafka-2.6.2-rc0/

* Maven artifacts to be voted upon:
https://repository.apache.org/content/groups/staging/org/apache/kafka/

* Javadoc:
https://home.apache.org/~ableegoldman/kafka-2.6.2-rc0/javadoc/

* Tag to be voted upon (off 2.6 branch) is the 2.6.2 tag:
https://github.com/apache/kafka/releases/tag/2.6.2-rc0

* Documentation:
https://kafka.apache.org/26/documentation.html

* Protocol:
https://kafka.apache.org/26/protocol.html

* Successful Jenkins builds for the 2.6 branch:
Unit/integration tests:
https://ci-builds.apache.org/job/Kafka/job/kafka-2.6-jdk8/105/
System tests:
https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4435/

/**

Thanks,
Sophie

--
You received this message because you are subscribed to the Google
Groups "kafka-clients" group.
To unsubscribe from this group and stop receiving emails from it, send
an email to kafka-clients+unsubscr...@googlegroups.com.
To view this discussion on the web visit
https://groups.google.com/d/msgid/kafka-clients/CAFLS_9jkHGsj42DT7Og3%3Dov9RbO%3DbEAQX55h0L6YKHJQR9qJpw%40mail.gmail.com.



Re: [kafka-clients] [VOTE] 2.7.1 RC0

2021-03-19 Thread Bruno Cadonna

Hi Mickael,

Correction to my last e-mail: The bug does not break eos, but it breaks 
at-least-once.


Bruno


On 19.03.21 14:54, Bruno Cadonna wrote:

Hi Mickael,

Please have a look at the following bug report:

https://issues.apache.org/jira/browse/KAFKA-12508

I set its priority to blocker since the bug might break at-least-once 
and exactly-once processing guarantees.


Feel free to set it back to major, if you think that it is not a blocker.

Best,
Bruno


On 19.03.21 12:26, Mickael Maison wrote:

Hello Kafka users, developers and client-developers,

This is the first candidate for release of Apache Kafka 2.7.1.

Apache Kafka 2.7.1 is a bugfix release and 40 issues have been fixed
since 2.7.0.

Release notes for the 2.7.1 release:
https://home.apache.org/~mimaison/kafka-2.7.1-rc0/RELEASE_NOTES.html

*** Please download, test and vote by Friday, March 26, 5pm PST

Kafka's KEYS file containing PGP keys we use to sign the release:
https://kafka.apache.org/KEYS

* Release artifacts to be voted upon (source and binary):
https://home.apache.org/~mimaison/kafka-2.7.1-rc0/

* Maven artifacts to be voted upon:
https://repository.apache.org/content/groups/staging/org/apache/kafka/

* Javadoc:
https://home.apache.org/~mimaison/kafka-2.7.1-rc0/javadoc/

* Tag to be voted upon (off 2.7 branch) is the 2.7.1 tag:
https://github.com/apache/kafka/releases/tag/2.7.1-rc0

* Documentation:
https://kafka.apache.org/27/documentation.html

* Protocol:
https://kafka.apache.org/27/protocol.html

* Successful Jenkins builds for the 2.7 branch:
Unit/integration tests:
https://ci-builds.apache.org/job/Kafka/job/kafka-2.7-jdk8/135/

/**

Thanks,
Mickael



Re: [ANNOUNCE] New Kafka PMC Member: Bill Bejeck

2021-04-12 Thread Bruno Cadonna

Congrats Bill! Well deserved!

Best,
Bruno

On 12.04.21 11:19, Satish Duggana wrote:

Congratulations Bill!!

On Thu, 8 Apr 2021 at 13:24, Tom Bentley  wrote:


Congratulations Bill!

On Thu, Apr 8, 2021 at 2:36 AM Luke Chen  wrote:


Congratulations Bill!

Luke

On Thu, Apr 8, 2021 at 9:17 AM Matthias J. Sax  wrote:


Hi,

It's my pleasure to announce that Bill Bejeck in now a member of the
Kafka PMC.

Bill has been a Kafka committer since Feb 2019. He has remained
active in the community since becoming a committer.



Congratulations Bill!

  -Matthias, on behalf of Apache Kafka PMC









Re: [ANNOUNCE] New Committer: Bruno Cadonna

2021-04-12 Thread Bruno Cadonna

Thank you all for the kind words!

Best,
Bruno

On 08.04.21 00:34, Guozhang Wang wrote:

Hello all,

I'm happy to announce that Bruno Cadonna has accepted his invitation to
become an Apache Kafka committer.

Bruno has been contributing to Kafka since Jan. 2019 and has made 99
commits and more than 80 PR reviews so far:

https://github.com/apache/kafka/commits?author=cadonna

He worked on a few key KIPs on Kafka Streams:

* KIP-471: Expose RocksDB Metrics in Kafka Streams
* KIP-607: Add Metrics to Kafka Streams to Report Properties of RocksDB
* KIP-662: Throw Exception when Source Topics of a Streams App are Deleted

Besides all the code contributions and reviews, he's also done a handful
for the community: multiple Kafka meetup talks in Berlin and Kafka Summit
talks, an introductory class to Kafka at Humboldt-Universität zu Berlin
seminars, and have co-authored a paper on Kafka's stream processing
semantics in this year's SIGMOD conference (
https://en.wikipedia.org/wiki/SIGMOD). Bruno has also been quite active on
SO channels and AK mailings.

Please join me to congratulate Bruno for all the contributions!

-- Guozhang



Re: [ANNOUNCE] New Kafka PMC Member: Randall Hauch

2021-04-19 Thread Bruno Cadonna

Congrats Randall! Well deserved!

Bruno

On 17.04.21 01:43, Matthias J. Sax wrote:

Hi,

It's my pleasure to announce that Randall Hauch in now a member of the
Kafka PMC.

Randall has been a Kafka committer since Feb 2019. He has remained
active in the community since becoming a committer.



Congratulations Randall!

  -Matthias, on behalf of Apache Kafka PMC



Re: State Store Data Retention

2021-04-19 Thread Bruno Cadonna

Hi Upesh,

The answers to your questions are:

1.
The configs cleanup.policy and retention.ms are topic configs. Hence, 
they only affect the changelog of a state store, not the local state 
store in a Kafka Streams client.


Locally, window and session stores remove data they do not need anymore. 
Window and session stores are segmented stores. That means they consist 
of segments that are ordered by the windows they contain. Once the 
segment that contains the oldest windows is not needed anymore, i.e., 
the data exceeded the retention time of the state store, the segment is 
removed.


Non-windowed state stores will not remove data.

Worth noting here: If you change retention.ms directly on the brokers, 
it will not affect the behavior of local state stores.


2.
Yes, this behavior is the same for in-memory state stores and persistent 
state stores.


3.
Window and session state stores do remove data.
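As a rough illustration of the segmented expiry described in point 1, here is a tiny plain-Java model: windows live in segments ordered by time, and whole segments older than the retention period are dropped. The structure and constants are invented for the example; the real RocksDB-backed segmented stores are considerably more involved.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

public class SegmentedStoreSketch {

    static final long SEGMENT_INTERVAL_MS = 60_000; // one segment per minute
    static final long RETENTION_MS = 3 * 60_000;    // keep roughly three minutes

    // segmentId -> (key -> value); the TreeMap keeps segments ordered by time
    final TreeMap<Long, Map<String, Long>> segments = new TreeMap<>();
    long streamTime = 0; // highest record timestamp seen so far

    void put(String key, long value, long timestamp) {
        streamTime = Math.max(streamTime, timestamp);
        long segmentId = timestamp / SEGMENT_INTERVAL_MS;
        segments.computeIfAbsent(segmentId, id -> new HashMap<>()).put(key, value);
        // drop whole segments that fell out of the retention window
        long minLiveSegment = (streamTime - RETENTION_MS) / SEGMENT_INTERVAL_MS;
        segments.headMap(minLiveSegment).clear();
    }

    public static void main(String[] args) {
        SegmentedStoreSketch store = new SegmentedStoreSketch();
        store.put("a", 1, 0);        // lands in segment 0
        store.put("b", 2, 250_000);  // advances stream time; segment 0 expires
        System.out.println(store.segments.keySet()); // [4]
    }
}
```

Dropping whole segments is what makes expiry cheap: no per-record scan is needed, the store just deletes the oldest segment once everything in it has exceeded the retention time.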


Best,
Bruno



On 18.04.21 18:18, Upesh Desai wrote:
Hello, I have not been able to find a concrete answer on if/how state 
stores on a running kafka streams instance remove data once it has 
passed the configured retention.ms. So a couple of clarification 
questions:


 1. If the stores are configured with: cleanup.policy=compact,delete AND
retention.ms=N, will the stores remove data automatically over time
in the running stream instance stores?
 2. Is this behavior the same for in-memory stores and persistent
rocksdb stores?
 3. If they do not remove data that has passed the retention.ms period,
is there a different way to periodically remove old data from the
stores?

I’m using kafka 2.7.0 components across the board (broker, connect, etc.).

Thanks in advance,
Upesh








Re: Kafka Streams and retention limits

2021-04-26 Thread Bruno Cadonna
Additionally, with KIP-698 
(https://cwiki.apache.org/confluence/x/7CnZCQ), we will add 
verifications of changelogs to find misconfigurations and report them 
to the users.


Best,
Bruno

On 24.04.21 22:58, Matthias J. Sax wrote:

The topics used by Kafka Streams to back up state stores are called
changelog topics, and they are configured with log compaction instead of
retention, and thus, there is no size limit.

Cf https://kafka.apache.org/documentation/#compaction


-Matthias

On 4/24/21 1:32 PM, Dan Bradley wrote:

I'm trying to understand the interactions between stateful processing in
Kafka Streams and topic retention size restrictions. I'm fairly naive
about both Kafka and Kafka Streams so I'm sure I have something wrong
here, but my understanding is that, under stateful processing, local
state in the application is made fault tolerant by the topic's commit
log on the broker. That data, I assume, will count towards the topic's
retention.bytes limit. If so, then what happens when I hit that limit?
Let's say that the topic has a retention limit of 10mb and my stateful
processor needs more than that to do its work. Does the processor fail?
And what impact is there on retaining messages? Does the amount of
messages stored in the topic trend toward 0 as the allocated space is
used up by the processor?

Pointers to documentation that would explain this would also be helpful.
I've tried to answer this question for myself but with no luck so far.
Thank you very much.


Re: Changing Replication Factor

2021-04-29 Thread Bruno Cadonna

Hi Marcus,

1. If you change REPLICATION_FACTOR_CONFIG without resetting the 
application (or deleting the changelog and repartition topics) and 
redeploy the Streams application, the replication factor of the internal 
topics will not change. The replication factor will only change for new 
deployments, i.e., Streams applications with a new application ID or 
Streams applications that were reset. In both cases the internal topics 
will be newly created.


2. Changing the replication factor of a topic directly on the brokers 
should be fine. Kafka Streams should not re-create the internal topics 
and not throw any exceptions.
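Since Kafka 2.4 (KIP-455), a reassignment like the one 
kafka-reassign-partitions.sh performs can also be triggered through the 
Java Admin API. A sketch, assuming broker IDs 0, 1, 2 and a hypothetical 
changelog topic name; it is a fragment that needs a live cluster:

```java
import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.common.TopicPartition;

public class IncreaseReplicationFactor {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption
        try (Admin admin = Admin.create(props)) {
            // Move partition 0 of the changelog topic to replicas 0, 1, 2,
            // i.e., raise its replication factor to 3.
            TopicPartition tp =
                new TopicPartition("prod-v1-my-store-changelog", 0); // hypothetical
            admin.alterPartitionReassignments(
                    Map.of(tp, Optional.of(
                        new NewPartitionReassignment(Arrays.asList(0, 1, 2)))))
                .all().get();
        }
    }
}
```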


3. Unfortunately, I do not know the answer to this question. Hopefully 
somebody else can answer it.


I answered your questions to the best of my knowledge. The only way to 
confirm my answers is to test (preferably in a test environment).



Best,
Bruno

On 28.04.21 17:34, Marcus Horsley-Rai wrote:

Hi All,

I'm in a sub-optimal situation whereby I have some Kafka Streams apps
deployed to production, but the default replication factor set on the
brokers was 1 when they were first deployed.
As such, any state store changelog topics, and re-partition topics
therefore have RF 1 also.

I'm familiar with the bin/kafka-reassign-partitions.sh tool and how to use
that.
I've since also found the streams replication.factor
(StreamsConfig.REPLICATION_FACTOR_CONFIG) setting that can be supplied in
the streams apps config.

My questions are:

1.  Will simply changing the value of REPLICATION_FACTOR_CONFIG and
re-deploying have any effect on already-created internal topics?
2. Conversely, should I just change the RF of the internal topics using
the re-assign-partitions tool? Is that safe to do whilst the apps are still
running?
3. (not Streams-specific) If a broker that was the leader of
partition(s) for a topic with RF 1 died (i.e. no replicas), and was
non-recoverable - would the only way to restore service to those partitions
be to delete the topic and re-create?  (with data loss, understandably). I
couldn't seem to achieve this using preferred/unclean leader election, nor
using the re-assign-partitions tool, since that requires all brokers to be
healthy.

Many thanks in advance for any answers,

Marcus



Re: State Store Data Retention

2021-05-10 Thread Bruno Cadonna

Hi Navneeth,

I wrote that the *local state stores* are not affected when the topic 
configs cleanup.policy and retention.ms are passed to the state store. 
The *changelog topics* will consider the configs and they will remove 
data as specified in the configs.


In the case of a state migration to another instance, it depends whether 
the other instance already has some state for the given store locally.


- If the most recent offset on the instance is still within the range of 
offsets of the changelog topic on the brokers, the state will be 
replayed from the local offset to the most recent offset on the brokers. 
Data removed on the brokers might still exist locally.


- If the most recent offset on the instance is before the range of 
offsets of the changelog topic on the brokers, the state will be 
replayed from the beginning of the changelog on the brokers, which 
means that removed data in the changelog topic cannot be replayed 
because the beginning of the changelog was moved past this data.


- If the state does not exist on the instance, the state will be 
replayed from the beginning of the changelog and removed data is not 
replayed as in the previous case.


I hope that helps.

Best,
Bruno

On 08.05.21 00:59, Navneeth Krishnan wrote:

Hi Bruno/All,

I have a follow-up question regarding the same topic. As you had
mentioned, there will be no impact to key-value stores even when retention.ms
and clean up policy is provided. Does that mean the change log topic will
not clear the data in the broker even after the retention period is over?

I agree the local state stores will not be able to delete the data but when
there is any reallocation then the state replay would just have to replay
the data for the given retention time. Is this understanding correct?

Thanks

On Mon, Apr 19, 2021 at 1:57 AM Bruno Cadonna  wrote:


Hi Upesh,

The answers to your questions are:

1.
The configs cleanup.policy and retention.ms are topic configs. Hence,
they only affect the changelog of a state store, not the local state
store in a Kafka Streams client.

Locally, window and session stores remove data they do not need anymore.
Window and session stores are segmented stores. That means they consist
of segments that are ordered by the windows they contain. Once the
segment that contains the oldest windows is not needed anymore, i.e.,
the data exceeded the retention time of the state store, the segment is
removed.

Non-windowed state stores will not remove data.

Worth noting here: If you change retention.ms directly on the brokers,
it will not affect the behavior of local state stores.

2.
Yes, this behavior is the same for in-memory state stores and persistent
state stores.

3.
Window and session state stores do remove data.


Best,
Bruno



On 18.04.21 18:18, Upesh Desai wrote:

Hello, I have not been able to find a concrete answer on if/how state
stores on a running kafka streams instance remove data when it has
passed the configured retention.ms config. So a couple clarification
questions:

  1. If the stores are configured with: cleanup.policy=compact,delete AND
 retention.ms=N, will the stores remove data automatically over time
 in the running stream instance stores?
  2. Is this behavior the same for in-memory stores and persistent
 rocksdb stores?
  3. If they do not remove data that has passed the retention.ms period,
 is there a different way to periodically remove old data from the
 stores?

I’m using kafka 2.7.0 components across the board (broker, connect,

etc.).


Thanks in advance,
Upesh

<https://www.itrsgroup.com/>


Upesh Desai​
Senior Software Developer

*ude...@itrsgroup.com* <mailto:ude...@itrsgroup.com>
*www.itrsgroup.com* <https://www.itrsgroup.com/>








Re: Streams Partition Allocation Skewness

2021-06-04 Thread Bruno Cadonna

Hi Navneeth,

I need some clarifications to be able to help you.

First of all it would be useful to know if your topology is stateful, 
i.e., if it has to maintain state. Since you have two topics with 72 
partitions but only 72 tasks (or partition groups to assign) that need 
to be distributed over the nodes, I am assuming you have a join in your 
topology which would make the topology (and the tasks) stateful. Is this 
correct?


You say that one node has 3 data partitions assigned per stream thread 
which would mean that a node gets assigned 18 tasks if 6 stream threads 
are configured per node. I cannot see a node with 18 tasks assigned in 
the assignment in your previous e-mail. Can you clarify?


Streams' task assignment algorithm tries to balance the work load and 
tries to minimize the restoration of stateful tasks. Since it cannot 
achieve both at the same time, Streams performs a couple of probing 
rebalances every ten minutes (configurable with 
probing.rebalance.interval.ms) and moves around tasks until the workload 
is balanced. That means, it could be that the assignment is unbalanced 
in the beginning and improves over time. To know if Streams still 
performs probing rebalances, you can look into the log files and look 
for INFO messages with the following content


"Decided on assignment: ... with followup probing rebalance."

and

"Decided on assignment: ... with no followup probing rebalance."

Did you see the assignment you posted in your previous e-mail after the 
latter log message? If not, it could be that after the next probing 
rebalances the distribution of the tasks improved.


Said that, keep in mind that Streams does not guarantee an optimal 
distribution. The assignment is best-effort. Streams will try to do its 
best to distribute the tasks over the nodes, but there are no guarantees.


Finally, the rule of thumb is to use as many stream threads on each node 
as the number of cores on the node to utilize the CPU efficiently.
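That rule of thumb can be expressed as a small, dependency-free sketch; 
the property key "num.stream.threads" corresponds to 
StreamsConfig.NUM_STREAM_THREADS_CONFIG:

```java
import java.util.Properties;

public class ThreadSizing {
    // Build Streams properties with one stream thread per available core.
    static Properties streamsProps(int cores) {
        Properties p = new Properties();
        p.setProperty("num.stream.threads", String.valueOf(cores));
        return p;
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println(streamsProps(cores).getProperty("num.stream.threads"));
    }
}
```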


Best,
Bruno

On 03.06.21 07:47, Navneeth Krishnan wrote:

This is what the assignment looks like after a full restart. N represents node
and the second column is the number of partitions assigned. There are just
two input topics with equal partitions in the topology. I was expecting
each node to have 6 partitions assigned.

N1 7
N2 5
N3 7
N4 5
N5 7
N6 5
N7 7
N8 7
N9 7
N10 6
N11 5
N12 4


There are 72 partitions are here is the allocation.

0 N1
1 N2
2 N3
3 N4
4 N5
5 N6
6 N7
7 N8
8 N9
9 N10
10 N11
11 N12
12 N1
13 N5
14 N6
15 N7
16 N5
17 N9
18 N7
19 N8
20 N9
21 N10
22 N3
23 N12
24 N1
25 N7
26 N3
27 N4
28 N5
29 N6
30 N1
31 N2
32 N9
33 N10
34 N11
35 N7
36 N1
37 N9
38 N3
39 N4
40 N5
41 N2
42 N11
43 N8
44 N6
45 N10
46 N8
47 N9
48 N1
49 N2
50 N3
51 N4
52 N3
53 N5
54 N6
55 N8
56 N8
57 N10
58 N11
59 N12
60 N1
61 N2
62 N3
63 N4
64 N5
65 N7
66 N7
67 N8
68 N9
69 N10
70 N11
71 N12

On Wed, Jun 2, 2021 at 9:40 PM Navneeth Krishnan 
wrote:


We are using kafka version 2.6.1 on broker and 2.6.2 for streams.

Thanks

On Wed, Jun 2, 2021 at 7:18 PM Navneeth Krishnan 
wrote:


Hi All,

We recently migrated from flink to kafka streams in production and we are
facing a major issue. Any quick help would really be appreciated.

There are 72 input data topic partitions and 72 control stream topic
partitions. There is a minimum of 12 nodes with 6 streams threads on each
instance and we are using auto scaling based on CPU load. Also we do have
scenarios where the instances go down and it's replaced by a new instance.

Now the problem that we see is unequal partition allocation among
instances. For example one node has 3 data partitions allocated per stream
thread and the CPU on that node is about 80% whereas there is another node
in which only 4 stream threads have allocations and they are assigned with
one partition each.

Is there a way to equally distribute the partitions so that there will
not be a problem in processing the incoming data without much lag. In this
case some partitions have very high lag versus some in a few thousands.
This is impacting our production system.

Streams Configuration:
 acceptable.recovery.lag = 1
 application.id = prod-v1
 application.server = *.*.*.*:80
 bootstrap.servers = [*]
 buffered.records.per.partition = 1000
 built.in.metrics.version = latest
 cache.max.bytes.buffering = 104857600
 client.id =
 commit.interval.ms = 1
 connections.max.idle.ms = 54
 default.deserialization.exception.handler = class
org.apache.kafka.streams.errors.LogAndContinueExceptionHandler
 default.key.serde = class
org.apache.kafka.common.serialization.Serdes$ByteArraySerde
 default.production.exception.handler = class
org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
 default.timestamp.extractor = class
org.apache.kafka.streams.processor.WallclockTimestampExtractor
 default.value.serde = class
org.apache.kafka.common.seri

Re: Please add user heyingquan to the list of contributors

2021-06-07 Thread Bruno Cadonna

Hi,

I added you to the list of contributors in the Apache Kafka JIRA 
project. You can now assign tickets to yourself.


Welcome to Apache Kafka!

Best,
Bruno

On 05.06.21 15:44, 和映泉 wrote:

Please add user heyingquan to the list of contributors.



Re: Issue with StreamsBuilder.stream(Pattern)

2021-06-08 Thread Bruno Cadonna

Hi Will,

This looks like a bug to me.

Could you please open a Jira with the stacktrace of the exception and a 
minimal repro example?


Best,
Bruno

On 08.06.21 16:51, Will Bartlett wrote:

Hi all,

I'm hitting a NPE in a very basic repro. It happens when toString() is
called on the StreamSourceNode. It is just in logging code (debug), so no
real issue.

Essentially:

val builder = StreamsBuilder()
builder.stream(Pattern.compile("foo"), Consumed.with(Serdes.ByteArray(),
Serdes.ByteArray()))
val streams = KafkaStreams(builder.build(), Properties())
streams.start()

I had a look into it, and it seems like a bug.

StreamSourceNode.toString() calls topicNames():

https://github.com/apache/kafka/blob/1dadb6db0c6848a8a1d2eee1497f9b79b6e04e0e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSourceNode.java#L64-L70

Which will be null if constructed with a Pattern rather than a String:

https://github.com/apache/kafka/blob/1dadb6db0c6848a8a1d2eee1497f9b79b6e04e0e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/SourceGraphNode.java#L49

Which causes UnmodifiableCollection to throw.

I couldn't find any mention of the issue, and git blame suggests it is not
a recent change - so I am not sure. I am also very new to the jvm so my
confidence is low.

Happy to open a JIRA ticket if someone confirms :)

Thanks,

Will



Re: command to get the value of a config from the broker

2021-07-12 Thread Bruno Cadonna

Hi Dhirendra,

You could use the kafka-configs.sh script or in Java the admin client 
(see 
https://kafka.apache.org/28/javadoc/org/apache/kafka/clients/admin/Admin.html)
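A sketch with the Java Admin client, assuming broker id "0" and a 
localhost bootstrap server; this fragment needs a reachable cluster to run:

```java
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.common.config.ConfigResource;

public class ReadBrokerConfig {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption
        try (Admin admin = Admin.create(props)) {
            ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, "0");
            Config config = admin.describeConfigs(Collections.singleton(broker))
                                 .all().get().get(broker);
            // Print one broker config value, e.g. the log retention.
            System.out.println(config.get("log.retention.hours").value());
        }
    }
}
```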


Best,
Bruno

On 01.07.21 13:45, Dhirendra Singh wrote:

Hi All,
I want to get the value of a config from broker. I do not have access to
the server.properties file.
is there any command available for it ?

Thanks,
Dhirendra.



Re: disappearing messages with kafka streams

2021-07-13 Thread Bruno Cadonna

Hi Günter,

What is the timestamp of the records?

The difference between the system time on the brokers and the record 
timestamp is used to decide whether a record segment should be removed 
because its retention time is exceeded. So if the retention time of the 
topic is set to 1.5 days, the record timestamp is older than 1.5 days 
and the record is produced now to the topic, the record might be removed 
quite quickly from the topic.


Since the removal is done by a dedicated cleaner thread on the brokers 
and only non-active segments are removed, the removal is not 
deterministic. That means that the record might not be removed 
immediately from the topic after it was produced. Before the record 
is removed the current segment that contains the record needs to be 
rolled and the roll needs to happen before the next scheduled execution 
of the cleaner thread.


Best,
Bruno


On 13.07.21 12:03, guenterh.lists wrote:

Hi,

we come across a mysterious behavior with Kafka streams clients.
After writing about 4 messages to a topic, they are suddenly no 
longer available and the topic is empty. The client is still alive.


We use the following defaults as properties for the topic


     - retention.ms=12960 # 1.5 days

Other properties are kafka defaults

Changing the retention time property to
- retention.ms=-1
(infinite) the behavior doesn't occur.

Actually we are running the cluster in version 2.7.0 but this happened 
also in 2.4.x


The behavior isn't always consistent. In 2.4.x it was possible that the 
behavior occurred with some topics and not with others.


Thanks for any advice that could help us to solve the problem.

Günter





Re: Kafka Streams Handling uncaught exceptions REPLACE_THREAD

2021-08-10 Thread Bruno Cadonna

Hi Yoda,

What do you mean exactly with "skipping that failed message"?

Do you mean a record consumed from a topic that caused an exception that 
killed the stream thread?


If the record killed the stream thread due to an exception, for example, 
a deserialization exception, it will probably also kill the next stream 
thread that will read that record. Replacing a stream thread does not 
skip records but it can result in duplicate records depending on the 
application’s processing mode determined by the 
PROCESSING_GUARANTEE_CONFIG value as stated in the docs you cited.
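Since Kafka Streams 2.8 (KIP-663), the handler is installed as shown in 
this sketch (the KafkaStreams instance is assumed to be created elsewhere):

```java
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;

public class ReplaceThreadExample {
    static void installHandler(KafkaStreams streams) {
        streams.setUncaughtExceptionHandler(exception -> {
            // Log and replace the thread. Note: the failing record is NOT
            // skipped; the new thread resumes from the last committed offset.
            System.err.println("Stream thread died: " + exception);
            return StreamThreadExceptionResponse.REPLACE_THREAD;
        });
    }
}
```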


Best,
Bruno



On 10.08.21 11:15, Luke Chen wrote:

Hi Yoda,
For your question:

If an application gets an uncaught exception, then the failed thread will

be replaced with another thread and it will continue processing messages,
skipping that failed message?

--> Yes, if everything goes well after `replace thread`, you can ignore
this failed message. Just one reminder that you should check the failed
message to avoid this `uncaught exception` thrown again, because if this
happens frequently, it'll impact application performance.

Thank you.
Luke

On Tue, Aug 10, 2021 at 4:25 PM Yoda Jedi Master  wrote:


"REPLACE_THREAD - Replaces the thread receiving the exception and
processing continues with the same number of configured threads. (Note:
this can result in duplicate records depending on the application’s
processing mode determined by the PROCESSING_GUARANTEE_CONFIG value)"

If an application gets an uncaught exception, then the failed thread will
be replaced with another thread and it will continue processing messages,
skipping that failed message?





Re: Kafka Streams Handling uncaught exceptions REPLACE_THREAD

2021-08-16 Thread Bruno Cadonna

Hi Yoda,

for certain cases, Kafka Streams allows you to specify handlers that 
skip the problematic record. Those handlers are:


1. deserialization exception handler configured in 
default.deserialization.exception.handler
2. time extractor set in default.timestamp.extractor and in the Consumed 
object
3. production exception handler configured in 
default.production.exception.handler


Kafka Streams provides implementations for handlers 1 and 2 to skip the 
problematic records, namely LogAndContinueExceptionHandler and 
LogAndSkipOnInvalidTimestamp, respectively.
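As a configuration sketch, handlers 1 and 2 could be wired up like this 
(the surrounding application and its other properties are assumed):

```java
import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp;

public class SkippingHandlersConfig {
    static Properties withSkippingHandlers(Properties props) {
        // 1. Skip records that cannot be deserialized.
        props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
                  LogAndContinueExceptionHandler.class);
        // 2. Skip records with invalid (negative) timestamps.
        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
                  LogAndSkipOnInvalidTimestamp.class);
        return props;
    }
}
```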


For some more details have a look at
https://docs.confluent.io/platform/current/streams/faq.html#failure-and-exception-handling

If problematic records cause an exception in user code, the user code 
needs to provide functionality to skip the problematic record.


Best,
Bruno

On 10.08.21 13:26, Yoda Jedi Master wrote:

Hi Bruno, thank you for your answer.
I mean that the message that caused the exception was consumed and the
replaced thread will continue from the next message. How then does it handle
uncaught exceptions, if it will fail again?


On Tue, Aug 10, 2021 at 12:33 PM Bruno Cadonna  wrote:


Hi Yoda,

What do you mean exactly with "skipping that failed message"?

Do you mean a record consumed from a topic that caused an exception that
killed the stream thread?

If the record killed the stream thread due to an exception, for example,
a deserialization exception, it will probably also kill the next stream
thread that will read that record. Replacing a stream thread does not
skip records but it can result in duplicate records depending on the
application’s processing mode determined by the
PROCESSING_GUARANTEE_CONFIG value as stated in the docs you cited.

Best,
Bruno



On 10.08.21 11:15, Luke Chen wrote:

Hi Yoda,
For your question:

If an application gets an uncaught exception, then the failed thread

will

be replaced with another thread and it will continue processing messages,
skipping that failed message?

--> Yes, if everything goes well after `replace thread`, you can ignore
this failed message. Just one reminder that you should check the failed
message to avoid this `uncaught exception` thrown again, because if this
happens frequently, it'll impact application performance.

Thank you.
Luke

On Tue, Aug 10, 2021 at 4:25 PM Yoda Jedi Master 

wrote:



"REPLACE_THREAD - Replaces the thread receiving the exception and
processing continues with the same number of configured threads. (Note:
this can result in duplicate records depending on the application’s
processing mode determined by the PROCESSING_GUARANTEE_CONFIG value)"

If an application gets an uncaught exception, then the failed thread

will

be replaced with another thread and it will continue processing

messages,

skipping that failed message?









Re: `java.lang.NoSuchFieldError: DEFAULT_SASL_ENABLED_MECHANISMS` after upgrading `Kafka-clients` from 2.5.0 to 3.0.0

2021-09-24 Thread Bruno Cadonna

Hi Bruce,

I do not know the specific root cause of your errors but what I found is 
that Spring 2.7.x is compatible with clients 2.7.0 and 2.8.0, not with 
3.0.0 and 2.8.1:


https://spring.io/projects/spring-kafka

Best.
Bruno

On 24.09.21 00:25, Chang Liu wrote:

Hi Kafka users,

I started running into the following error after upgrading `Kafka-clients` from 2.5.0 
to 3.0.0. And I see the same error with 2.8.1. I don’t see a working solution by 
searching on Google: 
https://stackoverflow.com/questions/46914225/kafka-cannot-create-embedded-kafka-server
 


This looks like backward incompatibility of Kafka-clients. Do you happen to 
know a solution for this?

```
java.lang.NoSuchFieldError: DEFAULT_SASL_ENABLED_MECHANISMS

at kafka.server.Defaults$.<init>(KafkaConfig.scala:242)
at kafka.server.Defaults$.<clinit>(KafkaConfig.scala)
at kafka.server.KafkaConfig$.<init>(KafkaConfig.scala:961)
at kafka.server.KafkaConfig$.<clinit>(KafkaConfig.scala)
at kafka.server.KafkaConfig.LogDirProp(KafkaConfig.scala)
at 
org.springframework.kafka.test.EmbeddedKafkaBroker.afterPropertiesSet(EmbeddedKafkaBroker.java:298)
at 
org.springframework.kafka.test.rule.EmbeddedKafkaRule.before(EmbeddedKafkaRule.java:113)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:50)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at 
com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
at 
com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
at 
com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:221)
at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
```

I got some suggestion that is to upgrade Spring library.

This is the `pom.xml` that defines all my dependencies. I only upgraded the 
`Kafka-clients` in production: 
https://github.com/opensearch-project/security/blob/main/pom.xml#L84 


The dependency for test still remains: 
https://github.com/opensearch-project/security/blob/main/pom.xml#L503 


Is this the Spring library that should be upgraded? 
https://github.com/opensearch-project/security/blob/main/pom.xml#L495 


But even though I upgraded the Spring library to 2.7.7 
(https://github.com/opensearch-project/security/blob/main/pom.xml#L496), I got 
another error:

`java.lang.NoClassDefFoundError: org/apache/kafka/common/record/BufferSupplier`

Any suggestion helping me out this will be highly appreciated!

Thanks,
Bruce



Re: Improving I/O on KafkaStreams

2021-12-20 Thread Bruno Cadonna

Hi Murilo,

Have you checked out the following blog post on tuning performance of 
RocksDB state stores [1] especially the section on high disk I/O and 
write stalls [2]?
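One lever discussed in that post is bounding RocksDB's memtables via a 
custom RocksDBConfigSetter. A sketch with illustrative values, not a 
recommendation for any particular workload:

```java
import java.util.Map;

import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.Options;

public class BoundedMemtableConfig implements RocksDBConfigSetter {
    @Override
    public void setConfig(String storeName, Options options, Map<String, Object> configs) {
        // Smaller/fewer memtables mean more frequent but smaller flushes.
        options.setWriteBufferSize(16 * 1024 * 1024L); // 16 MiB, illustrative
        options.setMaxWriteBufferNumber(2);            // illustrative
    }

    @Override
    public void close(String storeName, Options options) {
        // Nothing allocated in setConfig that needs closing here.
    }
}
```

The class would be registered via the rocksdb.config.setter config 
(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG).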


Do you manage the off-heap memory used by RocksDB as described in the 
Streams docs [3]?


I do not know what may have caused the increased I/O. We upgraded 
RocksDB from 5.18.3 to 5.18.4. And we disabled bulk loading mode for 
RocksDB. But I cannot say if one of these might be the reason.


Have you looked at the RocksDB metrics [4] to get better insight into 
the situation?


Best,
Bruno

[1] 
https://www.confluent.io/blog/how-to-tune-rocksdb-kafka-streams-state-stores-performance
[2] 
https://www.confluent.io/blog/how-to-tune-rocksdb-kafka-streams-state-stores-performance/#write-stalls
[3] 
https://kafka.apache.org/30/documentation/streams/developer-guide/memory-mgmt.html

[4] https://kafka.apache.org/documentation/#kafka_streams_rocksdb_monitoring

On 20.12.21 18:23, Murilo Tavares wrote:

Hi
I have a KafkaStreams application with a reasonably complex, stateful
topology.
By monitoring it, we can say for sure that it is bound by write I/O.
This has become way worse after we upgraded KafkaStreams from 2.4 to 2.8.
(even though we disabled warm-up replicas by setting
"acceptable.recovery.lag" to Long.MAX_VALUE)

So my question is: how can we decrease writing to disk frequency in a
KafkaStreams application? Maybe tweaking memtable in the underlying
Rocksdb? Any other suggestions?

Also, any ideas why upgrading KafkaStreams would affect its i/o
performance? Any changes that might have caused an instance to do more
writes?
Thanks
Murilo



Re: Reducing issue

2022-02-02 Thread Bruno Cadonna

Hi Robin,

since this seems to be a ksql question, you will more likely get an 
answer here:


https://forum.confluent.io/c/ksqldb

Best,
Bruno

On 01.02.22 10:03, Robin Helgelin wrote:

Hi,

Working on a small MVP and keep running into a dead end when it comes to 
reducing data.

Began using ksql, which worked very well to start with.

Basically, I have two streams imported via a HTTP source connector. These 
streams are then merged to a single stream and some extra information from the 
original streams are joined to complete the data.

The last step would then be to reduce the information as the original streams 
contains overlapping information. The full reduce should be based on 3 or 4 
different fields and this is where I keep failing. Right now, I do a simple 
GROUP BY field1, field2 and that works as a proof-of-concept but needs to be 
more complex.

I posted on Stack overflow last week, where I also include a workflow to make 
things a bit easier to understand.
https://stackoverflow.com/questions/70834825/kafka-ksql-stuck-in-reducing-stream-table


Regards,
Robin



Re: [kafka-clients] [ANNOUNCE] Apache Kafka 3.0.1

2022-03-14 Thread Bruno Cadonna

Thanks Mickael for driving this release!

Best,
Bruno

On 14.03.22 11:42, Mickael Maison wrote:

The Apache Kafka community is pleased to announce the release for
Apache Kafka 3.0.1

Apache Kafka 3.0.1 is a bugfix release and 29 issues have been fixed
since 3.0.0.

All of the changes in this release can be found in the release notes:
https://www.apache.org/dist/kafka/3.0.1/RELEASE_NOTES.html


You can download the source and binary release (Scala 2.12 and 2.13) from:
https://kafka.apache.org/downloads#3.0.1

---


Apache Kafka is a distributed streaming platform with four core APIs:


** The Producer API allows an application to publish a stream of records to
one or more Kafka topics.

** The Consumer API allows an application to subscribe to one or more
topics and process the stream of records produced to them.

** The Streams API allows an application to act as a stream processor,
consuming an input stream from one or more topics and producing an
output stream to one or more output topics, effectively transforming the
input streams to output streams.

** The Connector API allows building and running reusable producers or
consumers that connect Kafka topics to existing applications or data
systems. For example, a connector to a relational database might
capture every change to a table.


With these APIs, Kafka can be used for two broad classes of application:

** Building real-time streaming data pipelines that reliably get data
between systems or applications.

** Building real-time streaming applications that transform or react
to the streams of data.


Apache Kafka is in use at large and small companies worldwide, including
Capital One, Goldman Sachs, ING, LinkedIn, Netflix, Pinterest, Rabobank,
Target, The New York Times, Uber, Yelp, and Zalando, among others.

A big thank you for the following 26 contributors to this release!

A. Sophie Blee-Goldman, Andras Katona, Bruno Cadonna, Chris Egerton,
Cong Ding, David Jacot, dengziming, Edoardo Comar, Ismael Juma, Jason
Gustafson, jiangyuan, Kevin Zhang, Konstantine Karantasis, Lee
Dongjin, Luke Chen, Marc Löhe, Matthias J. Sax, Michael Carter,
Mickael Maison, Oliver Hutchison, Philip Nee, Prateek Agarwal,
prince-mahajan, Rajini Sivaram, Randall Hauch, Walker Carlson

We welcome your help and feedback. For more information on how to
report problems, and to get involved, visit the project website at
https://kafka.apache.org/

Thank you!


Regards,
Mickael Maison



Re: Setting up the CooperativeStickyAssignor in Java

2022-03-17 Thread Bruno Cadonna

Hi Richard,

The group.instance.id config is orthogonal to the partition assignment 
strategy. The group.instance.id is used if you want to have static 
membership which is not related to the partition assignment strategy.
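For reference, the two independent consumer settings mentioned above 
look like this (group and instance ids are hypothetical):

```java
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;

public class CooperativeConsumerConfig {
    static Properties consumerProps() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumption
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group"); // hypothetical
        // Incremental cooperative rebalancing:
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                  CooperativeStickyAssignor.class.getName());
        // Static membership, optional and orthogonal to the assignor:
        props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "instance-1"); // hypothetical
        return props;
    }
}
```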


If you think you found a bug, could you please open a JIRA ticket with 
steps to reproduce the bug.


Best,
Bruno

On 16.03.22 10:01, Luke Chen wrote:

Hi Richard,

Right, you are not missing any settings beyond the partition assignment
strategy and the group instance id.
You might need to know from the log that why the rebalance triggered to do
troubleshooting.

Thank you.
Luke

On Wed, Mar 16, 2022 at 3:02 PM Richard Ney  wrote:


Hi Luke,

I did end up with a situation where I had two instances connecting to the
same consumer group and they ended up in a rebalance trade-off. All
partitions kept going back and forth between the two microservice
instances. That was a test case where I'd removed the Group Instance Id
setting to see what would happen. I stabilized that one by reducing it to a
single consumer after 20+ rebalances.

The other issue I'm seeing may be a bug in the Functional Scala `fs2-kafka`
wrapper where I see the partitions cleanly assigned but one or more
instances isn't ingesting. I found out that they recently added support for
the cooperative sticky assignor for the stream recreation since they were
assuming a full revocation of the partitions.

So I basically wanted to make sure I wasn't missing any settings beyond the
partition assignment strategy and the group instance id.

-Richard


On Tue, Mar 15, 2022 at 11:27 PM Luke Chen  wrote:


Hi Richard,

To use `CooperativeStickyAssignor`, no other special configuration is
required.
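Concretely, such a consumer configuration might look like the following sketch. The broker address, group id, and deserializer choices are placeholder assumptions; only the last property differs from a default consumer config. Plain java.util.Properties is used so the snippet compiles without the Kafka client on the classpath.

```java
import java.util.Properties;

public class CooperativeConsumerConfig {

    public static Properties build() {
        Properties props = new Properties();
        // Placeholder connection and group settings.
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "example-group");
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // The only change needed for cooperative rebalancing:
        props.setProperty("partition.assignment.strategy",
                "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build().getProperty("partition.assignment.strategy"));
    }
}
```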

I'm not sure what `make the rebalance happen cleanly` means.
Did you find any problem during group rebalance?

Thank you.
Luke

On Wed, Mar 16, 2022 at 1:00 PM Richard Ney 
wrote:


Trying to find a good sample of what consumer settings, besides setting
ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG to
org.apache.kafka.clients.consumer.CooperativeStickyAssignor,
are needed to make the rebalance happen cleanly. Unable to find any
decent documentation or code samples. I have set the Group Instance Id to the
EC2 instance id based on one blog write-up I found.

Any help would be appreciated

-Richard









Re: Need Help - getting vulnerability due to Log4j- v1.2.17 jar being used in Kafka_2.11-2.4.0.

2022-03-30 Thread Bruno Cadonna

Hi Sandip,

I just merged the PR https://github.com/apache/kafka/pull/11743 that 
replaces log4j with reload4j. Reload4j will be part of Apache Kafka 
3.2.0 and 3.1.1.
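For applications that depend on an older Kafka artifact in the meantime, one commonly suggested workaround is to exclude log4j 1.x from the Kafka dependency and substitute reload4j, a maintained fork that keeps the org.apache.log4j package names. The Maven sketch below illustrates the idea; the artifact and version numbers are illustrative assumptions, not a tested recommendation for your build.

```xml
<!-- Sketch: exclude log4j 1.x pulled in transitively and add reload4j,
     a drop-in replacement. Versions shown are examples only. -->
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.13</artifactId>
  <version>3.1.0</version>
  <exclusions>
    <exclusion>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
    </exclusion>
    <exclusion>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
    </exclusion>
  </exclusions>
</dependency>
<dependency>
  <groupId>ch.qos.reload4j</groupId>
  <artifactId>reload4j</artifactId>
  <version>1.2.19</version>
</dependency>
```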


Best,
Bruno

On 30.03.22 04:26, Luke Chen wrote:

Hi Sandip,

We plan to replace log4j with reload4j in v3.2.0 and v3.1.1 (KAFKA-13660).
And plan to upgrade to log4j2 in v4.0.0.

You can check this discussion thread for more details:
https://lists.apache.org/thread/qo1y3249xldt4cpg6r8zkcq5m1q32bf1

Thank you.
Luke

On Tue, Mar 29, 2022 at 10:18 PM Sandip Bhunia
 wrote:


Dear Team,

We are getting vulnerability due to Log4j- v1.2.17 jar being used in
Kafka_2.11-2.4.0.
We tried to upgrade to Kafka_2.13-3.1.0 to remediate the
vulnerability due to Log4j v1.2.17 (an obsolete version: Log4j 1.x
reached end of life in 2015 and is no longer supported) but found that this
version of Kafka does not use Log4j v2.x.

As per your website there is no such information available. Please let us
know when this will get upgraded, and please let us know how to get this
vulnerability remediated, as we need to upgrade Log4j to v2.x.



Thanks & Regards,
Sandip Bhunia

Cell: 9932245061  Email: sandip.bhu...@tcs.com




=-=-=
Notice: The information contained in this e-mail
message and/or attachments to it may contain
confidential or privileged information. If you are
not the intended recipient, any dissemination, use,
review, distribution, printing or copying of the
information contained in this e-mail message
and/or attachments to it are strictly prohibited. If
you have received this communication in error,
please notify us by reply e-mail or telephone and
immediately and permanently delete the message
and any attachments. Thank you






[VOTE] 3.2.0 RC0

2022-04-15 Thread Bruno Cadonna

Hello Kafka users, developers and client-developers,

This is the first candidate for release of Apache Kafka 3.2.0.

* log4j 1.x is replaced with reload4j (KAFKA-9366)
* StandardAuthorizer for KRaft (KIP-801)
* Send a hint to the partition leader to recover the partition (KIP-704)
* Top-level error code field in DescribeLogDirsResponse (KIP-784)
* kafka-console-producer writes headers and null values (KIP-798 and 
KIP-810)

* JoinGroupRequest and LeaveGroupRequest have a reason attached (KIP-800)
* Static membership protocol lets the leader skip assignment (KIP-814)
* Rack-aware standby task assignment in Kafka Streams (KIP-708)
* Interactive Query v2 (KIP-796, KIP-805, and KIP-806)
* Connect APIs list all connector plugins and retrieve their 
configuration (KIP-769)

* TimestampConverter SMT supports different unix time precisions (KIP-808)
* Connect source tasks handle producer exceptions (KIP-779)

Release notes for the 3.2.0 release:
https://home.apache.org/~cadonna/kafka-3.2.0-rc0/RELEASE_NOTES.html

*** Please download, test and vote by Monday, April 25, 9am CEST

Kafka's KEYS file containing PGP keys we use to sign the release:
https://kafka.apache.org/KEYS

* Release artifacts to be voted upon (source and binary):
https://home.apache.org/~cadonna/kafka-3.2.0-rc0/

* Maven artifacts to be voted upon:
https://repository.apache.org/content/groups/staging/org/apache/kafka/

* Javadoc:
https://home.apache.org/~cadonna/kafka-3.2.0-rc0/javadoc/

* Tag to be voted upon (off 3.2 branch) is the 3.2.0 tag:
https://github.com/apache/kafka/releases/tag/3.2.0-rc0

* Documentation (not yet ported to kafka-site):
https://kafka.apache.org/32/documentation.html

* Protocol:
https://kafka.apache.org/32/protocol.html

* Successful Jenkins builds for the 3.2 branch:
I'll share a link once the builds complete



Thanks,
Bruno


Re: Partnership: Apache Kafka & SantoDigital

2022-04-25 Thread Bruno Cadonna

Hi Amanda,

Apache Kafka is an open source project. You can download, install, and 
use the software for free.


Some companies offer Apache Kafka as a cloud service or as on-prem 
software. You can find those companies by asking Google.


Best,
Bruno



On 25.04.22 17:11, Amanda Jampaulo wrote:

Hi Team, good day!

We are SantoDigital, a Brazilian company and Google's premier partner.

We would like to know more about the Apache Kafka solution. We have a
customer who is interested in your solution.

We would like to know how we can contract for the solution, what the list
price is, what the resale price is, and how billing works.

If this is not the correct channel for this type of question, please, we
kindly ask you to direct our email to the responsible team.

We are looking forward to hearing from you.

Best regards,

Amanda Jampaulo

Sales Operations Manager

+55 11 98444-4509
santodigital.com.br



Re: [VOTE] 3.2.0 RC0

2022-04-26 Thread Bruno Cadonna

Hi all,

This is a gentle reminder to vote for the first candidate for release of 
Apache Kafka 3.2.0.


I added the 3.2 documentation to the kafka site. That means 
https://kafka.apache.org/32/documentation.html works now.


A successful system test run can be found here:
https://jenkins.confluent.io/job/system-test-kafka/job/3.2/24/

Thank you to Michal for voting on the release candidate.

Best,
Bruno

On 15.04.22 21:05, Bruno Cadonna wrote:



Re: [kafka-clients] Re: [VOTE] 3.2.0 RC0

2022-04-26 Thread Bruno Cadonna

Hi Jun,

Thank you for your message!

Now I see how this issue was introduced in 3.2.0. The fix for the bug 
described in KAFKA-12841 introduced it, right? I initially understood 
that the PR you want to include is the fix for the bug described in 
KAFKA-12841 which dates back to 2.6.


I think that qualifies as a regression.

I will abort the voting and create a new release candidate.

Best,
Bruno

On 26.04.22 18:09, 'Jun Rao' via kafka-clients wrote:

Hi, Bruno,

Could we include https://github.com/apache/kafka/pull/12064 in 3.2.0? 
This fixes an issue introduced in 3.2.0 where in some of the error cases, 
the producer interceptor is called twice for the same record.


Thanks,

Jun

On Tue, Apr 26, 2022 at 6:34 AM Bruno Cadonna wrote:



--
You received this message because you are subscribed to the Google 
Groups "kafka-clients" group.
To unsubscribe from this group and stop receiving emails from it, send 
an email to kafka-clients+unsubscr...@googlegroups.com.
To view this discussion on the web visit 
https://groups.google.com/d/msgid/kafka-clients/CAFc58G9uguYQ8B%2BPch9kotbUdfeqFABOqyXkm3fr0gwzKxsB0A%40mail.gmail.com.

