Hi Alessandro,

I had a look at your code. Regarding your question whether you use the
SpecificAvroSerde correctly, take a look at the following documentation:

https://docs.confluent.io/current/streams/developer-guide/datatypes.html

I haven't had the time yet to take a closer look at your problems with the
aggregation. I will have a look next week.

Have a nice weekend,
Bruno

On Wed, Apr 17, 2019 at 4:43 PM Alessandro Tagliapietra <
tagliapietra.alessan...@gmail.com> wrote:

> So I've started with a new app with the archetype:generate as in
> https://kafka.apache.org/22/documentation/streams/tutorial
>
> I've pushed a sample repo here: https://github.com/alex88/kafka-test
> The avro schemas are a Metric with 2 fields: timestamp and production and a
> MetricList with a list of records (Metric) to be able to manually do the
> aggregation.
> Right now the aggregation is simple just for the purpose of the sample repo
> and to easily see if we're getting wrong values.
>
> What I wanted to achieve is:
>  - have a custom generator that generates 1 message per second with
> production = 1 with 1 or more separate message keys which in my case are
> the sensor IDs generating the data
>  - a filter that removes out of order messages by having a state that
> stores key (sensorID) -> last timestamp
>  - a window operation that for this example just sums the values in each
> 10-second window
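
The three stages listed above can be modelled without Kafka at all. The
following is only a minimal plain-Java sketch, not the code from the
repository: the class WindowSketch and its methods are hypothetical names,
and it assumes 10-second tumbling windows aligned to the epoch, which is
consistent with the 160000/170000 window timestamps quoted in this mail.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of the pipeline (hypothetical names, no Kafka classes).
class WindowSketch {
    static final long WINDOW_SIZE_MS = 10_000L;

    // Last accepted timestamp per sensor ID (the state of the filter).
    private final Map<String, Long> lastTimestamp = new HashMap<>();
    // Running sum per sensor ID and window start.
    private final Map<String, TreeMap<Long, Integer>> sums = new HashMap<>();

    // Assumption: windows are aligned to the epoch, i.e. [start, start + size).
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % WINDOW_SIZE_MS);
    }

    // Returns true if the record was accepted, false if dropped as out of order.
    boolean process(String sensorId, long timestampMs, int production) {
        Long last = lastTimestamp.get(sensorId);
        if (last != null && timestampMs <= last) {
            return false;
        }
        lastTimestamp.put(sensorId, timestampMs);
        sums.computeIfAbsent(sensorId, k -> new TreeMap<>())
            .merge(windowStart(timestampMs), production, Integer::sum);
        return true;
    }

    // Sum accumulated so far for one sensor and one window (0 if none).
    int sumFor(String sensorId, long windowStartMs) {
        return sums.getOrDefault(sensorId, new TreeMap<>())
                   .getOrDefault(windowStartMs, 0);
    }

    public static void main(String[] args) {
        WindowSketch sketch = new WindowSketch();
        for (long ts = 160_000L; ts < 170_000L; ts += 1_000L) {
            sketch.process("S1", ts, 1);
        }
        sketch.process("S1", 165_000L, 1); // out of order, dropped
        System.out.println(sketch.sumFor("S1", 160_000L)); // prints 10
    }
}
```

Note that this model keeps its state in memory only, so it says nothing
about what happens to the window state across restarts.
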
>
> To show where I'm having issues I've set up multiple branches for the repo:
>  - *issue-01 <https://github.com/alex88/kafka-test/tree/issue-01>* is the
> one I had initially "Failed to flush state store
> KSTREAM-AGGREGATE-STATE-STORE-0000000003" that I tried to solve using
>
> https://stackoverflow.com/questions/55186727/kafka-streams-2-1-1-class-cast-while-flushing-timed-aggregation-to-store
>  - *issue-02 <https://github.com/alex88/kafka-test/tree/issue-02>* is the
> one after I've tried to solve the above problem with the materializer (maybe
> the SpecificAvroSerde is wrong?)
>  - *issue-03 <https://github.com/alex88/kafka-test/tree/issue-03>* after
> fixing issue-02 (by using groupByKey(Grouped.with(Serdes.String(), new
> SpecificAvroSerde<>()))) everything seems to be working, if you let both
> the producer and stream running, you'll see that the stream receives 10
> messages (with the timestamp incrementing 1 second for each message) like
> this:
>
> S1 with filtered metric{"timestamp": 160000, "production": 1}
> S1 with filtered metric{"timestamp": 161000, "production": 1}
> S1 with filtered metric{"timestamp": 162000, "production": 1}
> S1 with filtered metric{"timestamp": 163000, "production": 1}
> S1 with filtered metric{"timestamp": 164000, "production": 1}
> S1 with filtered metric{"timestamp": 165000, "production": 1}
> S1 with filtered metric{"timestamp": 166000, "production": 1}
> S1 with filtered metric{"timestamp": 167000, "production": 1}
> S1 with filtered metric{"timestamp": 168000, "production": 1}
> S1 with filtered metric{"timestamp": 169000, "production": 1}
>
> and at the 10 seconds interval something like:
>
> S1 with computed metric {"timestamp": 160000, "production": 10}
> S1 with computed metric {"timestamp": 170000, "production": 10}
> S1 with computed metric {"timestamp": 180000, "production": 10}
>
> and so on...
> Now there are two problems. After stopping and restarting the stream
> processor (by sending SIGINT via IntelliJ, since I start the class main with
> it):
>  - sometimes the aggregated count is wrong, if I have it start windowing
> for 7 seconds (e.g. seconds 11-17), restart the stream, after restart it
> might just emit a value for the new 3 missing seconds (seconds 18-20) and
> the aggregated value is 3 not 10
>  - sometimes the window outputs twice, in the example where I restart the
> stream processor I might get as output
>
> S1 with filtered metric{"timestamp": 154000, "production": 1}
> S1 with computed metric {"timestamp": 160000, "production": 5}
> S1 with filtered metric{"timestamp": 155000, "production": 1}
> S1 with filtered metric{"timestamp": 156000, "production": 1}
> S1 with filtered metric{"timestamp": 157000, "production": 1}
> S1 with filtered metric{"timestamp": 158000, "production": 1}
> S1 with filtered metric{"timestamp": 159000, "production": 1}
> S1 with filtered metric{"timestamp": 160000, "production": 1}
> S1 with filtered metric{"timestamp": 161000, "production": 1}
> S1 with computed metric {"timestamp": 160000, "production": 10}
> S1 with filtered metric{"timestamp": 162000, "production": 1}
>
> as you can see, window for timestamp 160000 is duplicated
>
> Is this because the window state isn't persisted across restarts?
> My ultimate goal is to have the window part emit only once and resume
> processing across restarts, while avoiding processing out of order data
> (that's the purpose of the TimestampIncrementalFilter)
>
> Thank you in advance
>
> --
> Alessandro Tagliapietra
>
>
> On Tue, Apr 16, 2019 at 9:48 PM Alessandro Tagliapietra <
> tagliapietra.alessan...@gmail.com> wrote:
>
> > Hi Bruno,
> >
> > I'm using the confluent docker images 5.2.1, so kafka 2.2.
> > Anyway I'll try to make a small reproduction repo with all the different
> > cases soon.
> >
> > Thank you
> >
> > --
> > Alessandro Tagliapietra
> >
> >
> > On Tue, Apr 16, 2019 at 1:02 PM Bruno Cadonna <br...@confluent.io>
> wrote:
> >
> >> Hi Alessandro,
> >>
> >> What version of Kafka do you use?
> >>
> >> Could you please give a more detailed example for the issues with the
> two
> >> keys you see?
> >>
> >> Could the following bug be related to the duplicates you see?
> >>
> >>
> >>
> https://issues.apache.org/jira/browse/KAFKA-7895?jql=project%20%3D%20KAFKA%20AND%20issuetype%20%3D%20Bug%20AND%20status%20in%20(Open%2C%20%22In%20Progress%22%2C%20Reopened)%20AND%20component%20%3D%20streams%20AND%20text%20~%20%22duplicate%22
> >>
> >> How do you restart the processor?
> >>
> >> Best,
> >> Bruno
> >>
> >> On Mon, Apr 15, 2019 at 11:02 PM Alessandro Tagliapietra <
> >> tagliapietra.alessan...@gmail.com> wrote:
> >>
> >> > Thank you Bruno,
> >> >
> >> > I'll look into those, however average is just a simple thing I'm
> trying
> >> > right now just to get an initial windowing flow working.
> >> > In the future I'll probably still need the actual values for other
> >> > calculations. We won't have more than 60 elements per window for sure.
> >> >
> >> > So far to not manually serialize/deserialize the array list I've
> >> created an
> >> > Avro model with an array field containing the values.
> >> > I had issues with suppress as explained here
> >> >
> >> >
> >> >
> >>
> https://stackoverflow.com/questions/55699096/kafka-aggregate-with-materialized-with-specific-avro-serve-gives-nullpointerexce/55699198#55699198
> >> >
> >> > but I got that working.
> >> > So far everything seems to be working, except a couple things:
> >> >  - if I generate data with 1 key, I correctly get a value every 10
> >> seconds,
> >> > if I later start generating data with another key (while key 1 is
> still
> >> > generating) the windowing emits a value only after the timestamp of
> key
> >> 2
> >> > reaches the last generated window
> >> >  - while generating data, if I restart the processor as soon as it
> >> starts
> >> > it sometimes generates 2 aggregates for the same window even if I'm
> >> using
> >> > the suppress
> >> >
> >> > Anyway, I'll look into your link and try to find out the cause of
> these
> >> > issues, probably starting from scratch with a simpler example
> >> >
> >> > Thank you for your help!
> >> >
> >> > --
> >> > Alessandro Tagliapietra
> >> >
> >> > On Mon, Apr 15, 2019 at 10:08 PM Bruno Cadonna <br...@confluent.io>
> >> wrote:
> >> >
> >> > > Hi Alessandro,
> >> > >
> >> > > Have a look at this Kafka Usage Pattern for computing averages
> without
> >> > > using an ArrayList.
> >> > >
> >> > >
> >> > >
> >> >
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Stream+Usage+Patterns#KafkaStreamUsagePatterns-Howtocomputean(windowed)average
> >> > > ?
> >> > >
> >> > > The advantage of this pattern over the ArrayList approach is the
> >> reduced
> >> > > space needed to compute the aggregate. Note that you will still need
> >> to
> >> > > implement a SerDe. However, the SerDe should be a bit easier to
> >> implement
> >> > > than a SerDe for an ArrayList.
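
The pattern linked above keeps only a running count and sum instead of all
values in the window. As a rough plain-Java sketch of that idea
(AverageAccumulator is a hypothetical name, no Kafka classes are used, and a
real Streams application would still need a SerDe for this type):

```java
// Plain-Java model of a (count, sum) aggregate (hypothetical name).
class AverageAccumulator {
    private long count;
    private double sum;

    // Corresponds to the aggregator step: fold one value into the state.
    AverageAccumulator add(double value) {
        count++;
        sum += value;
        return this;
    }

    // Corresponds to a final mapping step from the aggregate to the average.
    double average() {
        return count == 0 ? 0.0 : sum / count;
    }

    long count() {
        return count;
    }

    public static void main(String[] args) {
        AverageAccumulator acc = new AverageAccumulator();
        for (double speed : new double[] {10, 20, 30}) {
            acc.add(speed);
        }
        System.out.println(acc.average()); // prints 20.0
    }
}
```

The state is two numbers regardless of how many records fall into the
window, which is where the space saving comes from.
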
> >> > >
> >> > > Hope that helps.
> >> > >
> >> > > Best,
> >> > > Bruno
> >> > >
> >> > > On Mon, Apr 15, 2019 at 4:57 PM Alessandro Tagliapietra <
> >> > > tagliapietra.alessan...@gmail.com> wrote:
> >> > >
> >> > > > Sorry but it seemed harder than I thought,
> >> > > >
> >> > > > to have the custom aggregation working I need to get an ArrayList
> of
> >> > all
> >> > > > the values in the window, so far my aggregate DSL method creates
> an
> >> > > > ArrayList on the initializer and adds each value to the list in
> the
> >> > > > aggregator.
> >> > > > Then I think I'll have to provide a serde to change the output
> >> type of
> >> > > > that method.
> >> > > > I was looking at
> >> > > >
> >> > > >
> >> > >
> >> >
> >>
> https://stackoverflow.com/questions/46365884/issue-with-arraylist-serde-in-kafka-streams-api
> >> > > > but
> >> > > > that seems more towards a list of longs and already uses
> longSerde.
> >> > > > I'm currently trying to implement another avro model that has a
> >> field
> >> > of
> >> > > > type array so I can use the regular avro serializer to implement
> >> this.
> >> > > > Should I create my own serdes instead or is this the right way?
> >> > > >
> >> > > > Thank you in advance
> >> > > >
> >> > > > --
> >> > > > Alessandro Tagliapietra
> >> > > >
> >> > > > On Mon, Apr 15, 2019 at 3:42 PM Alessandro Tagliapietra <
> >> > > > tagliapietra.alessan...@gmail.com> wrote:
> >> > > >
> >> > > > > Thank you Bruno and Matthias,
> >> > > > >
> >> > > > > I've modified the transformer to implement the
> >> > ValueTransformerWithKey
> >> > > > > interface and everything is working fine.
> >> > > > > I've now to window the data and manually aggregate each window
> >> data
> >> > > since
> >> > > > > I've to do some averages and sum of differences.
> >> > > > > So far I'm just having some issues with message types since I'm
> >> > > changing
> >> > > > > the data type when aggregating the window but I think it's an
> easy
> >> > > > problem.
> >> > > > >
> >> > > > > Thank you again
> >> > > > > Best
> >> > > > >
> >> > > > > --
> >> > > > > Alessandro Tagliapietra
> >> > > > >
> >> > > > > On Sun, Apr 14, 2019 at 11:26 AM Bruno Cadonna <
> >> br...@confluent.io>
> >> > > > wrote:
> >> > > > >
> >> > > > >> Hi Alessandro,
> >> > > > >>
> >> > > > >> the `TransformerSupplier` is internally wrapped with a
> >> > > > `ProcessorSupplier`,
> >> > > > >> so the statement
> >> > > > >>
> >> > > > >> `transform` is essentially equivalent to adding the Transformer
> >> via
> >> > > > >> Topology#addProcessor() to your processor topology
> >> > > > >>
> >> > > > >> is correct.
> >> > > > >>
> >> > > > >> If you do not change the key, you should definitely use one of
> >> the
> >> > > > >> overloads of `transformValues` to avoid internal data
> >> > redistribution.
> >> > > In
> >> > > > >> your case the overload with `ValueTransformerWithKeySupplier`
> as
> >> > > > suggested
> >> > > > >> by Matthias would fit.
> >> > > > >>
> >> > > > >> Best,
> >> > > > >> Bruno
> >> > > > >>
> >> > > > >> On Sat, Apr 13, 2019 at 12:51 PM Matthias J. Sax <
> >> > > matth...@confluent.io
> >> > > > >
> >> > > > >> wrote:
> >> > > > >>
> >> > > > >> > There is also `ValueTransformerWithKey` that gives you
> >> read-only
> >> > > access
> >> > > > >> > to the key.
> >> > > > >> >
> >> > > > >> > -Matthias
> >> > > > >> >
> >> > > > >> > On 4/12/19 5:34 PM, Alessandro Tagliapietra wrote:
> >> > > > >> > > Hi Bruno,
> >> > > > >> > >
> >> > > > >> > > Thank you for the quick answer.
> >> > > > >> > >
> >> > > > >> > > I'm actually trying to do that since it seems there is
> >> really no
> >> > > way
> >> > > > >> to
> >> > > > >> > > have it use `Processor<K, V>`.
> >> > > > >> > > I just wanted (if that would've made any sense) to use the
> >> > > Processor
> >> > > > >> in
> >> > > > >> > > both DSL and non-DSL pipelines.
> >> > > > >> > >
> >> > > > >> > > Anyway, regarding `transformValues()` I don't think I can
> >> use it
> >> > > as
> >> > > > I
> >> > > > >> > need
> >> > > > >> > > the message key since that is the discriminating value for
> >> the
> >> > > > filter
> >> > > > >> (I
> >> > > > >> > > want to exclude old values per sensor ID so per message
> key)
> >> > > > >> > >
> >> > > > >> > > Right now I've this
> >> > > > >> > >
> >> > > > >> >
> >> > > > >>
> >> > > >
> >> > >
> >> >
> >>
> https://gist.github.com/alex88/7d229698546971452c3efc862fb4d3fd#file-timestampfiltertransformer-java
> >> > > > >> > > and
> >> > > > >> > > I'm using it with `transform()`.
> >> > > > >> > >
> >> > > > >> > > One thing I've found confusing is this
> >> > > > >> > >
> >> > > > >> >
> >> > > > >>
> >> > > >
> >> > >
> >> >
> >>
> https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#streams-developer-guide-dsl-process
> >> > > > >> > >
> >> > > > >> > > transform is essentially equivalent to adding the
> Transformer
> >> > via
> >> > > > >> > >> Topology#addProcessor() to your processor topology
> >> > > > >> > >> <
> >> > > > >> >
> >> > > > >>
> >> > > >
> >> > >
> >> >
> >>
> https://docs.confluent.io/current/streams/concepts.html#streams-concepts-processor-topology
> >> > > > >> > >
> >> > > > >> > >> .
> >> > > > >> > >
> >> > > > >> > >
> >> > > > >> > > is it? Doesn't `transform` need a TransformerSupplier while
> >> > > > >> `addProcessor`
> >> > > > >> > > uses a ProcessorSupplier?
> >> > > > >> > >
> >> > > > >> > > Thank you again for your help
> >> > > > >> > >
> >> > > > >> > > --
> >> > > > >> > > Alessandro Tagliapietra
> >> > > > >> > >
> >> > > > >> > >
> >> > > > >> > > On Fri, Apr 12, 2019 at 5:04 PM Bruno Cadonna <
> >> > br...@confluent.io
> >> > > >
> >> > > > >> > wrote:
> >> > > > >> > >
> >> > > > >> > >> Hi Alessandro,
> >> > > > >> > >>
> >> > > > >> > >> Have you considered using `transform()` (actually in your
> >> case
> >> > > you
> >> > > > >> > should
> >> > > > >> > >> use `transformValues()`) instead of `.process()`?
> >> `transform()`
> >> > > and
> >> > > > >> > >> `transformValues()` are stateful operations similar to
> >> > `.process`
> >> > > > but
> >> > > > >> > they
> >> > > > >> > >> return a `KStream`. On a `KStream` you can then apply a
> >> > windowed
> >> > > > >> > >> aggregation.
> >> > > > >> > >>
> >> > > > >> > >> Hope that helps.
> >> > > > >> > >>
> >> > > > >> > >> Best,
> >> > > > >> > >> Bruno
> >> > > > >> > >>
> >> > > > >> > >>
> >> > > > >> > >> On Fri, Apr 12, 2019 at 4:31 PM Alessandro Tagliapietra <
> >> > > > >> > >> tagliapietra.alessan...@gmail.com> wrote:
> >> > > > >> > >>
> >> > > > >> > >>> Hi there,
> >> > > > >> > >>>
> >> > > > >> > >>> I'm just starting with Kafka and I'm trying to create a
> >> stream
> >> > > > >> > processor
> >> > > > >> > >>> that in multiple stages:
> >> > > > >> > >>>  - filters messages using a kv store so that only
> messages
> >> > with
> >> > > > >> higher
> >> > > > >> > >>> timestamp get processed
> >> > > > >> > >>>  - aggregates the message metrics by minute giving e.g.
> the
> >> > avg
> >> > > of
> >> > > > >> > those
> >> > > > >> > >>> metrics in that minute
> >> > > > >> > >>>
> >> > > > >> > >>> The message is simple, the key is the sensor ID and the
> >> value
> >> > is
> >> > > > >> e.g. {
> >> > > > >> > >>> timestamp: UNIX_TIMESTAMP, speed: INT }.
> >> > > > >> > >>>
> >> > > > >> > >>> I've started by creating a processor to use the kv store
> >> and
> >> > > > filter
> >> > > > >> old
> >> > > > >> > >>> messages:
> >> > > > >> > >>>
> >> > > > >> > >>>
> >> > > > >> > >>>
> >> > > > >> > >>
> >> > > > >> >
> >> > > > >>
> >> > > >
> >> > >
> >> >
> >>
> https://gist.github.com/alex88/7d229698546971452c3efc862fb4d3fd#file-timestampfilter-java
> >> > > > >> > >>>
> >> > > > >> > >>> Then I was trying to implement windowing, I saw very nice
> >> > > > windowing
> >> > > > >> > >>> examples for the DSL but none for the Processor API
> (only a
> >> > > small
> >> > > > >> > >> reference
> >> > > > >> > >>> to the windowed store), can someone point me in the right
> >> > > > direction?
> >> > > > >> > >>>
> >> > > > >> > >>> Now, since I wasn't able to find any example I tried to
> use
> >> > the
> >> > > > DSL
> >> > > > >> but
> >> > > > >> > >>> haven't found a way to use my processor with it, I saw
> this
> >> > > > >> > >>>
> >> > > > >> > >>>
> >> > > > >> > >>
> >> > > > >> >
> >> > > > >>
> >> > > >
> >> > >
> >> >
> >>
> https://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#applying-processors-and-transformers-processor-api-integration
> >> > > > >> > >>> but
> >> > > > >> > >>> it explains mostly transformers not processors. I also
> saw
> >> > after
> >> > > > >> that
> >> > > > >> > the
> >> > > > >> > >>> example usage of the processor but `.process(...)`
> returns
> >> > void,
> >> > > > so
> >> > > > >> I
> >> > > > >> > >>> cannot have a KStream from a processor?
> >> > > > >> > >>>
> >> > > > >> > >>> Thank you all in advance
> >> > > > >> > >>>
> >> > > > >> > >>> --
> >> > > > >> > >>> Alessandro Tagliapietra
> >> > > > >> > >>>
> >> > > > >> > >>
> >> > > > >> > >
> >> > > > >> >
> >> > > > >> >
> >> > > > >>
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> >
>
