Re: [Kafka Streams], Processor API for KTable and KGroupedStream

2024-01-17 Thread Matthias J. Sax
You cannot add a `Processor`. You can only use `aggregate() / reduce() / 
count()` (which of course will add a pre-defined processor).


`groupByKey()` is really just a "meta operation" that checks if the key
was changed upstream and inserts a repartition/shuffle step if necessary.


Thus, if you don't change the upstream key, you can just add a processor 
to `someStream` (groupByKey() would be a no-op anyway).


If you did change the key upstream, you can do 
`someStream.repartition().transform()` to repartition explicitly.
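For illustration, a rough sketch of both cases (the topic name, the key change, and MyTransformerSupplier -- assumed to implement TransformerSupplier<String, String, KeyValue<String, String>> -- are made up, not from this thread):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;

final class GroupByKeyExamples {
    static void buildTopology(final StreamsBuilder builder) {
        final KStream<String, String> someStream =
            builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()));

        // Case 1: key was not changed upstream -- attach the transformer to
        // someStream itself, then group and aggregate as usual.
        someStream
            .transform(new MyTransformerSupplier())
            .groupByKey()
            .count();

        // Case 2: key was changed upstream -- repartition explicitly before
        // attaching the transformer.
        someStream
            .selectKey((key, value) -> value)
            .repartition()
            .transform(new MyTransformerSupplier());
    }
}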



HTH.

On 1/13/24 3:14 AM, Igor Maznitsa wrote:
Thanks a lot for the explanation, but could you provide a bit more detail
about KGroupedStream? It is just an interface and does not extend KStream,
so how can I add a processor in the case below?

KStream someStream = ...
someStream
   .groupByKey()
   // how to add processor for resulted grouped stream here ???

On 2024-Jan-13 01:22, Matthias J. Sax wrote:
`KGroupedStream` is just an "intermediate representation" to get a 
better flow in the DSL. It's not a "top level" abstraction like 
KStream/KTable.


For `KTable` there is `transformValues()` -- there is no `transform()`
because the keying must be preserved -- if you want to change the keying
you need to use `KTable#groupBy()` (data needs to be repartitioned if
you change the key).


HTH.

-Matthias

On 1/12/24 11:47 AM, Igor Maznitsa wrote:

Hello

Is there any way in the Kafka Streams API to define processors for KTable
and KGroupedStream, like KStream#transform? How can I provide a custom
processor for KTable or KGroupedStream which could, for instance, provide
a way to not forward selected events downstream?







Re: [Kafka Streams], Processor API for KTable and KGroupedStream

2024-01-13 Thread Igor Maznitsa
Thanks a lot for the explanation, but could you provide a bit more detail
about KGroupedStream? It is just an interface and does not extend KStream,
so how can I add a processor in the case below?

KStream someStream = ...
someStream
   .groupByKey()
   // how to add processor for resulted grouped stream here ???

On 2024-Jan-13 01:22, Matthias J. Sax wrote:
`KGroupedStream` is just an "intermediate representation" to get a 
better flow in the DSL. It's not a "top level" abstraction like 
KStream/KTable.


For `KTable` there is `transformValues()` -- there is no `transform()`
because the keying must be preserved -- if you want to change the keying
you need to use `KTable#groupBy()` (data needs to be repartitioned if
you change the key).


HTH.

-Matthias

On 1/12/24 11:47 AM, Igor Maznitsa wrote:

Hello

Is there any way in the Kafka Streams API to define processors for KTable
and KGroupedStream, like KStream#transform? How can I provide a custom
processor for KTable or KGroupedStream which could, for instance, provide
a way to not forward selected events downstream?







Re: [Kafka Streams], Processor API for KTable and KGroupedStream

2024-01-12 Thread Matthias J. Sax
`KGroupedStream` is just an "intermediate representation" to get a 
better flow in the DSL. It's not a "top level" abstraction like 
KStream/KTable.


For `KTable` there is `transformValues()` -- there is no `transform()`
because the keying must be preserved -- if you want to change the keying
you need to use `KTable#groupBy()` (data needs to be repartitioned if
you change the key).
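For example, a minimal sketch of a value-only transformation on a KTable (topic name, serdes, and the doubling logic are illustrative, not from this thread):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.processor.ProcessorContext;

final class KTableTransformValuesExample {
    static KTable<String, Long> build(final StreamsBuilder builder) {
        final KTable<String, Long> table =
            builder.table("some-topic", Consumed.with(Serdes.String(), Serdes.Long()));

        // The transformer may read the key but can only replace the value,
        // so the keying (and thus the partitioning) of the KTable is preserved.
        final ValueTransformerWithKeySupplier<String, Long, Long> supplier = () ->
            new ValueTransformerWithKey<String, Long, Long>() {
                @Override
                public void init(final ProcessorContext context) { }

                @Override
                public Long transform(final String readOnlyKey, final Long value) {
                    return value == null ? null : value * 2;
                }

                @Override
                public void close() { }
            };

        return table.transformValues(supplier);
    }
}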


HTH.

-Matthias

On 1/12/24 11:47 AM, Igor Maznitsa wrote:

Hello

Is there any way in the Kafka Streams API to define processors for KTable
and KGroupedStream, like KStream#transform? How can I provide a custom
processor for KTable or KGroupedStream which could, for instance, provide
a way to not forward selected events downstream?





[Kafka Streams], Processor API for KTable and KGroupedStream

2024-01-12 Thread Igor Maznitsa

Hello

Is there any way in the Kafka Streams API to define processors for KTable
and KGroupedStream, like KStream#transform? How can I provide a custom
processor for KTable or KGroupedStream which could, for instance, provide
a way to not forward selected events downstream?



--
Igor Maznitsa
email: rrg4...@gmail.com



Re: Kafka Streams Processor API state stores not restored via changelog topics

2021-03-30 Thread Guozhang Wang
Great to hear! Always a pleasure.

Guozhang

On Tue, Mar 30, 2021 at 8:04 PM Upesh Desai  wrote:

> Hi Guozhang,
>
>
>
> We can confirm the behavior with the 2.7.1 release. Appreciate all the
> help!
>
>
>
> Cheers,
>
> Upesh
>
>
> Upesh Desai​  |  Senior Software Developer  |  *ude...@itrsgroup.com*
> 
> *www.itrsgroup.com* <https://www.itrsgroup.com/>
> <https://www.itrsgroup.com/>
>
> *From: *Guozhang Wang 
> *Date: *Tuesday, March 30, 2021 at 2:10 PM
> *To: *Users 
> *Cc: *Bart Lilje 
> *Subject: *Re: Kafka Streams Processor API state stores not restored via
> changelog topics
>
> Great, I think https://issues.apache.org/jira/browse/KAFKA-12323 is indeed
> the root cause then. Note that this is only an issue with punctuation
> triggered events, where `context.timestamp()` would return 0 (and it is
> fixed in the yet-to-release 2.7.1/2.8.0).
>
> You can consider applying the patch if you could on top of 2.7.0, or wait
> for the new release; OR, if your production code does not actually use
> punctuation to write records to Kafka, then this issue would not actually
> impact you.
>
>
> Guozhang
>
> On Tue, Mar 30, 2021 at 11:56 AM Upesh Desai  wrote:
>
> > Hi Guozhang,
> >
> >
> >
> > Great to hear we might have found the issue!
> >
> >
> >
> > To answer your question, the changelog record is generated by us calling
> > ‘store.put(key,value)’ from the punctuate callback, which makes sense
> then
> > because the timestamp would be 0 like you saw in your test as well.
> >
> >
> >
> > Best,
> >
> > Upesh
> >
> >
> > Upesh Desai​  |  Senior Software Developer  |  *ude...@itrsgroup.com*
> > 
> > *www.itrsgroup.com* <https://www.itrsgroup.com/>
> > <https://www.itrsgroup.com/>
> >
> > *From: *Guozhang Wang 
> > *Date: *Tuesday, March 30, 2021 at 1:00 PM
> > *To: *Users 
> > *Cc: *Bart Lilje 
> > *Subject: *Re: Kafka Streams Processor API state stores not restored via
> > changelog topics
> >
> > Hello Upesh,
> >
> > These are super helpful logs, and I think I'm very close to the root
> cause
> > of it. You see, the written changelog record's timestamp is set to 0
> > (i.e. January 1st 1970 at midnight GMT), and hence given a reasonable
> Kafka
> > server start time (presumingly in 21st century), the retention time would
> > always be breached, and causing the log deletion mechanism to trigger.
> >
> > The timestamp is set with `context.timestamp()` which would use the
> > processing record's timestamp; but myself have seen and fixed a bug (
> > https://issues.apache.org/jira/browse/KAFKA-12323) where the timestamp
> was
> > not populated and hence set to 0 if was generated as part of a
> punctuation.
> > So my next key question is: is this changelog record generated, i.e. its
> > put call triggered, from processing an input record, or from a
> punctuation
> > call?
> >
> >
> > Guozhang
> >
> > On Mon, Mar 29, 2021 at 2:01 PM Upesh Desai 
> wrote:
> >
> > > Hi Guozhang,
> > >
> > >
> > >
> > > When testing with a 2.6.1 broker and 2.7 streams application, I see the
> > > same behavior as described before with the 2.7 broker where just after
> a
> > > record is written to the changelog topic, the log segment is rolled and
> > > deleted citing that the retention time has passed (the record was
> written
> > > to the state store at ~15:49:
> > >
> > >
> > >
> > > [2021-03-29 15:49:13,757] INFO [Log
> > > partition=test-stream-store-changelog-4,
> > > dir=/Users/udesai/Projects/kafka-2.6.1-src/logs/data] Found deletable
> > > segments with base offsets [0] due to retention time 25920ms breach
> > > (kafka.log.Log)
> > > [2021-03-29 15:49:13,761] INFO [ProducerStateManager
> > > partition=test-stream-store-changelog-4] Writing producer snapshot at
> > > offset 1 (kafka.log.ProducerStateManager)
> > > [2021-03-29 15:49:13,763] INFO [Log
> > > partition=test-stream-store-changelog-4,
> > > dir=/Users/udesai/Projects/kafka-2.6.1-src/logs/data] Rolled new log
> > > segment at offset 1 in 5 ms. (kafka.log.Log)
> > > [2021-03-29 15:49:13,764] INFO [Log
> > > partition=test-stream-store-changelog-4,
> > > dir=/Users/udesai/Projects/kafka-2.6.1-src/logs/data] Scheduling
> segments
> > > for deletion LogSegment(baseOffset=0, size=156,
> > > lastModifiedTime=1617050940118

Re: Kafka Streams Processor API state stores not restored via changelog topics

2021-03-30 Thread Upesh Desai
Hi Guozhang,

We can confirm the behavior with the 2.7.1 release. Appreciate all the help!

Cheers,
Upesh


Upesh Desai | Senior Software Developer | ude...@itrsgroup.com
www.itrsgroup.com
From: Guozhang Wang 
Date: Tuesday, March 30, 2021 at 2:10 PM
To: Users 
Cc: Bart Lilje 
Subject: Re: Kafka Streams Processor API state stores not restored via 
changelog topics
Great, I think https://issues.apache.org/jira/browse/KAFKA-12323 is indeed
the root cause then. Note that this is only an issue with
punctuation-triggered events, where `context.timestamp()` would return 0
(and it is fixed in the yet-to-be-released 2.7.1/2.8.0).

You can consider applying the patch on top of 2.7.0 if you can, or wait for
the new release; OR, if your production code does not actually use
punctuation to write records to Kafka, then this issue would not actually
impact you.


Guozhang

On Tue, Mar 30, 2021 at 11:56 AM Upesh Desai  wrote:

> Hi Guozhang,
>
>
>
> Great to hear we might have found the issue!
>
>
>
> To answer your question, the changelog record is generated by us calling
> ‘store.put(key,value)’ from the punctuate callback, which makes sense then
> because the timestamp would be 0 like you saw in your test as well.
>
>
>
> Best,
>
> Upesh
>
>
> Upesh Desai​  |  Senior Software Developer  |  *ude...@itrsgroup.com*
> 
> *www.itrsgroup.com* <https://www.itrsgroup.com/>
> <https://www.itrsgroup.com/>
>
> *From: *Guozhang Wang 
> *Date: *Tuesday, March 30, 2021 at 1:00 PM
> *To: *Users 
> *Cc: *Bart Lilje 
> *Subject: *Re: Kafka Streams Processor API state stores not restored via
> changelog topics
>
> Hello Upesh,
>
> These are super helpful logs, and I think I'm very close to the root cause
> of it. You see, the written changelog record's timestamp is set to 0
> (i.e. January 1st 1970 at midnight GMT), and hence given a reasonable Kafka
> server start time (presumingly in 21st century), the retention time would
> always be breached, and causing the log deletion mechanism to trigger.
>
> The timestamp is set with `context.timestamp()` which would use the
> processing record's timestamp; but myself have seen and fixed a bug (
> https://issues.apache.org/jira/browse/KAFKA-12323) where the timestamp was
> not populated and hence set to 0 if was generated as part of a punctuation.
> So my next key question is: is this changelog record generated, i.e. its
> put call triggered, from processing an input record, or from a punctuation
> call?
>
>
> Guozhang
>
> On Mon, Mar 29, 2021 at 2:01 PM Upesh Desai  wrote:
>
> > Hi Guozhang,
> >
> >
> >
> > When testing with a 2.6.1 broker and 2.7 streams application, I see the
> > same behavior as described before with the 2.7 broker where just after a
> > record is written to the changelog topic, the log segment is rolled and
> > deleted citing that the retention time has passed (the record was written
> > to the state store at ~15:49:
> >
> >
> >
> > [2021-03-29 15:49:13,757] INFO [Log
> > partition=test-stream-store-changelog-4,
> > dir=/Users/udesai/Projects/kafka-2.6.1-src/logs/data] Found deletable
> > segments with base offsets [0] due to retention time 25920ms breach
> > (kafka.log.Log)
> > [2021-03-29 15:49:13,761] INFO [ProducerStateManager
> > partition=test-stream-store-changelog-4] Writing producer snapshot at
> > offset 1 (kafka.log.ProducerStateManager)
> > [2021-03-29 15:49:13,763] INFO [Log
> > partition=test-stream-store-changelog-4,
> > dir=/Users/udesai/Projects/kafka-2.6.1-src/logs/data] Rolled new log
> > segment at offset 1 in 5 ms. (kafka.log.Log)
> > [2021-03-29 15:49:13,764] INFO [Log
> > partition=test-stream-store-changelog-4,
> > dir=/Users/udesai/Projects/kafka-2.6.1-src/logs/data] Scheduling segments
> > for deletion LogSegment(baseOffset=0, size=156,
> > lastModifiedTime=1617050940118, largestTime=0) (kafka.log.Log)
> > [2021-03-29 15:49:13,765] INFO [Log
> > partition=test-stream-store-changelog-4,
> > dir=/Users/udesai/Projects/kafka-2.6.1-src/logs/data] Incremented log
> start
> > offset to 1 due to segment deletion (kafka.log.Log)
> >
> >
> >
> > Does this have anything to do with the *largestTime=0* mentioned in the
> > log? This was the first and only record written to the store/changelog.
> Is
> > there anything else we can try to resolve this issue or give us more
> > insight into where this issue could originate from?
> >
> >
> >
> > Thanks,
> > Upesh
> >
> >
> > Upesh Desai​  |  Senior Software Developer  |  *ude...@itrsgroup.com*
> > 
> > *www.itrsgroup.com* <https://www.itrsgro

Re: Kafka Streams Processor API state stores not restored via changelog topics

2021-03-30 Thread Guozhang Wang
Great, I think https://issues.apache.org/jira/browse/KAFKA-12323 is indeed
the root cause then. Note that this is only an issue with
punctuation-triggered events, where `context.timestamp()` would return 0
(and it is fixed in the yet-to-be-released 2.7.1/2.8.0).

You can consider applying the patch on top of 2.7.0 if you can, or wait for
the new release; OR, if your production code does not actually use
punctuation to write records to Kafka, then this issue would not actually
impact you.


Guozhang

On Tue, Mar 30, 2021 at 11:56 AM Upesh Desai  wrote:

> Hi Guozhang,
>
>
>
> Great to hear we might have found the issue!
>
>
>
> To answer your question, the changelog record is generated by us calling
> ‘store.put(key,value)’ from the punctuate callback, which makes sense then
> because the timestamp would be 0 like you saw in your test as well.
>
>
>
> Best,
>
> Upesh
>
>
> Upesh Desai​  |  Senior Software Developer  |  *ude...@itrsgroup.com*
> 
> *www.itrsgroup.com* <https://www.itrsgroup.com/>
> <https://www.itrsgroup.com/>
>
> *From: *Guozhang Wang 
> *Date: *Tuesday, March 30, 2021 at 1:00 PM
> *To: *Users 
> *Cc: *Bart Lilje 
> *Subject: *Re: Kafka Streams Processor API state stores not restored via
> changelog topics
>
> Hello Upesh,
>
> These are super helpful logs, and I think I'm very close to the root cause
> of it. You see, the written changelog record's timestamp is set to 0
> (i.e. January 1st 1970 at midnight GMT), and hence given a reasonable Kafka
> server start time (presumingly in 21st century), the retention time would
> always be breached, and causing the log deletion mechanism to trigger.
>
> The timestamp is set with `context.timestamp()` which would use the
> processing record's timestamp; but myself have seen and fixed a bug (
> https://issues.apache.org/jira/browse/KAFKA-12323) where the timestamp was
> not populated and hence set to 0 if was generated as part of a punctuation.
> So my next key question is: is this changelog record generated, i.e. its
> put call triggered, from processing an input record, or from a punctuation
> call?
>
>
> Guozhang
>
> On Mon, Mar 29, 2021 at 2:01 PM Upesh Desai  wrote:
>
> > Hi Guozhang,
> >
> >
> >
> > When testing with a 2.6.1 broker and 2.7 streams application, I see the
> > same behavior as described before with the 2.7 broker where just after a
> > record is written to the changelog topic, the log segment is rolled and
> > deleted citing that the retention time has passed (the record was written
> > to the state store at ~15:49:
> >
> >
> >
> > [2021-03-29 15:49:13,757] INFO [Log
> > partition=test-stream-store-changelog-4,
> > dir=/Users/udesai/Projects/kafka-2.6.1-src/logs/data] Found deletable
> > segments with base offsets [0] due to retention time 25920ms breach
> > (kafka.log.Log)
> > [2021-03-29 15:49:13,761] INFO [ProducerStateManager
> > partition=test-stream-store-changelog-4] Writing producer snapshot at
> > offset 1 (kafka.log.ProducerStateManager)
> > [2021-03-29 15:49:13,763] INFO [Log
> > partition=test-stream-store-changelog-4,
> > dir=/Users/udesai/Projects/kafka-2.6.1-src/logs/data] Rolled new log
> > segment at offset 1 in 5 ms. (kafka.log.Log)
> > [2021-03-29 15:49:13,764] INFO [Log
> > partition=test-stream-store-changelog-4,
> > dir=/Users/udesai/Projects/kafka-2.6.1-src/logs/data] Scheduling segments
> > for deletion LogSegment(baseOffset=0, size=156,
> > lastModifiedTime=1617050940118, largestTime=0) (kafka.log.Log)
> > [2021-03-29 15:49:13,765] INFO [Log
> > partition=test-stream-store-changelog-4,
> > dir=/Users/udesai/Projects/kafka-2.6.1-src/logs/data] Incremented log
> start
> > offset to 1 due to segment deletion (kafka.log.Log)
> >
> >
> >
> > Does this have anything to do with the *largestTime=0* mentioned in the
> > log? This was the first and only record written to the store/changelog.
> Is
> > there anything else we can try to resolve this issue or give us more
> > insight into where this issue could originate from?
> >
> >
> >
> > Thanks,
> > Upesh
> >
> >
> > Upesh Desai​  |  Senior Software Developer  |  *ude...@itrsgroup.com*
> > 
> > *www.itrsgroup.com* <https://www.itrsgroup.com/>
> > <https://www.itrsgroup.com/>
> >
> > *From: *Upesh Desai 
> > *Date: *Thursday, March 25, 2021 at 6:46 PM
> > *To: *users@kafka.apache.org 
> > *Cc: *Bart Lilje 
> > *Subject: *Re: Kafka Streams Processor API state stores not restored via
> > changelog topics
> >
> > We have not tried 

Re: Kafka Streams Processor API state stores not restored via changelog topics

2021-03-30 Thread Upesh Desai
Hi Guozhang,

Great to hear we might have found the issue!

To answer your question, the changelog record is generated by us calling 
‘store.put(key,value)’ from the punctuate callback, which makes sense then 
because the timestamp would be 0 like you saw in your test as well.
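For context, a rough sketch of that kind of punctuation-triggered write, using the 2.7-era Processor API (the class, store name, and schedule here are illustrative, not the actual application code):

import java.time.Duration;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.KeyValueStore;

public class PunctuatingProcessor extends AbstractProcessor<String, byte[]> {

    private KeyValueStore<String, byte[]> store;

    @Override
    @SuppressWarnings("unchecked")
    public void init(final ProcessorContext context) {
        super.init(context);
        store = (KeyValueStore<String, byte[]>) context.getStateStore("test-store");
        // Writes issued from this punctuator are the "punctuation triggered"
        // path discussed above: on an unpatched 2.7.0, the changelog record
        // produced by this put() ends up with timestamp 0 (KAFKA-12323).
        context.schedule(Duration.ofSeconds(30), PunctuationType.WALL_CLOCK_TIME,
            timestamp -> store.put("some-key", new byte[0]));
    }

    @Override
    public void process(final String key, final byte[] value) {
        // Writes made while processing an input record take that record's timestamp.
        store.put(key, value);
    }
}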

Best,
Upesh


Upesh Desai | Senior Software Developer | ude...@itrsgroup.com
www.itrsgroup.com
From: Guozhang Wang 
Date: Tuesday, March 30, 2021 at 1:00 PM
To: Users 
Cc: Bart Lilje 
Subject: Re: Kafka Streams Processor API state stores not restored via 
changelog topics
Hello Upesh,

These are super helpful logs, and I think I'm very close to the root cause.
You see, the written changelog record's timestamp is set to 0 (i.e. January
1st 1970 at midnight GMT), so given a reasonable Kafka server start time
(presumably in the 21st century) the retention time would always be
breached, causing the log deletion mechanism to trigger.

The timestamp is set with `context.timestamp()`, which uses the timestamp of
the record currently being processed; but I have seen and fixed a bug
(https://issues.apache.org/jira/browse/KAFKA-12323) where the timestamp was
not populated, and hence set to 0, if the record was written as part of a
punctuation. So my next key question is: is this changelog record generated,
i.e. its put call triggered, from processing an input record, or from a
punctuation call?


Guozhang

On Mon, Mar 29, 2021 at 2:01 PM Upesh Desai  wrote:

> Hi Guozhang,
>
>
>
> When testing with a 2.6.1 broker and 2.7 streams application, I see the
> same behavior as described before with the 2.7 broker where just after a
> record is written to the changelog topic, the log segment is rolled and
> deleted citing that the retention time has passed (the record was written
> to the state store at ~15:49:
>
>
>
> [2021-03-29 15:49:13,757] INFO [Log
> partition=test-stream-store-changelog-4,
> dir=/Users/udesai/Projects/kafka-2.6.1-src/logs/data] Found deletable
> segments with base offsets [0] due to retention time 25920ms breach
> (kafka.log.Log)
> [2021-03-29 15:49:13,761] INFO [ProducerStateManager
> partition=test-stream-store-changelog-4] Writing producer snapshot at
> offset 1 (kafka.log.ProducerStateManager)
> [2021-03-29 15:49:13,763] INFO [Log
> partition=test-stream-store-changelog-4,
> dir=/Users/udesai/Projects/kafka-2.6.1-src/logs/data] Rolled new log
> segment at offset 1 in 5 ms. (kafka.log.Log)
> [2021-03-29 15:49:13,764] INFO [Log
> partition=test-stream-store-changelog-4,
> dir=/Users/udesai/Projects/kafka-2.6.1-src/logs/data] Scheduling segments
> for deletion LogSegment(baseOffset=0, size=156,
> lastModifiedTime=1617050940118, largestTime=0) (kafka.log.Log)
> [2021-03-29 15:49:13,765] INFO [Log
> partition=test-stream-store-changelog-4,
> dir=/Users/udesai/Projects/kafka-2.6.1-src/logs/data] Incremented log start
> offset to 1 due to segment deletion (kafka.log.Log)
>
>
>
> Does this have anything to do with the *largestTime=0* mentioned in the
> log? This was the first and only record written to the store/changelog. Is
> there anything else we can try to resolve this issue or give us more
> insight into where this issue could originate from?
>
>
>
> Thanks,
> Upesh
>
>
> Upesh Desai​  |  Senior Software Developer  |  *ude...@itrsgroup.com*
> 
> *www.itrsgroup.com* <https://www.itrsgroup.com/>
> <https://www.itrsgroup.com/>
>
> *From: *Upesh Desai 
> *Date: *Thursday, March 25, 2021 at 6:46 PM
> *To: *users@kafka.apache.org 
> *Cc: *Bart Lilje 
> *Subject: *Re: Kafka Streams Processor API state stores not restored via
> changelog topics
>
> We have not tried running 2.6 brokers and 2.7 client, I will try and get
> back to you.
>
>
>
> We are not enabling EOS on the streams, we have it set to AT_LEAST_ONCE.
> The shutdowns and restarts of the stream app are clean each time.
>
>
>
> I see in the broker logs certain lines indicating that the log segment is
> being rolled and deleted, but I don’t see how or why this should be
> happening when the records were just written. See the log line snippets
> included in the attached file. Initially 8 records are added (offsets 0-8),
> followed by a single record (offset 9). They are rolled and deleted almost
> instantly.
>
>
>
> Best,
>
> Upesh
>
>
>
> Upesh Desai | Senior Software Developer | ude...@itrsgroup.com
> www.itrsgroup.com
>
> *From: *Guozhang Wang 
> *Date: *Thursday, March 25, 2021 at 6:31 PM
> *To: *Users 
> *Subject: *Re: Kafka Streams Processor API state stores not restored via
> changelog topics
>
> BTW, yes that indicates the record in the chang

Re: Kafka Streams Processor API state stores not restored via changelog topics

2021-03-30 Thread Guozhang Wang
Hello Upesh,

These are super helpful logs, and I think I'm very close to the root cause.
You see, the written changelog record's timestamp is set to 0 (i.e. January
1st 1970 at midnight GMT), so given a reasonable Kafka server start time
(presumably in the 21st century) the retention time would always be
breached, causing the log deletion mechanism to trigger.

The timestamp is set with `context.timestamp()`, which uses the timestamp of
the record currently being processed; but I have seen and fixed a bug
(https://issues.apache.org/jira/browse/KAFKA-12323) where the timestamp was
not populated, and hence set to 0, if the record was written as part of a
punctuation. So my next key question is: is this changelog record generated,
i.e. its put call triggered, from processing an input record, or from a
punctuation call?


Guozhang

On Mon, Mar 29, 2021 at 2:01 PM Upesh Desai  wrote:

> Hi Guozhang,
>
>
>
> When testing with a 2.6.1 broker and 2.7 streams application, I see the
> same behavior as described before with the 2.7 broker where just after a
> record is written to the changelog topic, the log segment is rolled and
> deleted citing that the retention time has passed (the record was written
> to the state store at ~15:49:
>
>
>
> [2021-03-29 15:49:13,757] INFO [Log
> partition=test-stream-store-changelog-4,
> dir=/Users/udesai/Projects/kafka-2.6.1-src/logs/data] Found deletable
> segments with base offsets [0] due to retention time 25920ms breach
> (kafka.log.Log)
> [2021-03-29 15:49:13,761] INFO [ProducerStateManager
> partition=test-stream-store-changelog-4] Writing producer snapshot at
> offset 1 (kafka.log.ProducerStateManager)
> [2021-03-29 15:49:13,763] INFO [Log
> partition=test-stream-store-changelog-4,
> dir=/Users/udesai/Projects/kafka-2.6.1-src/logs/data] Rolled new log
> segment at offset 1 in 5 ms. (kafka.log.Log)
> [2021-03-29 15:49:13,764] INFO [Log
> partition=test-stream-store-changelog-4,
> dir=/Users/udesai/Projects/kafka-2.6.1-src/logs/data] Scheduling segments
> for deletion LogSegment(baseOffset=0, size=156,
> lastModifiedTime=1617050940118, largestTime=0) (kafka.log.Log)
> [2021-03-29 15:49:13,765] INFO [Log
> partition=test-stream-store-changelog-4,
> dir=/Users/udesai/Projects/kafka-2.6.1-src/logs/data] Incremented log start
> offset to 1 due to segment deletion (kafka.log.Log)
>
>
>
> Does this have anything to do with the *largestTime=0* mentioned in the
> log? This was the first and only record written to the store/changelog. Is
> there anything else we can try to resolve this issue or give us more
> insight into where this issue could originate from?
>
>
>
> Thanks,
> Upesh
>
>
> Upesh Desai​  |  Senior Software Developer  |  *ude...@itrsgroup.com*
> 
> *www.itrsgroup.com* <https://www.itrsgroup.com/>
> <https://www.itrsgroup.com/>
>
> *From: *Upesh Desai 
> *Date: *Thursday, March 25, 2021 at 6:46 PM
> *To: *users@kafka.apache.org 
> *Cc: *Bart Lilje 
> *Subject: *Re: Kafka Streams Processor API state stores not restored via
> changelog topics
>
> We have not tried running 2.6 brokers and 2.7 client, I will try and get
> back to you.
>
>
>
> We are not enabling EOS on the streams, we have it set to AT_LEAST_ONCE.
> The shutdowns and restarts of the stream app are clean each time.
>
>
>
> I see in the broker logs certain lines indicating that the log segment is
> being rolled and deleted, but I don’t see how or why this should be
> happening when the records were just written. See the log line snippets
> included in the attached file. Initially 8 records are added (offsets 0-8),
> followed by a single record (offset 9). They are rolled and deleted almost
> instantly.
>
>
>
> Best,
>
> Upesh
>
>
>
> Upesh Desai | Senior Software Developer | ude...@itrsgroup.com
> www.itrsgroup.com
>
> *From: *Guozhang Wang 
> *Date: *Thursday, March 25, 2021 at 6:31 PM
> *To: *Users 
> *Subject: *Re: Kafka Streams Processor API state stores not restored via
> changelog topics
>
> BTW, yes that indicates the record in the changelog was already truncated
> (logically). But since we only physically truncate logs by segments, which
> is 1GB by default, it should still be physically on the log. Are you
> enabling EOS on Streams, and when you shutdown the streams app, is that a
> clean shutdown?
>
> On Thu, Mar 25, 2021 at 4:22 PM Guozhang Wang  wrote:
>
> > That's indeed weird.
> >
> > Have you tried to run Kafka brokers with 2.6 while Kafka Streams client
> > with 2.7?
> >
> > On Thu, Mar 25, 2021 at 2:34 PM Upes

Re: Kafka Streams Processor API state stores not restored via changelog topics

2021-03-29 Thread Upesh Desai
Hi Guozhang,

When testing with a 2.6.1 broker and a 2.7 streams application, I see the same
behavior as described before with the 2.7 broker: just after a record is
written to the changelog topic, the log segment is rolled and deleted, citing
that the retention time has passed (the record was written to the state store
at ~15:49):

[2021-03-29 15:49:13,757] INFO [Log partition=test-stream-store-changelog-4, 
dir=/Users/udesai/Projects/kafka-2.6.1-src/logs/data] Found deletable segments 
with base offsets [0] due to retention time 25920ms breach (kafka.log.Log)
[2021-03-29 15:49:13,761] INFO [ProducerStateManager 
partition=test-stream-store-changelog-4] Writing producer snapshot at offset 1 
(kafka.log.ProducerStateManager)
[2021-03-29 15:49:13,763] INFO [Log partition=test-stream-store-changelog-4, 
dir=/Users/udesai/Projects/kafka-2.6.1-src/logs/data] Rolled new log segment at 
offset 1 in 5 ms. (kafka.log.Log)
[2021-03-29 15:49:13,764] INFO [Log partition=test-stream-store-changelog-4, 
dir=/Users/udesai/Projects/kafka-2.6.1-src/logs/data] Scheduling segments for 
deletion LogSegment(baseOffset=0, size=156, lastModifiedTime=1617050940118, 
largestTime=0) (kafka.log.Log)
[2021-03-29 15:49:13,765] INFO [Log partition=test-stream-store-changelog-4, 
dir=/Users/udesai/Projects/kafka-2.6.1-src/logs/data] Incremented log start 
offset to 1 due to segment deletion (kafka.log.Log)

Does this have anything to do with the largestTime=0 mentioned in the log? This
was the first and only record written to the store/changelog. Is there anything 
else we can try to resolve this issue or give us more insight into where this 
issue could originate from?

Thanks,
Upesh


Upesh Desai | Senior Software Developer | ude...@itrsgroup.com
www.itrsgroup.com
From: Upesh Desai 
Date: Thursday, March 25, 2021 at 6:46 PM
To: users@kafka.apache.org 
Cc: Bart Lilje 
Subject: Re: Kafka Streams Processor API state stores not restored via 
changelog topics
We have not tried running 2.6 brokers with a 2.7 client; I will try that and get
back to you.

We are not enabling EOS on the streams; we have it set to AT_LEAST_ONCE. The
shutdowns and restarts of the stream app are clean each time.

I see in the broker logs certain lines indicating that the log segment is being 
rolled and deleted, but I don’t see how or why this should be happening when 
the records were just written. See the log line snippets included in the 
attached file. Initially 8 records are added (offsets 0-8), followed by a 
single record (offset 9). They are rolled and deleted almost instantly.

Best,
Upesh

Upesh Desai | Senior Software Developer | ude...@itrsgroup.com
www.itrsgroup.com
From: Guozhang Wang 
Date: Thursday, March 25, 2021 at 6:31 PM
To: Users 
Subject: Re: Kafka Streams Processor API state stores not restored via 
changelog topics
BTW, yes that indicates the record in the changelog was already truncated
(logically). But since we only physically truncate logs by segments, which
is 1GB by default, it should still be physically on the log. Are you
enabling EOS on Streams, and when you shutdown the streams app, is that a
clean shutdown?

On Thu, Mar 25, 2021 at 4:22 PM Guozhang Wang  wrote:

> That's indeed weird.
>
> Have you tried to run Kafka brokers with 2.6 while Kafka Streams client
> with 2.7?
>
> On Thu, Mar 25, 2021 at 2:34 PM Upesh Desai  wrote:
>
>> Hello Guozhang,
>>
>>
>>
>> I have tried your suggestions with an inMemoryStore FYI and seen the
>> following:
>>
>>
>>
>>1. I have the record added to the state store, stopped the
>>application, and check the earliest and latest offsets via the command 
>> line
>>tools. This shows that the earliest offset is 1, and the latest offset is
>>also 1. Does this mean that the record has been marked for deletion
>>already? My retention.ms config is set to 3 days (25920 ms), so
>>it should not be marked for deletion if added a couple minutes prior?
>>2. Following the above, this makes sense as well. When logging the
>>starting offset, it is not 0, but rather 1:
>>
>>*topic: streamapp-teststore-changelog, partition: 4, start offset: 1,
>>end offset: 1*
>>
>>
>>
>> I also confirmed different behavior when we change the changelog topic
>> cleanup policy from “*compact,delete”* to just “*compact”*. We DO NOT
>> see this issue when the changelog is just set to compact. We also confirmed
>> that this does not happen when we run everything on Kafka version 2.6.
>>
>>
>>
>> Thanks,
>>
>> Upesh
>>
>>
>> Upesh Desai​  |  Senior Software Developer  |  *ude...@itrsgroup.com*
>> 
>&

Re: Kafka Streams Processor API state stores not restored via changelog topics

2021-03-25 Thread Upesh Desai
We have not tried running 2.6 brokers with a 2.7 client; I will try that and get
back to you.

We are not enabling EOS on the streams; we have it set to AT_LEAST_ONCE. The
shutdowns and restarts of the stream app are clean each time.
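(For reference, the relevant setting looks roughly like this; the application id and bootstrap servers are placeholders, and AT_LEAST_ONCE is also the default:)

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streamapp");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.AT_LEAST_ONCE);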

I see in the broker logs certain lines indicating that the log segment is being 
rolled and deleted, but I don’t see how or why this should be happening when 
the records were just written. See the log line snippets included in the 
attached file. Initially 8 records are added (offsets 0-8), followed by a 
single record (offset 9). They are rolled and deleted almost instantly.

Best,
Upesh


Upesh Desai | Senior Software Developer | ude...@itrsgroup.com
www.itrsgroup.com
From: Guozhang Wang 
Date: Thursday, March 25, 2021 at 6:31 PM
To: Users 
Subject: Re: Kafka Streams Processor API state stores not restored via 
changelog topics
BTW, yes that indicates the record in the changelog was already truncated
(logically). But since we only physically truncate logs by segments, which
is 1GB by default, it should still be physically on the log. Are you
enabling EOS on Streams, and when you shutdown the streams app, is that a
clean shutdown?

On Thu, Mar 25, 2021 at 4:22 PM Guozhang Wang  wrote:

> That's indeed weird.
>
> Have you tried to run Kafka brokers with 2.6 while Kafka Streams client
> with 2.7?
>
> On Thu, Mar 25, 2021 at 2:34 PM Upesh Desai  wrote:
>
>> Hello Guozhang,
>>
>>
>>
>> I have tried your suggestions with an inMemoryStore FYI and seen the
>> following:
>>
>>
>>
>>1. I have the record added to the state store, stopped the
>>application, and check the earliest and latest offsets via the command 
>> line
>>tools. This shows that the earliest offset is 1, and the latest offset is
>>also 1. Does this mean that the record has been marked for deletion
>>already? My retention.ms config is set to 3 days (25920 ms), so
>>it should not be marked for deletion if added a couple minutes prior?
>>2. Following the above, this makes sense as well. When logging the
>>starting offset, it is not 0, but rather 1:
>>
>>*topic: streamapp-teststore-changelog, partition: 4, start offset: 1,
>>end offset: 1*
>>
>>
>>
>> I also confirmed different behavior when we change the changelog topic
>> cleanup policy from “*compact,delete”* to just “*compact”*. We DO NOT
>> see this issue when the changelog is just set to compact. We also confirmed
>> that this does not happen when we run everything on Kafka version 2.6.
>>
>>
>>
>> Thanks,
>>
>> Upesh
>>
>>
>> Upesh Desai​  |  Senior Software Developer  |  *ude...@itrsgroup.com*
>> 
>> *www.itrsgroup.com* <https://www.itrsgroup.com/>
>> <https://www.itrsgroup.com/>
>>
>> *From: *Guozhang Wang 
>> *Date: *Thursday, March 25, 2021 at 4:01 PM
>> *To: *Users 
>> *Cc: *Bart Lilje 
>> *Subject: *Re: Kafka Streams Processor API state stores not restored via
>> changelog topics
>>
>> Hello Upesh,
>>
>> Could you confirm a few more things for me:
>>
>> 1. After you stopped the application, and wiped out the state dir; check
>> if
>> the corresponding changelog topic has one record indeed at offset 0 ---
>> this can be done via the admin#listOffsets (get the earliest and latest
>> offset, which should be 0 and 1 correspondingly).
>> 2. After you resumed the application, check from which starting position
>> we
>> are restoring the changelog --- this can be done via implementing the
>> `stateRestoreListener.onRestoreStart(partition, storeName, startOffset,
>> restoreEndOffset);`, should be 0
>>
>> If both of them check out fine as expected, then from the code I think
>> bufferedLimitIndex should be updated to 1.
>>
>>
>> Guozhang
>>
>> On Wed, Mar 24, 2021 at 5:14 PM Upesh Desai  wrote:
>>
>> > Hi Guozhang,
>> >
>> >
>> >
>> > Here are some of the answers to your questions I see during my testing:
>> >
>> >
>> >
>> >1. ChangelogMetadata#restoreEndOffset == 1 ; This is expected as in
>> my
>> >test 1 record had been added to the store. However the numRecords
>> variable
>> >is still set to 0
>> >2. For that particular test, `hasRestoredToEnd()` indeed returns true
>> >as well. But it is confusing since the store is actually empty / that
>> >record I added does not exist in the store when trying to check for
>> it.
>> >3. N/A
>> >
>> >
>> >
>> > A little 

Re: Kafka Streams Processor API state stores not restored via changelog topics

2021-03-25 Thread Guozhang Wang
BTW, yes that indicates the record in the changelog was already truncated
(logically). But since we only physically truncate logs by segments, which
is 1GB by default, it should still be physically on the log. Are you
enabling EOS on Streams, and when you shutdown the streams app, is that a
clean shutdown?

On Thu, Mar 25, 2021 at 4:22 PM Guozhang Wang  wrote:

> That's indeed weird.
>
> Have you tried to run Kafka brokers with 2.6 while Kafka Streams client
> with 2.7?
>
> On Thu, Mar 25, 2021 at 2:34 PM Upesh Desai  wrote:
>
>> Hello Guozhang,
>>
>>
>>
>> I have tried your suggestions with an inMemoryStore FYI and seen the
>> following:
>>
>>
>>
>>1. I have the record added to the state store, stopped the
>>application, and check the earliest and latest offsets via the command 
>> line
>>tools. This shows that the earliest offset is 1, and the latest offset is
>>also 1. Does this mean that the record has been marked for deletion
>>already? My retention.ms config is set to 3 days (25920 ms), so
>>it should not be marked for deletion if added a couple minutes prior?
>>2. Following the above, this makes sense as well. When logging the
>>starting offset, it is not 0, but rather 1:
>>
>>*topic: streamapp-teststore-changelog, partition: 4, start offset: 1,
>>end offset: 1*
>>
>>
>>
>> I also confirmed different behavior when we change the changelog topic
>> cleanup policy from “*compact,delete”* to just “*compact”*. We DO NOT
>> see this issue when the changelog is just set to compact. We also confirmed
>> that this does not happen when we run everything on Kafka version 2.6.
>>
>>
>>
>> Thanks,
>>
>> Upesh
>>
>>
>> Upesh Desai​  |  Senior Software Developer  |  *ude...@itrsgroup.com*
>> 
>> *www.itrsgroup.com* <https://www.itrsgroup.com/>
>> <https://www.itrsgroup.com/>
>>
>> *From: *Guozhang Wang 
>> *Date: *Thursday, March 25, 2021 at 4:01 PM
>> *To: *Users 
>> *Cc: *Bart Lilje 
>> *Subject: *Re: Kafka Streams Processor API state stores not restored via
>> changelog topics
>>
>> Hello Upesh,
>>
>> Could you confirm a few more things for me:
>>
>> 1. After you stopped the application, and wiped out the state dir; check
>> if
>> the corresponding changelog topic has one record indeed at offset 0 ---
>> this can be done via the admin#listOffsets (get the earliest and latest
>> offset, which should be 0 and 1 correspondingly).
>> 2. After you resumed the application, check from which starting position
>> we
>> are restoring the changelog --- this can be done via implementing the
>> `stateRestoreListener.onRestoreStart(partition, storeName, startOffset,
>> restoreEndOffset);`, should be 0
>>
>> If both of them check out fine as expected, then from the code I think
>> bufferedLimitIndex should be updated to 1.
>>
>>
>> Guozhang
>>
>> On Wed, Mar 24, 2021 at 5:14 PM Upesh Desai  wrote:
>>
>> > Hi Guozhang,
>> >
>> >
>> >
>> > Here are some of the answers to your questions I see during my testing:
>> >
>> >
>> >
>> >1. ChangelogMetadata#restoreEndOffset == 1 ; This is expected as in
>> my
>> >test 1 record had been added to the store. However the numRecords
>> variable
>> >is still set to 0
>> >2. For that particular test, `hasRestoredToEnd()` indeed returns true
>> >as well. But it is confusing since the store is actually empty / that
>> >record I added does not exist in the store when trying to check for
>> it.
>> >3. N/A
>> >
>> >
>> >
>> > A little more information, the records we add to this store/changelog
>> are
>> > of type  where the value is always set to an empty
>> byte
>> > array `new byte[0]`. A couple other variations I have tried are setting
>> to
>> > a non-empty static byte array such as `new byte[1]` or `new byte[]{1}`.
>> >
>> >
>> >
>> > Hope this gives a little more clarity and hope to hear from you soon.
>> >
>> >
>> >
>> > Thanks,
>> >
>> > Upesh
>> >
>> >
>> > Upesh Desai​  |  Senior Software Developer  |  *ude...@itrsgroup.com*
>> > 
>> > *www.itrsgroup.com* <https://www.itrsgroup.com/>
>> > <https://www.itrsgroup.com/>
>> >
>> > *From: *Guozhang Wang 
>> 

Re: Kafka Streams Processor API state stores not restored via changelog topics

2021-03-25 Thread Guozhang Wang
That's indeed weird.

Have you tried to run Kafka brokers with 2.6 while Kafka Streams client
with 2.7?

On Thu, Mar 25, 2021 at 2:34 PM Upesh Desai  wrote:

> Hello Guozhang,
>
>
>
> I have tried your suggestions with an inMemoryStore FYI and seen the
> following:
>
>
>
>1. I have the record added to the state store, stopped the
>application, and check the earliest and latest offsets via the command line
>tools. This shows that the earliest offset is 1, and the latest offset is
>also 1. Does this mean that the record has been marked for deletion
>already? My retention.ms config is set to 3 days (25920 ms), so it
>should not be marked for deletion if added a couple minutes prior?
>2. Following the above, this makes sense as well. When logging the
>starting offset, it is not 0, but rather 1:
>
>*topic: streamapp-teststore-changelog, partition: 4, start offset: 1,
>end offset: 1*
>
>
>
> I also confirmed different behavior when we change the changelog topic
> cleanup policy from “*compact,delete”* to just “*compact”*. We DO NOT see
> this issue when the changelog is just set to compact. We also confirmed
> that this does not happen when we run everything on Kafka version 2.6.
>
>
>
> Thanks,
>
> Upesh
>
>
> Upesh Desai​  |  Senior Software Developer  |  *ude...@itrsgroup.com*
> 
> *www.itrsgroup.com* <https://www.itrsgroup.com/>
> <https://www.itrsgroup.com/>
>
> *From: *Guozhang Wang 
> *Date: *Thursday, March 25, 2021 at 4:01 PM
> *To: *Users 
> *Cc: *Bart Lilje 
> *Subject: *Re: Kafka Streams Processor API state stores not restored via
> changelog topics
>
> Hello Upesh,
>
> Could you confirm a few more things for me:
>
> 1. After you stopped the application, and wiped out the state dir; check if
> the corresponding changelog topic has one record indeed at offset 0 ---
> this can be done via the admin#listOffsets (get the earliest and latest
> offset, which should be 0 and 1 correspondingly).
> 2. After you resumed the application, check from which starting position we
> are restoring the changelog --- this can be done via implementing the
> `stateRestoreListener.onRestoreStart(partition, storeName, startOffset,
> restoreEndOffset);`, should be 0
>
> If both of them check out fine as expected, then from the code I think
> bufferedLimitIndex should be updated to 1.
>
>
> Guozhang
>
> On Wed, Mar 24, 2021 at 5:14 PM Upesh Desai  wrote:
>
> > Hi Guozhang,
> >
> >
> >
> > Here are some of the answers to your questions I see during my testing:
> >
> >
> >
> >1. ChangelogMetadata#restoreEndOffset == 1 ; This is expected as in my
> >test 1 record had been added to the store. However the numRecords
> variable
> >is still set to 0
> >2. For that particular test, `hasRestoredToEnd()` indeed returns true
> >as well. But it is confusing since the store is actually empty / that
> >record I added does not exist in the store when trying to check for
> it.
> >3. N/A
> >
> >
> >
> > A little more information, the records we add to this store/changelog are
> > of type  where the value is always set to an empty byte
> > array `new byte[0]`. A couple other variations I have tried are setting
> to
> > a non-empty static byte array such as `new byte[1]` or `new byte[]{1}`.
> >
> >
> >
> > Hope this gives a little more clarity and hope to hear from you soon.
> >
> >
> >
> > Thanks,
> >
> > Upesh
> >
> >
> > Upesh Desai​  |  Senior Software Developer  |  *ude...@itrsgroup.com*
> > 
> > *www.itrsgroup.com* <https://www.itrsgroup.com/>
> > <https://www.itrsgroup.com/>
> >
> > *From: *Guozhang Wang 
> > *Date: *Wednesday, March 24, 2021 at 1:37 PM
> > *To: *Users 
> > *Cc: *Bart Lilje 
> > *Subject: *Re: Kafka Streams Processor API state stores not restored via
> > changelog topics
> >
> > Hello Upesh,
> >
> > Thanks for the detailed report. I looked through the code and tried to
> > reproduce the issue, but so far have not succeeded. I think I may need
> some
> > further information from you to help my further investigation.
> >
> > 1) The `bufferedLimitIndex == 0` itself does not necessarily mean there's
> > an issue, as long as it could still be bumped later (i.e. it is possible
> > that the restore consumer has not fetched data yet). What's key though,
> is
> > to check `ChangelogMetadata#restoreEndOffset`: for active tasks, it would
> > be created with null value, and then been

Re: Kafka Streams Processor API state stores not restored via changelog topics

2021-03-25 Thread Upesh Desai
Hello Guozhang,

I have tried your suggestions with an inMemoryStore FYI and seen the following:


  1.  I have the record added to the state store, stopped the application, and
checked the earliest and latest offsets via the command line tools. This shows
that the earliest offset is 1, and the latest offset is also 1. Does this mean
that the record has been marked for deletion already? My retention.ms config is
set to 3 days (259200000 ms), so it should not be marked for deletion if it was
added a couple of minutes prior?
  2.  Following the above, this makes sense as well. When logging the starting 
offset, it is not 0, but rather 1:

topic: streamapp-teststore-changelog, partition: 4, start offset: 1, end 
offset: 1

I also confirmed different behavior when we change the changelog topic cleanup 
policy from “compact,delete” to just “compact”. We DO NOT see this issue when 
the changelog is just set to compact. We also confirmed that this does not 
happen when we run everything on Kafka version 2.6.

Thanks,
Upesh


Upesh Desai | Senior Software Developer | ude...@itrsgroup.com
www.itrsgroup.com
From: Guozhang Wang 
Date: Thursday, March 25, 2021 at 4:01 PM
To: Users 
Cc: Bart Lilje 
Subject: Re: Kafka Streams Processor API state stores not restored via 
changelog topics
Hello Upesh,

Could you confirm a few more things for me:

1. After you stopped the application, and wiped out the state dir; check if
the corresponding changelog topic has one record indeed at offset 0 ---
this can be done via the admin#listOffsets (get the earliest and latest
offset, which should be 0 and 1 correspondingly).
2. After you resumed the application, check from which starting position we
are restoring the changelog --- this can be done via implementing the
`stateRestoreListener.onRestoreStart(partition, storeName, startOffset,
restoreEndOffset);`, should be 0

If both of them check out fine as expected, then from the code I think
bufferedLimitIndex should be updated to 1.


Guozhang

On Wed, Mar 24, 2021 at 5:14 PM Upesh Desai  wrote:

> Hi Guozhang,
>
>
>
> Here are some of the answers to your questions I see during my testing:
>
>
>
>1. ChangelogMetadata#restoreEndOffset == 1 ; This is expected as in my
>test 1 record had been added to the store. However the numRecords variable
>is still set to 0
>2. For that particular test, `hasRestoredToEnd()` indeed returns true
>as well. But it is confusing since the store is actually empty / that
>record I added does not exist in the store when trying to check for it.
>3. N/A
>
>
>
> A little more information, the records we add to this store/changelog are
> of type  where the value is always set to an empty byte
> array `new byte[0]`. A couple other variations I have tried are setting to
> a non-empty static byte array such as `new byte[1]` or `new byte[]{1}`.
>
>
>
> Hope this gives a little more clarity and hope to hear from you soon.
>
>
>
> Thanks,
>
> Upesh
>
>
> Upesh Desai​  |  Senior Software Developer  |  *ude...@itrsgroup.com*
> 
> *www.itrsgroup.com* <https://www.itrsgroup.com/>
> <https://www.itrsgroup.com/>
>
> *From: *Guozhang Wang 
> *Date: *Wednesday, March 24, 2021 at 1:37 PM
> *To: *Users 
> *Cc: *Bart Lilje 
> *Subject: *Re: Kafka Streams Processor API state stores not restored via
> changelog topics
>
> Hello Upesh,
>
> Thanks for the detailed report. I looked through the code and tried to
> reproduce the issue, but so far have not succeeded. I think I may need some
> further information from you to help my further investigation.
>
> 1) The `bufferedLimitIndex == 0` itself does not necessarily mean there's
> an issue, as long as it could still be bumped later (i.e. it is possible
> that the restore consumer has not fetched data yet). What's key though, is
> to check `ChangelogMetadata#restoreEndOffset`: for active tasks, it would
> be created with null value, and then been initialized once. ChangelogReader
> would stop restoring once the current offset has reached beyond this value
> or if this value itself is 0.
>
> 2) If `restoreEndOffset` is initialized to a non-zero value, then check if
> the restoration indeed completed without applying any records, this is
> determined as `hasRestoredToEnd()` returning true.
>
> 3) If `restoreEndOffset` is initialized to 0, then we need to check why: on
> top of my head I can only think of that the consumer's end offset request
> gets the response with 0, indicating the changelog is now empty.
>
>
> Guozhang
>
>
> On Tue, Mar 23, 2021 at 8:44 AM Upesh Desai  wrote:
>
> > Hi all,
> >
> >
> >
> > Our team thinks we discovered a bug over the weekend within the Kafka
> > Streams / Processor API. We are running 2.7.0.
> >
> >
&g

Re: Kafka Streams Processor API state stores not restored via changelog topics

2021-03-25 Thread Guozhang Wang
Hello Upesh,

Could you confirm a few more things for me:

1. After you stopped the application, and wiped out the state dir; check if
the corresponding changelog topic has one record indeed at offset 0 ---
this can be done via the admin#listOffsets (get the earliest and latest
offset, which should be 0 and 1 correspondingly).
2. After you resumed the application, check from which starting position we
are restoring the changelog --- this can be done via implementing the
`stateRestoreListener.onRestoreStart(partition, storeName, startOffset,
restoreEndOffset);`, should be 0

If both of them check out fine as expected, then from the code I think
bufferedLimitIndex should be updated to 1.
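For illustration, both checks could look roughly like this (the changelog topic and partition are taken from the log lines elsewhere in this thread; the bootstrap server and the surrounding class are made up):

import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.processor.StateRestoreListener;

final class RestoreChecks {

    // Check 1: earliest/latest offsets of the changelog partition via Admin#listOffsets.
    static void printChangelogOffsets() throws Exception {
        final Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        final TopicPartition tp = new TopicPartition("streamapp-teststore-changelog", 4);
        try (Admin admin = Admin.create(props)) {
            final long earliest = admin.listOffsets(Map.of(tp, OffsetSpec.earliest()))
                .partitionResult(tp).get().offset();
            final long latest = admin.listOffsets(Map.of(tp, OffsetSpec.latest()))
                .partitionResult(tp).get().offset();
            System.out.printf("earliest=%d latest=%d%n", earliest, latest);
        }
    }

    // Check 2: log the restoration start/end offsets; register before streams.start().
    static void registerRestoreListener(final KafkaStreams streams) {
        streams.setGlobalStateRestoreListener(new StateRestoreListener() {
            @Override
            public void onRestoreStart(final TopicPartition partition, final String storeName,
                                       final long startingOffset, final long endingOffset) {
                System.out.printf("restore start %s %s startOffset=%d endOffset=%d%n",
                    partition, storeName, startingOffset, endingOffset);
            }

            @Override
            public void onBatchRestored(final TopicPartition partition, final String storeName,
                                        final long batchEndOffset, final long numRestored) { }

            @Override
            public void onRestoreEnd(final TopicPartition partition, final String storeName,
                                     final long totalRestored) { }
        });
    }
}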


Guozhang

On Wed, Mar 24, 2021 at 5:14 PM Upesh Desai  wrote:

> Hi Guozhang,
>
>
>
> Here are some of the answers to your questions I see during my testing:
>
>
>
>1. ChangelogMetadata#restoreEndOffset == 1 ; This is expected as in my
>test 1 record had been added to the store. However the numRecords variable
>is still set to 0
>2. For that particular test, `hasRestoredToEnd()` indeed returns true
>as well. But it is confusing since the store is actually empty / that
>record I added does not exist in the store when trying to check for it.
>3. N/A
>
>
>
> A little more information, the records we add to this store/changelog are
> of type  where the value is always set to an empty byte
> array `new byte[0]`. A couple other variations I have tried are setting to
> a non-empty static byte array such as `new byte[1]` or `new byte[]{1}`.
>
>
>
> Hope this gives a little more clarity and hope to hear from you soon.
>
>
>
> Thanks,
>
> Upesh
>
>
> Upesh Desai​  |  Senior Software Developer  |  *ude...@itrsgroup.com*
> 
> *www.itrsgroup.com* <https://www.itrsgroup.com/>
> <https://www.itrsgroup.com/>
>
> *From: *Guozhang Wang 
> *Date: *Wednesday, March 24, 2021 at 1:37 PM
> *To: *Users 
> *Cc: *Bart Lilje 
> *Subject: *Re: Kafka Streams Processor API state stores not restored via
> changelog topics
>
> Hello Upesh,
>
> Thanks for the detailed report. I looked through the code and tried to
> reproduce the issue, but so far have not succeeded. I think I may need some
> further information from you to help my further investigation.
>
> 1) The `bufferedLimitIndex == 0` itself does not necessarily mean there's
> an issue, as long as it could still be bumped later (i.e. it is possible
> that the restore consumer has not fetched data yet). What's key though, is
> to check `ChangelogMetadata#restoreEndOffset`: for active tasks, it would
> be created with null value, and then been initialized once. ChangelogReader
> would stop restoring once the current offset has reached beyond this value
> or if this value itself is 0.
>
> 2) If `restoreEndOffset` is initialized to a non-zero value, then check if
> the restoration indeed completed without applying any records, this is
> determined as `hasRestoredToEnd()` returning true.
>
> 3) If `restoreEndOffset` is initialized to 0, then we need to check why: on
> top of my head I can only think of that the consumer's end offset request
> gets the response with 0, indicating the changelog is now empty.
>
>
> Guozhang
>
>
> On Tue, Mar 23, 2021 at 8:44 AM Upesh Desai  wrote:
>
> > Hi all,
> >
> >
> >
> > Our team thinks we discovered a bug over the weekend within the Kafka
> > Streams / Processor API. We are running 2.7.0.
> >
> >
> >
> > When configuring a state store backed by a changelog topic with the
> > cleanup policy configuration set to “compact,delete”:
> >
> >
> >
> > final StoreBuilder<KeyValueStore<K, V>> store = Stores
> >   .keyValueStoreBuilder(
> >     Stores.persistentKeyValueStore(STORE_ID),
> >     kSerde,
> >     vSerde)
> >   .withLoggingEnabled(Map.of(
> >     RETENTION_MS_CONFIG, "9000",
> >     CLEANUP_POLICY_CONFIG, "compact,delete"))
> >   .withCachingEnabled();
> >
> >
> >
> > Here is how we reproduced the problem:
> >
> >1. Records are written to the state store, and subsequently produced
> >to the changelog topic.
> >2. Store streams application
> >3. Delete state.dir directory
> >4. Restart streams application
> >5. Confirm state store is initialized empty with no records restored
> >from changelog
> >
> >
> >
> > We see this problem with both in-memory and RocksDB backed state stores.
> > For persistent state store, if the streams application is restarted
> without
> > the state dir being deleted, the 

Re: Kafka Streams Processor API state stores not restored via changelog topics

2021-03-24 Thread Upesh Desai
Hi Guozhang,

Here are some of the answers to your questions I see during my testing:


  1.  ChangelogMetadata#restoreEndOffset == 1 ; This is expected as in my test 
1 record had been added to the store. However the numRecords variable is still 
set to 0
  2.  For that particular test, `hasRestoredToEnd()` indeed returns true as 
well. But it is confusing since the store is actually empty / that record I 
added does not exist in the store when trying to check for it.
  3.  N/A

A little more information: the records we add to this store/changelog are of a
key/value type where the value is always set to an empty byte array
`new byte[0]`. A couple of other variations I have tried are setting the value
to a non-empty static byte array such as `new byte[1]` or `new byte[]{1}`.

Hope this gives a little more clarity and hope to hear from you soon.

Thanks,
Upesh


Upesh Desai | Senior Software Developer | ude...@itrsgroup.com
www.itrsgroup.com
From: Guozhang Wang 
Date: Wednesday, March 24, 2021 at 1:37 PM
To: Users 
Cc: Bart Lilje 
Subject: Re: Kafka Streams Processor API state stores not restored via 
changelog topics
Hello Upesh,

Thanks for the detailed report. I looked through the code and tried to
reproduce the issue, but so far have not succeeded. I think I may need some
further information from you to help my further investigation.

1) The `bufferedLimitIndex == 0` itself does not necessarily mean there's
an issue, as long as it could still be bumped later (i.e. it is possible
that the restore consumer has not fetched data yet). What's key, though, is
to check `ChangelogMetadata#restoreEndOffset`: for active tasks, it would
be created with a null value and then be initialized once. ChangelogReader
would stop restoring once the current offset has reached beyond this value,
or if this value itself is 0.

2) If `restoreEndOffset` is initialized to a non-zero value, then check if
the restoration indeed completed without applying any records, this is
determined as `hasRestoredToEnd()` returning true.

3) If `restoreEndOffset` is initialized to 0, then we need to check why: on
top of my head I can only think of that the consumer's end offset request
gets the response with 0, indicating the changelog is now empty.


Guozhang


On Tue, Mar 23, 2021 at 8:44 AM Upesh Desai  wrote:

> Hi all,
>
>
>
> Our team thinks we discovered a bug over the weekend within the Kafka
> Streams / Processor API. We are running 2.7.0.
>
>
>
> When configuring a state store backed by a changelog topic with the
> cleanup policy configuration set to “compact,delete”:
>
>
>
> final StoreBuilder<KeyValueStore<K, V>> store = Stores
>   .keyValueStoreBuilder(
>     Stores.persistentKeyValueStore(STORE_ID),
>     kSerde,
>     vSerde)
>   .withLoggingEnabled(Map.of(
>     RETENTION_MS_CONFIG, "9000",
>     CLEANUP_POLICY_CONFIG, "compact,delete"))
>   .withCachingEnabled();
>
>
>
> Here is how we reproduced the problem:
>
>    1. Records are written to the state store, and subsequently produced
>    to the changelog topic.
>    2. Stop streams application
>    3. Delete state.dir directory
>    4. Restart streams application
>    5. Confirm state store is initialized empty with no records restored
>    from changelog
>
>
>
> We see this problem with both in-memory and RocksDB backed state stores.
> For persistent state store, if the streams application is restarted without
> the state dir being deleted, the application still does not “restore” from
> the changelog, but records are still seen in the state store.
>
>
>
> When rolling back to 2.6, we do not see this issue.
>
>
>
> Doing some debugging in the source code, in the StoreChangelogReader class
> I found that the number of records to restore is always 0 based on the
> below snippet:
>
>
>
> private void restoreChangelog(final ChangelogMetadata changelogMetadata) {
> final ProcessorStateManager stateManager = changelogMetadata.stateManager;
> final StateStoreMetadata storeMetadata = changelogMetadata.storeMetadata;
> final TopicPartition partition = storeMetadata.changelogPartition();
> final String storeName = storeMetadata.store().name();
> final int numRecords = changelogMetadata.bufferedLimitIndex;
>
>
>
> Where ‘changelogMetadata.bufferedLimitIndex’ always == 0.
>
>
>
> My question to you all is, 1) Is this expected behavior? 2) If not, is it
> a bug?
>
>
>
> Hope to get some clarity, and thanks in advance!
>
>
>
> Best,
> Upesh
> <https://www.itrsgroup.com/>
> Upesh Desai​
> Senior Software Developer
> *ude...@itrsgroup.com* 
> *www.itrsgroup.com* <https://www.itrsgroup.com/>
> Internet communications are not secure and therefore the ITRS Group does
> not accept legal responsibility for the contents of this message. Any view
> or opinions presented are solely those of the author and do not necessarily
> represent those of the ITRS Group unless otherwise specifically stated.

Re: Kafka Streams Processor API state stores not restored via changelog topics

2021-03-24 Thread Guozhang Wang
Hello Upesh,

Thanks for the detailed report. I looked through the code and tried to
reproduce the issue, but so far have not succeeded. I think I may need some
further information from you to help my further investigation.

1) The `bufferedLimitIndex == 0` itself does not necessarily mean there's
an issue, as long as it could still be bumped later (i.e. it is possible
that the restore consumer has not fetched data yet). What's key though, is
to check `ChangelogMetadata#restoreEndOffset`: for active tasks, it would
be created with a null value and then initialized once. ChangelogReader
would stop restoring once the current offset has reached beyond this value
or if this value itself is 0.

2) If `restoreEndOffset` is initialized to a non-zero value, then check if
the restoration indeed completed without applying any records, this is
determined as `hasRestoredToEnd()` returning true.

3) If `restoreEndOffset` is initialized to 0, then we need to check why: off
the top of my head I can only think that the consumer's end offset request
got a response of 0, indicating the changelog is now empty.
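
If useful, one way to sanity-check case 3) from outside the application is to
ask the broker for the changelog's end offset with a plain consumer. The
bootstrap servers and the changelog topic/partition below are placeholders;
the real changelog is named "<application.id>-<store name>-changelog":

import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class ChangelogEndOffsetCheck {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);

        try (final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // placeholder changelog partition; substitute the real one
            final TopicPartition changelog =
                new TopicPartition("my-app-my-store-changelog", 0);
            final Map<TopicPartition, Long> end =
                consumer.endOffsets(Collections.singletonList(changelog));
            // 0 here means the broker itself reports an empty changelog; a non-zero
            // value points the investigation back at the restore path in Streams
            System.out.println("changelog end offset: " + end.get(changelog));
        }
    }
}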


Guozhang


On Tue, Mar 23, 2021 at 8:44 AM Upesh Desai  wrote:

> Hi all,
>
>
>
> Our team thinks we discovered a bug over the weekend within the Kafka
> Streams / Processor API. We are running 2.7.0.
>
>
>
> When configuring a state store backed by a changelog topic with the
> cleanup policy configuration set to “compact,delete”:
>
>
>
> final StoreBuilder<KeyValueStore<K, V>> store = Stores
>   .keyValueStoreBuilder(
>     Stores.persistentKeyValueStore(STORE_ID),
>     kSerde,
>     vSerde)
>   .withLoggingEnabled(Map.of(
>     RETENTION_MS_CONFIG, "9000",
>     CLEANUP_POLICY_CONFIG, "compact,delete"))
>   .withCachingEnabled();
>
>
>
> Here is how we reproduced the problem:
>
>    1. Records are written to the state store, and subsequently produced
>    to the changelog topic.
>    2. Stop streams application
>    3. Delete state.dir directory
>    4. Restart streams application
>    5. Confirm state store is initialized empty with no records restored
>    from changelog
>
>
>
> We see this problem with both in-memory and RocksDB backed state stores.
> For persistent state store, if the streams application is restarted without
> the state dir being deleted, the application still does not “restore” from
> the changelog, but records are still seen in the state store.
>
>
>
> When rolling back to 2.6, we do not see this issue.
>
>
>
> Doing some debugging in the source code, in the StoreChangelogReader class
> I found that the number of records to restore is always 0 based on the
> below snippet:
>
>
>
> private void restoreChangelog(final ChangelogMetadata changelogMetadata) {
> final ProcessorStateManager stateManager = changelogMetadata.stateManager;
> final StateStoreMetadata storeMetadata = changelogMetadata.storeMetadata;
> final TopicPartition partition = storeMetadata.changelogPartition();
> final String storeName = storeMetadata.store().name();
> final int numRecords = changelogMetadata.bufferedLimitIndex;
>
>
>
> Where ‘changelogMetadata.bufferedLimitIndex’ always == 0.
>
>
>
> My question to you all is, 1) Is this expected behavior? 2) If not, is it
> a bug?
>
>
>
> Hope to get some clarity, and thanks in advance!
>
>
>
> Best,
> Upesh
> <https://www.itrsgroup.com/>
> Upesh Desai​
> Senior Software Developer
> *ude...@itrsgroup.com* 
> *www.itrsgroup.com* <https://www.itrsgroup.com/>
> Internet communications are not secure and therefore the ITRS Group does
> not accept legal responsibility for the contents of this message. Any view
> or opinions presented are solely those of the author and do not necessarily
> represent those of the ITRS Group unless otherwise specifically stated.
>
>
> *Disclaimer*
>
> The information contained in this communication from the sender is
> confidential. It is intended solely for use by the recipient and others
> authorized to receive it. If you are not the recipient, you are hereby
> notified that any disclosure, copying, distribution or taking action in
> relation of the contents of this information is strictly prohibited and may
> be unlawful.
>
> This email has been scanned for viruses and malware, and may have been
> automatically archived by *Mimecast Ltd*, an innovator in Software as a
> Service (SaaS) for business. Providing a *safer* and *more useful* place
> for your human generated data. Specializing in; Security, archiving and
> compliance.
>


-- 
-- Guozhang


Kafka Streams Processor API state stores not restored via changelog topics

2021-03-23 Thread Upesh Desai
Hi all,

Our team thinks we discovered a bug over the weekend within the Kafka Streams / 
Processor API. We are running 2.7.0.

When configuring a state store backed by a changelog topic with the cleanup 
policy configuration set to “compact,delete”:

final StoreBuilder<KeyValueStore<K, V>> store = Stores
  .keyValueStoreBuilder(
    Stores.persistentKeyValueStore(STORE_ID),
    kSerde,
    vSerde)
  .withLoggingEnabled(Map.of(
    RETENTION_MS_CONFIG, "9000",
    CLEANUP_POLICY_CONFIG, "compact,delete"))
  .withCachingEnabled();

Here is how we reproduced the problem:

  1.  Records are written to the state store, and subsequently produced to the 
changelog topic.
  2.  Stop streams application
  3.  Delete state.dir directory
  4.  Restart streams application
  5.  Confirm state store is initialized empty with no records restored from 
changelog

We see this problem with both in-memory and RocksDB backed state stores. For 
persistent state store, if the streams application is restarted without the 
state dir being deleted, the application still does not “restore” from the 
changelog, but records are still seen in the state store.

When rolling back to 2.6, we do not see this issue.

Doing some debugging in the source code, in the StoreChangelogReader class I 
found that the number of records to restore is always 0 based on the below 
snippet:


private void restoreChangelog(final ChangelogMetadata changelogMetadata) {
final ProcessorStateManager stateManager = changelogMetadata.stateManager;
final StateStoreMetadata storeMetadata = changelogMetadata.storeMetadata;
final TopicPartition partition = storeMetadata.changelogPartition();
final String storeName = storeMetadata.store().name();
final int numRecords = changelogMetadata.bufferedLimitIndex;
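// annotation: bufferedLimitIndex marks how many of the records buffered from
// the restore consumer are currently eligible to be applied to the store; it
// is bumped as the restore consumer fetches, so a value stuck at 0 means no
// records are ever applied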

Where ‘changelogMetadata.bufferedLimitIndex’ always == 0.

My question to you all is, 1) Is this expected behavior? 2) If not, is it a bug?

Hope to get some clarity, and thanks in advance!

Best,
Upesh

Upesh Desai
Senior Software Developer
ude...@itrsgroup.com
www.itrsgroup.com
Internet communications are not secure and therefore the ITRS Group does not 
accept legal responsibility for the contents of this message. Any view or 
opinions presented are solely those of the author and do not necessarily 
represent those of the ITRS Group unless otherwise specifically stated.

Disclaimer

The information contained in this communication from the sender is 
confidential. It is intended solely for use by the recipient and others 
authorized to receive it. If you are not the recipient, you are hereby notified 
that any disclosure, copying, distribution or taking action in relation of the 
contents of this information is strictly prohibited and may be unlawful.

This email has been scanned for viruses and malware, and may have been 
automatically archived by Mimecast Ltd, an innovator in Software as a Service 
(SaaS) for business. Providing a safer and more useful place for your human 
generated data. Specializing in; Security, archiving and compliance. To find 
out more visit the Mimecast website.


Re: Kafka Streams & Processor API

2019-08-05 Thread Boyang Chen
Hey Navneeth,

thank you for your interest in trying out Kafka Streams! Normally we
redirect new folks to the Kafka Streams FAQ first, in case you haven't
checked it out. For details on your questions:

1. To join 2 topics using the Processor API (we call it PAPI), you have to
create 2 stream nodes, each of which reads in one join topic and
materializes its data into a local state store. At the same time, each should
cross-reference the other's state store to perform the join in #process()
upon receiving new records. You could check out KStreamKStreamJoin and
KStreamImpl#join for more details (see also the sketches after this list).

2. If you are concerned about delay due to one partition, I would
recommend you try out wall-clock time, which advances based on real-world
time. Check out here


3. Not sure what specific read pattern you are looking for, but if you only
need to read a single key-value, we have interactive query support
built in (which potentially solves your remote referral question). If you
want to access the entire state store for debugging, you could opt in to
dump the result like here. And there is no state store spanning multiple
machines; each individual state store belongs to one stream task, and they
are isolated even locally.
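
To make the three points above concrete, here are some rough sketches. All
topic, store, and class names below are made up for illustration, and the
code uses the older org.apache.kafka.streams.processor.Processor API that a
2.3-era application would have:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

public class PapiJoinSketch {

    // One processor per input topic; each materializes its own side and
    // looks up the other side's store to perform the join in process().
    static class SideJoinProcessor implements Processor<String, String> {
        private final String ownStoreName;
        private final String otherStoreName;
        private ProcessorContext context;
        private KeyValueStore<String, String> ownStore;
        private KeyValueStore<String, String> otherStore;

        SideJoinProcessor(final String ownStoreName, final String otherStoreName) {
            this.ownStoreName = ownStoreName;
            this.otherStoreName = otherStoreName;
        }

        @Override
        @SuppressWarnings("unchecked")
        public void init(final ProcessorContext context) {
            this.context = context;
            this.ownStore = (KeyValueStore<String, String>) context.getStateStore(ownStoreName);
            this.otherStore = (KeyValueStore<String, String>) context.getStateStore(otherStoreName);
        }

        @Override
        public void process(final String key, final String value) {
            ownStore.put(key, value);                  // materialize this side
            final String other = otherStore.get(key);  // cross-reference the other side
            if (other != null) {
                context.forward(key, value + "|" + other);  // emit the joined record
            }
        }

        @Override
        public void close() {}
    }

    static Topology buildJoinTopology() {
        final Topology topology = new Topology();
        // sources rely on the default serdes configured on the application
        topology.addSource("data-source", "data-topic");
        topology.addSource("change-source", "change-topic");
        topology.addProcessor("data-join",
            () -> new SideJoinProcessor("data-store", "change-store"), "data-source");
        topology.addProcessor("change-join",
            () -> new SideJoinProcessor("change-store", "data-store"), "change-source");
        // both processors must be connected to both stores
        topology.addStateStore(
            Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("data-store"),
                Serdes.String(), Serdes.String()),
            "data-join", "change-join");
        topology.addStateStore(
            Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("change-store"),
                Serdes.String(), Serdes.String()),
            "data-join", "change-join");
        // either side can emit the joined record downstream
        topology.addSink("joined-sink", "joined-topic", "data-join", "change-join");
        return topology;
    }
}

For the wall-clock punctuation in point 2, the registration inside init() looks
roughly like this (java.time.Duration and PunctuationType are assumed imported):

context.schedule(Duration.ofSeconds(30), PunctuationType.WALL_CLOCK_TIME,
    timestamp -> { /* flush here; fires on real time even if partitions are idle */ });

And for point 3, a single key can be read from the running application via
interactive queries (assuming `streams` is the started KafkaStreams instance
and the store above):

final ReadOnlyKeyValueStore<String, String> view =
    streams.store("data-store", QueryableStoreTypes.keyValueStore());
final String value = view.get("some-key");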

Hope this helps!

Boyang

On Sat, Aug 3, 2019 at 11:31 PM Navneeth Krishnan 
wrote:

> Hi All,
>
> I'm new to kafka streams and I'm trying to model my use case that is
> currently written on flink to kafka streams. I have a couple of questions.
>
> - How can I join two kafka topics using the processor API? I have a data
> stream and a change stream which need to be joined based on a key.
>
> - I read about the "STREAM_TIME" punctuation and it seems unless all
> partitions have new records there will not be an advance in time. Is
> there a way to overcome this?
>
> - Is there a way to read the state store data through an API externally? If
> so then how does it know where the state resides and if the state is across
> multiple machines how does that work?
>
> Thanks for all your help.
>


Kafka Streams & Processor API

2019-08-04 Thread Navneeth Krishnan
Hi All,

I'm new to kafka streams and I'm trying to model my use case that is
currently written on flink to kafka streams. I have a couple of questions.

- How can I join two kafka topics using the processor API? I have a data
stream and a change stream which need to be joined based on a key.

- I read about the "STREAM_TIME" punctuation and it seems unless all
partitions have new records there will not be an advance in time. Is
there a way to overcome this?

- Is there a way to read the state store data through an API externally? If
so then how does it know where the state resides and if the state is across
multiple machines how does that work?

Thanks for all your help.


Re: Kafka Streams processor node metrics process rate with multiple stream threads

2018-07-17 Thread Guozhang Wang
Thanks Sam! Please feel free to assign the ticket to yourself and I will
review your PR if you created one:

https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Code+Changes#ContributingCodeChanges-PullRequest

On Tue, Jul 17, 2018 at 6:29 PM, Sam Lendle  wrote:

> https://issues.apache.org/jira/browse/KAFKA-7176
>
> If I have a change I will give trunk a try.
>
> On 7/16/18, 2:14 PM, "Guozhang Wang"  wrote:
>
> Hmm.. this seems new to me. Checked on the source code it seems right
> to me.
>
> Could you try out the latest trunk (build from source code) and see if
> it
> is the same issue for you?
>
> > In addition to that, though, I also see state store metrics for tasks
> that have been migrated to another instance, and their values continue
> to
> be updated, even after seeing messages in the logs indicating that
> local
> state for those tasks has been cleaned. Is this also fixed, or a
> separate
> issue?
>
> This may be an issue that is not yet resolved, I'd need to double
> check. At
> the mean time, could you create a JIRA for it?
>
>
> Guozhang
>
>
> On Thu, Jul 12, 2018 at 4:04 PM, Sam Lendle 
> wrote:
>
> > Ah great, thanks Guozhang.
> >
> > I also noticed a similar issue with state store metrics, where rate
> > metrics for each thread/task appear to be the total rate across all
> > threads/tasks on that instance.
> >
> > In addition to that, though, I also see state store metrics for
> tasks that
> > have been migrated to another instance, and their values continue to
> be
> > updated, even after seeing messages in the logs indicating that
> local state
> > for those tasks has been cleaned. Is this also fixed, or a separate
> issue?
> >
> > Best,
> > Sam
> >
> > On 7/11/18, 10:51 PM, "Guozhang Wang"  wrote:
> >
> > Hello Sam,
> >
> > It is a known issue that should have been fixed in 2.0, the
> correlated
> > fix
> > has also been cherry-picked to the 1.1.1 bug fix release as well:
> >
> > https://github.com/apache/kafka/pull/5277
> >
> >
> > Guozhang
> >
> > On Wed, Jul 11, 2018 at 11:42 AM, Sam Lendle <
> slen...@pandora.com>
> > wrote:
> >
> > > Hello!
> > >
> > > Using kafka-streams 1.1.0, I noticed when I sum the process
> rate
> > metric
> > > for a given processor node, the rate is many times higher than
> the
> > number
> > > of incoming messages. Digging further, it looks like the rate
> metric
> > > associated with each thread in a given application instance is
> > always the
> > > same, and if I average by instance and then sum the rates, I
> recover
> > the
> > > incoming message rate.  So it looks like the rate metric for
> each
> > stream
> > > thread is actually reporting the rate for all threads on
> the
> > instance.
> > >
> > > Is this a known issue, or am I misusing the metric? I’m not
> sure if
> > this
> > > affects other metrics, but it does look like the average
> latency
> > metric is
> > > identical for all threads on the same instance, so I suspect
> it does.
> > >
> > > Thanks,
> > > Sam
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
> >
> >
>
>
> --
> -- Guozhang
>
>
>


-- 
-- Guozhang


Re: Kafka Streams processor node metrics process rate with multiple stream threads

2018-07-17 Thread Sam Lendle
https://issues.apache.org/jira/browse/KAFKA-7176

If I have a change I will give trunk a try.

On 7/16/18, 2:14 PM, "Guozhang Wang"  wrote:

Hmm.. this seems new to me. Checked on the source code it seems right to me.

Could you try out the latest trunk (build from source code) and see if it
is the same issue for you?

> In addition to that, though, I also see state store metrics for tasks
that have been migrated to another instance, and their values continue to
be updated, even after seeing messages in the logs indicating that local
state for those tasks has been cleaned. Is this also fixed, or a separate
issue?

This may be an issue that is not yet resolved, I'd need to double check. At
the mean time, could you create a JIRA for it?


Guozhang


On Thu, Jul 12, 2018 at 4:04 PM, Sam Lendle  wrote:

> Ah great, thanks Guozhang.
>
> I also noticed a similar issue with state store metrics, where rate
> metrics for each thread/task appear to be the total rate across all
> threads/tasks on that instance.
>
> In addition to that, though, I also see state store metrics for tasks that
> have been migrated to another instance, and their values continue to be
> updated, even after seeing messages in the logs indicating that local 
state
> for those tasks has been cleaned. Is this also fixed, or a separate issue?
>
> Best,
> Sam
>
> On 7/11/18, 10:51 PM, "Guozhang Wang"  wrote:
>
> Hello Sam,
>
> It is a known issue that should have been fixed in 2.0, the correlated
> fix
> has also been cherry-picked to the 1.1.1 bug fix release as well:
>
> https://github.com/apache/kafka/pull/5277
>
>
> Guozhang
>
> On Wed, Jul 11, 2018 at 11:42 AM, Sam Lendle 
> wrote:
>
> > Hello!
> >
> > Using kafka-streams 1.1.0, I noticed when I sum the process rate
> metric
> > for a given processor node, the rate is many times higher than the
> number
> > of incoming messages. Digging further, it looks like the rate metric
> > associated with each thread in a given application instance is
> always the
> > same, and if I average by instance and then sum the rates, I recover
> the
> > incoming message rate.  So it looks like the rate metric for each
> stream
> > thread is actually reporting the rate for all threads on the
> instance.
> >
> > Is this a known issue, or am I misusing the metric? I’m not sure if
> this
> > affects other metrics, but it does look like the average latency
> metric is
> > identical for all threads on the same instance, so I suspect it 
does.
> >
> > Thanks,
> > Sam
> >
>
>
>
> --
> -- Guozhang
>
>
>


-- 
-- Guozhang




Re: Kafka Streams processor node metrics process rate with multiple stream threads

2018-07-16 Thread Guozhang Wang
Hmm.. this seems new to me. Checked on the source code it seems right to me.

Could you try out the latest trunk (build from source code) and see if it
is the same issue for you?

> In addition to that, though, I also see state store metrics for tasks
that have been migrated to another instance, and their values continue to
be updated, even after seeing messages in the logs indicating that local
state for those tasks has been cleaned. Is this also fixed, or a separate
issue?

This may be an issue that is not yet resolved, I'd need to double check. At
the mean time, could you create a JIRA for it?


Guozhang


On Thu, Jul 12, 2018 at 4:04 PM, Sam Lendle  wrote:

> Ah great, thanks Guozhang.
>
> I also noticed a similar issue with state store metrics, where rate
> metrics for each thread/task appear to be the total rate across all
> threads/tasks on that instance.
>
> In addition to that, though, I also see state store metrics for tasks that
> have been migrated to another instance, and their values continue to be
> updated, even after seeing messages in the logs indicating that local state
> for those tasks has been cleaned. Is this also fixed, or a separate issue?
>
> Best,
> Sam
>
> On 7/11/18, 10:51 PM, "Guozhang Wang"  wrote:
>
> Hello Sam,
>
> It is a known issue that should have been fixed in 2.0, the correlated
> fix
> has also been cherry-picked to the 1.1.1 bug fix release as well:
>
> https://github.com/apache/kafka/pull/5277
>
>
> Guozhang
>
> On Wed, Jul 11, 2018 at 11:42 AM, Sam Lendle 
> wrote:
>
> > Hello!
> >
> > Using kafka-streams 1.1.0, I noticed when I sum the process rate
> metric
> > for a given processor node, the rate is many times higher than the
> number
> > of incoming messages. Digging further, it looks like the rate metric
> > associated with each thread in a given application instance is
> always the
> > same, and if I average by instance and then sum the rates, I recover
> the
> > incoming message rate.  So it looks like the rate metric for each
> stream
> > thread is actually reporting the rate for all threads on the
> instance.
> >
> > Is this a known issue, or am I misusing the metric? I’m not sure if
> this
> > affects other metrics, but it does look like the average latency
> metric is
> > identical for all threads on the same instance, so I suspect it does.
> >
> > Thanks,
> > Sam
> >
>
>
>
> --
> -- Guozhang
>
>
>


-- 
-- Guozhang


Re: Kafka Streams processor node metrics process rate with multiple stream threads

2018-07-12 Thread Sam Lendle
Ah great, thanks Guozhang.

I also noticed a similar issue with state store metrics, where rate metrics for 
each thread/task appear to be the total rate across all threads/tasks on that 
instance.

In addition to that, though, I also see state store metrics for tasks that have 
been migrated to another instance, and their values continue to be updated, 
even after seeing messages in the logs indicating that local state for those 
tasks has been cleaned. Is this also fixed, or a separate issue?

Best,
Sam

On 7/11/18, 10:51 PM, "Guozhang Wang"  wrote:

Hello Sam,

It is a known issue that should have been fixed in 2.0, the correlated fix
has also been cherry-picked to the 1.1.1 bug fix release as well:


https://github.com/apache/kafka/pull/5277


Guozhang

On Wed, Jul 11, 2018 at 11:42 AM, Sam Lendle  wrote:

> Hello!
>
> Using kafka-streams 1.1.0, I noticed when I sum the process rate metric
> for a given processor node, the rate is many times higher than the number
> of incoming messages. Digging further, it looks like the rate metric
> associated with each thread in a given application instance is always the
> same, and if I average by instance and then sum the rates, I recover the
> incoming message rate.  So it looks like the rate metric for each stream
> thread is actually reporting the rate for all threads on the instance.
>
> Is this a known issue, or am I misusing the metric? I’m not sure if this
> affects other metrics, but it does look like the average latency metric is
> identical for all threads on the same instance, so I suspect it does.
>
> Thanks,
> Sam
>



-- 
-- Guozhang




Kafka Streams processor node metrics process rate with multiple stream threads

2018-07-11 Thread Sam Lendle
Hello!

Using kafka-streams 1.1.0, I noticed when I sum the process rate metric for a 
given processor node, the rate is many times higher than the number of incoming 
messages. Digging further, it looks like the rate metric associated with each 
thread in a given application instance is always the same, and if I average by 
instance and then sum the rates, I recover the incoming message rate.  So it 
looks like the rate metric for each stream thread is actually reporting the 
rate for all threads on the instance.

Is this a known issue, or am I misusing the metric? I’m not sure if this 
affects other metrics, but it does look like the average latency metric is 
identical for all threads on the same instance, so I suspect it does.

Thanks,
Sam


Re: Kafka streams Processor life cycle behavior of close()

2016-10-08 Thread Srikanth
Tnx! Looks like fix is already in for 0.10.1.0

On Tue, Oct 4, 2016 at 6:18 PM, Guozhang Wang  wrote:

> Created https://issues.apache.org/jira/browse/KAFKA-4253 for this issue.
>
>
> Guozhang
>
> On Tue, Oct 4, 2016 at 3:08 PM, Guozhang Wang  wrote:
>
> > Hello Srikanth,
> >
> > We close the underlying clients before closing the state manager (hence
> > the states) because for example we need to make sure producer's sent
> > records have all been acked before the state manager records the
> changelog
> > sent offsets as end offsets. This is kind of chicken-and-egg problem, and
> > we may be able to re-order the shutting down process in the future with
> > some added shutdown hooks.
> >
> > As of now, there is not a perfect solution to your scenario, and I would
> > like to suggest checking if producer's own batching mechanism is good
> > enough so you do not need to do this in the streams client layer.
> >
> >
> > Guozhang
> >
> > On Sat, Oct 1, 2016 at 2:20 PM, Srikanth  wrote:
> >
> >> Hello,
> >>
> >> I'm testing out a WriteToSinkProcessor() that batches records before
> >> writing it to a sink.
> >> The actual commit to sink happens in punctuate(). I also wanted to
> commit
> >> in close().
> >> Idea here is, during a regular shutdown, we'll commit all records and
> >> ideally stop with an empty state.
> >> My commit() process is 3 step 1) Read from KV store 2) write to sink 3)
> >> delete written keys from KV store.
> >>
> >> I get this exception when closing though. It looks like the kafka
> producer
> >> is closed before the changelog topic is updated after close().
> >> Should the producer be closed after all tasks and processors are closed?
> >>
> >> 16/10/01 17:01:15 INFO StreamThread-1 WriteToSinkProcessor: Closing
> >> processor instance
> >> 16/10/01 17:01:16 ERROR StreamThread-1 StreamThread: Failed to remove
> >> stream tasks in thread [StreamThread-1]:
> >> java.lang.IllegalStateException: Cannot send after the producer is
> >> closed.
> >> at
> >> org.apache.kafka.clients.producer.internals.RecordAccumulato
> >> r.append(RecordAccumulator.java:173)
> >> at
> >> org.apache.kafka.clients.producer.KafkaProducer.doSend(Kafka
> >> Producer.java:467)
> >> at
> >> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaPr
> >> oducer.java:430)
> >> at
> >> org.apache.kafka.streams.processor.internals.RecordCollector
> >> .send(RecordCollector.java:84)
> >> at
> >> org.apache.kafka.streams.processor.internals.RecordCollector
> >> .send(RecordCollector.java:71)
> >> at
> >> org.apache.kafka.streams.state.internals.StoreChangeLogger.
> >> logChange(StoreChangeLogger.java:108)
> >> at
> >> org.apache.kafka.streams.state.internals.InMemoryKeyValueLog
> >> gedStore.flush(InMemoryKeyValueLoggedStore.java:161)
> >> at
> >> org.apache.kafka.streams.state.internals.MeteredKeyValueStor
> >> e.flush(MeteredKeyValueStore.java:165)
> >> at
> >> org.apache.kafka.streams.processor.internals.ProcessorStateM
> >> anager.close(ProcessorStateManager.java:343)
> >> at
> >> org.apache.kafka.streams.processor.internals.AbstractTask.
> >> close(AbstractTask.java:112)
> >> at
> >> org.apache.kafka.streams.processor.internals.StreamTask.
> >> close(StreamTask.java:317)
> >>
> >> Srikanth
> >>
> >
> >
> >
> > --
> > -- Guozhang
> >
>
>
>
> --
> -- Guozhang
>


Re: Kafka streams Processor life cycle behavior of close()

2016-10-04 Thread Guozhang Wang
Created https://issues.apache.org/jira/browse/KAFKA-4253 for this issue.


Guozhang

On Tue, Oct 4, 2016 at 3:08 PM, Guozhang Wang  wrote:

> Hello Srikanth,
>
> We close the underlying clients before closing the state manager (hence
> the states) because for example we need to make sure producer's sent
> records have all been acked before the state manager records the changelog
> sent offsets as end offsets. This is kind of chicken-and-egg problem, and
> we may be able to re-order the shutting down process in the future with
> some added shutdown hooks.
>
> As of now, there is not a perfect solution to your scenario, and I would
> like to suggest checking if producer's own batching mechanism is good
> enough so you do not need to do this in the streams client layer.
>
>
> Guozhang
>
> On Sat, Oct 1, 2016 at 2:20 PM, Srikanth  wrote:
>
>> Hello,
>>
>> I'm testing out a WriteToSinkProcessor() that batches records before
>> writing it to a sink.
>> The actual commit to sink happens in punctuate(). I also wanted to commit
>> in close().
>> Idea here is, during a regular shutdown, we'll commit all records and
>> ideally stop with an empty state.
>> My commit() process is 3 step 1) Read from KV store 2) write to sink 3)
>> delete written keys from KV store.
>>
>> I get this exception when closing though. It looks like the kafka producer
>> is closed before the changelog topic is updated after close().
>> Should the producer be closed after all tasks and processors are closed?
>>
>> 16/10/01 17:01:15 INFO StreamThread-1 WriteToSinkProcessor: Closing
>> processor instance
>> 16/10/01 17:01:16 ERROR StreamThread-1 StreamThread: Failed to remove
>> stream tasks in thread [StreamThread-1]:
>> java.lang.IllegalStateException: Cannot send after the producer is
>> closed.
>> at
>> org.apache.kafka.clients.producer.internals.RecordAccumulato
>> r.append(RecordAccumulator.java:173)
>> at
>> org.apache.kafka.clients.producer.KafkaProducer.doSend(Kafka
>> Producer.java:467)
>> at
>> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaPr
>> oducer.java:430)
>> at
>> org.apache.kafka.streams.processor.internals.RecordCollector
>> .send(RecordCollector.java:84)
>> at
>> org.apache.kafka.streams.processor.internals.RecordCollector
>> .send(RecordCollector.java:71)
>> at
>> org.apache.kafka.streams.state.internals.StoreChangeLogger.
>> logChange(StoreChangeLogger.java:108)
>> at
>> org.apache.kafka.streams.state.internals.InMemoryKeyValueLog
>> gedStore.flush(InMemoryKeyValueLoggedStore.java:161)
>> at
>> org.apache.kafka.streams.state.internals.MeteredKeyValueStor
>> e.flush(MeteredKeyValueStore.java:165)
>> at
>> org.apache.kafka.streams.processor.internals.ProcessorStateM
>> anager.close(ProcessorStateManager.java:343)
>> at
>> org.apache.kafka.streams.processor.internals.AbstractTask.
>> close(AbstractTask.java:112)
>> at
>> org.apache.kafka.streams.processor.internals.StreamTask.
>> close(StreamTask.java:317)
>>
>> Srikanth
>>
>
>
>
> --
> -- Guozhang
>



-- 
-- Guozhang


Re: Kafka streams Processor life cycle behavior of close()

2016-10-04 Thread Guozhang Wang
Hello Srikanth,

We close the underlying clients before closing the state manager (hence the
states) because for example we need to make sure producer's sent records
have all been acked before the state manager records the changelog sent
offsets as end offsets. This is kind of a chicken-and-egg problem, and we may
be able to re-order the shutting down process in the future with some added
shutdown hooks.

As of now, there is not a perfect solution to your scenario, and I would
like to suggest checking if the producer's own batching mechanism is good
enough so you do not need to do this in the streams client layer.


Guozhang

On Sat, Oct 1, 2016 at 2:20 PM, Srikanth  wrote:

> Hello,
>
> I'm testing out a WriteToSinkProcessor() that batches records before
> writing it to a sink.
> The actual commit to sink happens in punctuate(). I also wanted to commit
> in close().
> Idea here is, during a regular shutdown, we'll commit all records and
> ideally stop with an empty state.
> My commit() process is 3 step 1) Read from KV store 2) write to sink 3)
> delete written keys from KV store.
>
> I get this exception when closing though. It looks like the kafka producer
> is closed before the changelog topic is updated after close().
> Should the producer be closed after all tasks and processors are closed?
>
> 16/10/01 17:01:15 INFO StreamThread-1 WriteToSinkProcessor: Closing
> processor instance
> 16/10/01 17:01:16 ERROR StreamThread-1 StreamThread: Failed to remove
> stream tasks in thread [StreamThread-1]:
> java.lang.IllegalStateException: Cannot send after the producer is closed.
> at
> org.apache.kafka.clients.producer.internals.RecordAccumulator.append(
> RecordAccumulator.java:173)
> at
> org.apache.kafka.clients.producer.KafkaProducer.doSend(
> KafkaProducer.java:467)
> at
> org.apache.kafka.clients.producer.KafkaProducer.send(
> KafkaProducer.java:430)
> at
> org.apache.kafka.streams.processor.internals.RecordCollector.send(
> RecordCollector.java:84)
> at
> org.apache.kafka.streams.processor.internals.RecordCollector.send(
> RecordCollector.java:71)
> at
> org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(
> StoreChangeLogger.java:108)
> at
> org.apache.kafka.streams.state.internals.InMemoryKeyValueLoggedStore.
> flush(InMemoryKeyValueLoggedStore.java:161)
> at
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(
> MeteredKeyValueStore.java:165)
> at
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(
> ProcessorStateManager.java:343)
> at
> org.apache.kafka.streams.processor.internals.AbstractTask.close(
> AbstractTask.java:112)
> at
> org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.
> java:317)
>
> Srikanth
>



-- 
-- Guozhang


Kafka streams Processor life cycle behavior of close()

2016-10-01 Thread Srikanth
Hello,

I'm testing out a WriteToSinkProcessor() that batches records before
writing them to a sink.
The actual commit to sink happens in punctuate(). I also wanted to commit
in close().
Idea here is, during a regular shutdown, we'll commit all records and
ideally stop with an empty state.
My commit() process has 3 steps: 1) read from the KV store, 2) write to the sink,
and 3) delete the written keys from the KV store.
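
For concreteness, a rough sketch of such a processor (the store name, the
punctuate interval, and the sink call are placeholders; 0.10-era Processor API):

import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

public class WriteToSinkProcessorSketch implements Processor<String, String> {
    private ProcessorContext context;
    private KeyValueStore<String, String> buffer;

    @Override
    @SuppressWarnings("unchecked")
    public void init(final ProcessorContext context) {
        this.context = context;
        this.buffer = (KeyValueStore<String, String>) context.getStateStore("pending-writes");
        context.schedule(60 * 1000L);  // request punctuate() roughly every minute
    }

    @Override
    public void process(final String key, final String value) {
        buffer.put(key, value);  // only batch here; the sink write happens in punctuate()
    }

    @Override
    public void punctuate(final long timestamp) {
        // 1) read from the KV store and 2) write each record to the sink
        final List<String> written = new ArrayList<>();
        try (final KeyValueIterator<String, String> iter = buffer.all()) {
            while (iter.hasNext()) {
                final KeyValue<String, String> entry = iter.next();
                writeToSink(entry.key, entry.value);  // placeholder sink call
                written.add(entry.key);
            }
        }
        // 3) delete the keys that made it to the sink
        for (final String key : written) {
            buffer.delete(key);
        }
        context.commit();  // request a commit once the batch is safely in the sink
    }

    @Override
    public void close() {
        // final flush on shutdown -- this is the step that runs into the
        // "Cannot send after the producer is closed" exception shown below
        punctuate(System.currentTimeMillis());
    }

    private void writeToSink(final String key, final String value) {
        // placeholder for the external sink write
    }
}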

I get this exception when closing though. It looks like the kafka producer
is closed before the changelog topic is updated after close().
Should the producer be closed after all tasks and processors are closed?

16/10/01 17:01:15 INFO StreamThread-1 WriteToSinkProcessor: Closing
processor instance
16/10/01 17:01:16 ERROR StreamThread-1 StreamThread: Failed to remove
stream tasks in thread [StreamThread-1]:
java.lang.IllegalStateException: Cannot send after the producer is closed.
at
org.apache.kafka.clients.producer.internals.RecordAccumulator.append(RecordAccumulator.java:173)
at
org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:467)
at
org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:430)
at
org.apache.kafka.streams.processor.internals.RecordCollector.send(RecordCollector.java:84)
at
org.apache.kafka.streams.processor.internals.RecordCollector.send(RecordCollector.java:71)
at
org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:108)
at
org.apache.kafka.streams.state.internals.InMemoryKeyValueLoggedStore.flush(InMemoryKeyValueLoggedStore.java:161)
at
org.apache.kafka.streams.state.internals.MeteredKeyValueStore.flush(MeteredKeyValueStore.java:165)
at
org.apache.kafka.streams.processor.internals.ProcessorStateManager.close(ProcessorStateManager.java:343)
at
org.apache.kafka.streams.processor.internals.AbstractTask.close(AbstractTask.java:112)
at
org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:317)

Srikanth


Re: Kafka Streams / Processor

2016-05-30 Thread Matthias J. Sax
2) I am not sure if I understand correctly
   * punctuation is independent of committing (i.e., you cannot use it
to flush)
   * if you need to align writes with commits you can either use a
KStream/KTable or register a state (see StateStore.java)

5) The application goes down -- neither process() nor punctuate() should
throw!


-Matthias

On 05/27/2016 03:52 AM, Tobias Adamson wrote:
> Thank you. 
> Some more follow-up questions
> 
> 1) great, will do some tests
> 
> 2) if auto commit is used how do we prevent a commit happening when an error 
> happens in processing. Basically our scenario is that we build up aggregation 
> contexts for specific keys (these are a bit special so most probably can't 
> use KTables) and we would then on each punctuate call want to save these 
> contexts to our external systems. Once saved we would commit our offset. 
> However if the progress is committed before punctuate and we have an error 
> saving we could end up with the offset being ahead of our saved progress. 
> 
> 3) great
> 
> 4) great
> 
> 5) What happens if a process/punctuate call throws a RuntimeException?
> 
> Regards
> Toby
> 
> 
> 
>> On 27 May 2016, at 1:32 AM, Matthias J. Sax  wrote:
>>
>> Hi Toby,
>>
>> 1) I am not an expert for RocksDB, but I don't see a problem with larger
>> objects.
>>
>> 2) I assume, by "guaranteed" you mean that the commit is performed when
>> the call return. In this case, no. It only sets a flag to commit at the
>> next earliest point in time possible. Ie, you can trigger commits in
>> between the regular commit interval that is configured via
>> "commit.interval.ms"
>>
>> 3) Yes.
>>
>> 4) Yes.
>>
>> -Matthias
>>
>>
>> On 05/26/2016 02:36 PM, Tobias Adamson wrote:
>>> Hi
>>> We have a scenario where we could benefit from the new API’s instead of our 
>>> in house ones.
>>> However we have a couple of questions
>>>
>>> 1. Is it feasible to save 2-3MB size values in the RocksDBStorage?
>>> 2. When is the offset committed as processed when using a custom Processor, 
>>> is it when you call commit on the context and is the commit guaranteed?
>>> 3. Is it ok to put values in to the KVStore in the punctuate method?
>>> 4. Is the punctuate method run by the same thread as the process method?
>>>
>>>
>>> Regards
>>> Toby
>>>
>>
> 



signature.asc
Description: OpenPGP digital signature


Re: Kafka Streams / Processor

2016-05-26 Thread Tobias Adamson
Thank you. 
Some more follow-up questions

1) great, will do some tests

2) if auto commit is used how do we prevent a commit happening when an error 
happens in processing. Basically our scenario is that we build up aggregation 
contexts for specific keys (these are a bit special so most probably can't use 
KTables) and we would then on each punctuate call want to save these contexts 
to our external systems. Once saved we would commit our offset. However if the 
progress is committed before punctuate and we have an error saving we could end 
up with the offset being ahead of our saved progress. 

3) great

4) great

5) What happens if a process/punctuate call throws a RuntimeException?

Regards
Toby



> On 27 May 2016, at 1:32 AM, Matthias J. Sax  wrote:
> 
> Hi Toby,
> 
> 1) I am not an expert for RocksDB, but I don't see a problem with larger
> objects.
> 
> 2) I assume, by "guaranteed" you mean that the commit is performed when
> the call return. In this case, no. It only sets a flag to commit at the
> next earliest point in time possible. Ie, you can trigger commits in
> between the regular commit interval that is configured via
> "commit.interval.ms"
> 
> 3) Yes.
> 
> 4) Yes.
> 
> -Matthias
> 
> 
> On 05/26/2016 02:36 PM, Tobias Adamson wrote:
>> Hi
>> We have a scenario where we could benefit from the new API’s instead of our 
>> in house ones.
>> However we have a couple of questions
>> 
>> 1. Is it feasible to save 2-3MB size values in the RocksDBStorage?
>> 2. When is the offset committed as processed when using a custom Processor, 
>> is it when you call commit on the context and is the commit guaranteed?
>> 3. Is it ok to put values in to the KVStore in the punctuate method?
>> 4. Is the punctuate method run by the same thread as the process method?
>> 
>> 
>> Regards
>> Toby
>> 
> 



Re: Kafka Streams / Processor

2016-05-26 Thread Matthias J. Sax
Hi Toby,

1) I am not an expert for RocksDB, but I don't see a problem with larger
objects.

2) I assume, by "guaranteed" you mean that the commit is performed when
the call returns. In this case, no. It only sets a flag to commit at the
next earliest point in time possible. I.e., you can trigger commits in
between the regular commit interval that is configured via
"commit.interval.ms" (see the sketch below the answers)

3) Yes.

4) Yes.
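
To make answer 2) concrete, a minimal configuration sketch (application id,
servers, and the interval are placeholders): the flag set by commit() is
honored at the commit points driven by "commit.interval.ms".

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5000);  // regular commit cadence in ms

// inside a Processor, context.commit() only flags a request; the stream thread
// performs the actual commit at its next opportunity within this cadence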

-Matthias


On 05/26/2016 02:36 PM, Tobias Adamson wrote:
> Hi
> We have a scenario where we could benefit from the new API’s instead of our 
> in house ones.
> However we have a couple of questions
> 
> 1. Is it feasible to save 2-3MB size values in the RocksDBStorage?
> 2. When is the offset committed as processed when using a custom Processor, 
> is it when you call commit on the context and is the commit guaranteed?
> 3. Is it ok to put values in to the KVStore in the punctuate method?
> 4. Is the punctuate method run by the same thread as the process method?
> 
> 
> Regards
> Toby
> 



signature.asc
Description: OpenPGP digital signature


Kafka Streams / Processor

2016-05-26 Thread Tobias Adamson
Hi
We have a scenario where we could benefit from the new APIs instead of our 
in-house ones.
However we have a couple of questions

1. Is it feasible to save 2-3MB size values in the RocksDBStorage?
2. When is the offset committed as processed when using a custom Processor, is 
it when you call commit on the context and is the commit guaranteed?
3. Is it ok to put values in to the KVStore in the punctuate method?
4. Is the punctuate method run by the same thread as the process method?


Regards
Toby