Just a side note. There is currently work in progress on https://issues.apache.org/jira/browse/KAFKA-3729 that should fix the configuration problem for Serdes.
-Matthias

On 4/19/19 9:12 PM, Alessandro Tagliapietra wrote:
> Hi Bruno,
> thanks a lot for checking the code. Regarding the SpecificAvroSerde, I've
> found that using
>
> final Serde<InputList> valueSpecificAvroSerde = new SpecificAvroSerde<>();
> final Map<String, String> serdeConfig =
>     Collections.singletonMap("schema.registry.url", "http://localhost:8081");
> valueSpecificAvroSerde.configure(serdeConfig, false);
>
> and then, in aggregate(),
>
> Materialized.with(Serdes.String(), valueSpecificAvroSerde)
>
> fixed the issue.
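>
> For context, the aggregate call ends up looking roughly like this (an
> untested sketch: `metrics` stands for the filtered KStream<String, Metric>
> in my repo, InputList is the generated Avro class with an array field, and
> getRecords() is what the generated accessor for that field is called here):
>
> final KTable<Windowed<String>, InputList> aggregated = metrics
>     .groupByKey()
>     .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
>     .aggregate(
>         () -> new InputList(new ArrayList<>()), // start each window with an empty list
>         (key, metric, list) -> {                // collect each metric into the list
>             list.getRecords().add(metric);
>             return list;
>         },
>         // these explicit serdes are what fixed the flush/class-cast issue
>         Materialized.with(Serdes.String(), valueSpecificAvroSerde));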
>
> Thanks in advance for the windowing help, very appreciated.
> In the meantime I'll try to make some progress on the rest.
>
> Have a great weekend
>
> --
> Alessandro Tagliapietra
>
>
> On Fri, Apr 19, 2019 at 2:09 PM Bruno Cadonna <br...@confluent.io> wrote:
>
>> Hi Alessandro,
>>
>> I had a look at your code. Regarding your question whether you use the
>> SpecificAvroSerde correctly, take a look at the following documentation:
>>
>> https://docs.confluent.io/current/streams/developer-guide/datatypes.html
>>
>> I haven't had the time yet to take a closer look at your problems with the
>> aggregation. I will have a look next week.
>>
>> Have a nice weekend,
>> Bruno
>>
>> On Wed, Apr 17, 2019 at 4:43 PM Alessandro Tagliapietra <
>> tagliapietra.alessan...@gmail.com> wrote:
>>
>>> So I've started a new app with archetype:generate as in
>>> https://kafka.apache.org/22/documentation/streams/tutorial
>>>
>>> I've pushed a sample repo here: https://github.com/alex88/kafka-test
>>> The Avro schemas are a Metric with two fields, timestamp and production,
>>> and a MetricList with an array of Metric records, so that I can do the
>>> aggregation manually.
>>> Right now the aggregation is deliberately simple, both to keep the
>>> sample repo small and to make it easy to spot wrong values.
>>>
>>> What I wanted to achieve is:
>>> - a custom generator that emits 1 message per second with production = 1
>>>   for one or more separate message keys, which in my case are the sensor
>>>   IDs generating the data
>>> - a filter that removes out-of-order messages by keeping a state that
>>>   stores key (sensor ID) -> last timestamp
>>> - a window operation that, for this example, just sums the values in
>>>   each 10-second window
>>>
>>> To show where I'm having issues I've set up multiple branches in the repo:
>>> - *issue-01 <https://github.com/alex88/kafka-test/tree/issue-01>* is the
>>>   one I had initially ("Failed to flush state store
>>>   KSTREAM-AGGREGATE-STATE-STORE-0000000003"), which I tried to solve using
>>>   https://stackoverflow.com/questions/55186727/kafka-streams-2-1-1-class-cast-while-flushing-timed-aggregation-to-store
>>> - *issue-02 <https://github.com/alex88/kafka-test/tree/issue-02>* is the
>>>   one after I tried to solve the above problem with the materializer
>>>   (maybe the SpecificAvroSerde is wrong?)
>>> - *issue-03 <https://github.com/alex88/kafka-test/tree/issue-03>* after
>>>   fixing issue-02 (by using groupByKey(Grouped.with(Serdes.String(), new
>>>   SpecificAvroSerde<>()))) everything seems to be working; a rough sketch
>>>   of the windowing part follows right after this list.
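>>>
>>> For reference, a minimal, untested sketch of that windowed sum
>>> (`filtered` is the output of the timestamp filter, `metricSerde` a
>>> configured SpecificAvroSerde; the println is a placeholder for what the
>>> repo actually does with the result):
>>>
>>> filtered
>>>     .groupByKey(Grouped.with(Serdes.String(), metricSerde))
>>>     .windowedBy(TimeWindows.of(Duration.ofSeconds(10)).grace(Duration.ZERO))
>>>     .reduce((a, b) -> new Metric(a.getTimestamp(),
>>>                                  a.getProduction() + b.getProduction()))
>>>     // emit a single final result per window once it closes
>>>     .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
>>>     .toStream()
>>>     .foreach((windowedKey, metric) ->
>>>         System.out.println(windowedKey.key() + " with computed metric " + metric));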
>>>
>>> If you let both the producer and the stream run, you'll see that the
>>> stream receives 10 messages (with the timestamp incrementing by 1 second
>>> for each message) like this:
>>>
>>> S1 with filtered metric {"timestamp": 160000, "production": 1}
>>> S1 with filtered metric {"timestamp": 161000, "production": 1}
>>> S1 with filtered metric {"timestamp": 162000, "production": 1}
>>> S1 with filtered metric {"timestamp": 163000, "production": 1}
>>> S1 with filtered metric {"timestamp": 164000, "production": 1}
>>> S1 with filtered metric {"timestamp": 165000, "production": 1}
>>> S1 with filtered metric {"timestamp": 166000, "production": 1}
>>> S1 with filtered metric {"timestamp": 167000, "production": 1}
>>> S1 with filtered metric {"timestamp": 168000, "production": 1}
>>> S1 with filtered metric {"timestamp": 169000, "production": 1}
>>>
>>> and at the 10-second interval something like:
>>>
>>> S1 with computed metric {"timestamp": 160000, "production": 10}
>>> S1 with computed metric {"timestamp": 170000, "production": 10}
>>> S1 with computed metric {"timestamp": 180000, "production": 10}
>>>
>>> and so on.
>>> Now there are two problems after stopping and restarting the stream
>>> processor (by sending SIGINT via IntelliJ, since I start the class main
>>> with it):
>>> - sometimes the aggregated count is wrong: if a window has accumulated 7
>>>   seconds of data (e.g. seconds 11-17) when I restart the stream, after
>>>   the restart it might just emit a value for the 3 missing seconds
>>>   (seconds 18-20), so the aggregated value is 3, not 10
>>> - sometimes the window outputs twice; in the case where I restart the
>>>   stream processor I might get as output
>>>
>>> S1 with filtered metric {"timestamp": 154000, "production": 1}
>>> S1 with computed metric {"timestamp": 160000, "production": 5}
>>> S1 with filtered metric {"timestamp": 155000, "production": 1}
>>> S1 with filtered metric {"timestamp": 156000, "production": 1}
>>> S1 with filtered metric {"timestamp": 157000, "production": 1}
>>> S1 with filtered metric {"timestamp": 158000, "production": 1}
>>> S1 with filtered metric {"timestamp": 159000, "production": 1}
>>> S1 with filtered metric {"timestamp": 160000, "production": 1}
>>> S1 with filtered metric {"timestamp": 161000, "production": 1}
>>> S1 with computed metric {"timestamp": 160000, "production": 10}
>>> S1 with filtered metric {"timestamp": 162000, "production": 1}
>>>
>>> As you can see, the window for timestamp 160000 is duplicated.
>>>
>>> Is this because the window state isn't persisted across restarts?
>>> My ultimate goal is to have the window part emit only once and resume
>>> processing across restarts, while avoiding processing out-of-order data
>>> (that's the purpose of the TimestampIncrementalFilter).
>>>
>>> Thank you in advance
>>>
>>> --
>>> Alessandro Tagliapietra
>>>
>>>
>>> On Tue, Apr 16, 2019 at 9:48 PM Alessandro Tagliapietra <
>>> tagliapietra.alessan...@gmail.com> wrote:
>>>
>>>> Hi Bruno,
>>>>
>>>> I'm using the Confluent Docker images 5.2.1, so Kafka 2.2.
>>>> Anyway, I'll try to make a small reproduction repo with all the
>>>> different cases soon.
>>>>
>>>> Thank you
>>>>
>>>> --
>>>> Alessandro Tagliapietra
>>>>
>>>>
>>>> On Tue, Apr 16, 2019 at 1:02 PM Bruno Cadonna <br...@confluent.io> wrote:
>>>>
>>>>> Hi Alessandro,
>>>>>
>>>>> What version of Kafka do you use?
>>>>>
>>>>> Could you please give a more detailed example for the issues with the
>>>>> two keys you see?
>>>>>
>>>>> Could the following bug be related to the duplicates you see?
>>>>>
>>>>> https://issues.apache.org/jira/browse/KAFKA-7895?jql=project%20%3D%20KAFKA%20AND%20issuetype%20%3D%20Bug%20AND%20status%20in%20(Open%2C%20%22In%20Progress%22%2C%20Reopened)%20AND%20component%20%3D%20streams%20AND%20text%20~%20%22duplicate%22
>>>>>
>>>>> How do you restart the processor?
>>>>>
>>>>> Best,
>>>>> Bruno
>>>>>
>>>>> On Mon, Apr 15, 2019 at 11:02 PM Alessandro Tagliapietra <
>>>>> tagliapietra.alessan...@gmail.com> wrote:
>>>>>
>>>>>> Thank you Bruno,
>>>>>>
>>>>>> I'll look into those. However, the average is just a simple thing I'm
>>>>>> trying right now to get an initial windowing flow working.
>>>>>> In the future I'll probably still need the actual values for other
>>>>>> calculations. We won't have more than 60 elements per window for sure.
>>>>>>
>>>>>> So far, to avoid manually serializing/deserializing the ArrayList, I've
>>>>>> created an Avro model with an array field containing the values.
>>>>>> I had issues with suppress as explained here
>>>>>>
>>>>>> https://stackoverflow.com/questions/55699096/kafka-aggregate-with-materialized-with-specific-avro-serve-gives-nullpointerexce/55699198#55699198
>>>>>>
>>>>>> but I got that working.
>>>>>> So far everything seems to be working, except a couple of things:
>>>>>> - if I generate data with 1 key, I correctly get a value every 10
>>>>>>   seconds; if I later start generating data with another key (while
>>>>>>   key 1 is still generating), the windowing emits a value only after
>>>>>>   the timestamp of key 2 reaches the last generated window
>>>>>> - while generating data, if I restart the processor, as soon as it
>>>>>>   starts back up it sometimes generates 2 aggregates for the same
>>>>>>   window, even though I'm using suppress
>>>>>>
>>>>>> Anyway, I'll look into your link and try to find out the cause of
>>>>>> these issues, probably starting from scratch with a simpler example.
>>>>>>
>>>>>> Thank you for your help!
>>>>>>
>>>>>> --
>>>>>> Alessandro Tagliapietra
>>>>>>
>>>>>> On Mon, Apr 15, 2019 at 10:08 PM Bruno Cadonna <br...@confluent.io> wrote:
>>>>>>
>>>>>>> Hi Alessandro,
>>>>>>>
>>>>>>> Have a look at this Kafka Streams usage pattern for computing a
>>>>>>> (windowed) average without using an ArrayList:
>>>>>>>
>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Stream+Usage+Patterns#KafkaStreamUsagePatterns-Howtocomputean(windowed)average?
>>>>>>>
>>>>>>> The advantage of this pattern over the ArrayList approach is the
>>>>>>> reduced space needed to compute the aggregate. Note that you will
>>>>>>> still need to implement a SerDe. However, the SerDe should be a bit
>>>>>>> easier to implement than a SerDe for an ArrayList.
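>>>>>>>
>>>>>>> The gist of the pattern, as a rough sketch (SumCount here is a
>>>>>>> hypothetical little POJO with public sum and count fields plus its
>>>>>>> own SerDe; it is not code from the wiki page):
>>>>>>>
>>>>>>> metrics
>>>>>>>     .groupByKey(Grouped.with(Serdes.String(), metricSerde))
>>>>>>>     .windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
>>>>>>>     .aggregate(
>>>>>>>         () -> new SumCount(0L, 0L),      // running sum and count instead of all values
>>>>>>>         (key, metric, agg) -> new SumCount(
>>>>>>>             agg.sum + metric.getProduction(),
>>>>>>>             agg.count + 1),
>>>>>>>         Materialized.with(Serdes.String(), sumCountSerde))
>>>>>>>     .mapValues(agg -> (double) agg.sum / agg.count);  // the windowed average
>>>>>>>
>>>>>>> Only two longs per window live in the store, instead of every raw value.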
>>>>>>>
>>>>>>> Hope that helps.
>>>>>>>
>>>>>>> Best,
>>>>>>> Bruno
>>>>>>>
>>>>>>> On Mon, Apr 15, 2019 at 4:57 PM Alessandro Tagliapietra <
>>>>>>> tagliapietra.alessan...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Sorry, but it seemed harder than I thought.
>>>>>>>>
>>>>>>>> To have the custom aggregation working I need an ArrayList of all
>>>>>>>> the values in the window. So far my aggregate DSL call creates an
>>>>>>>> ArrayList in the initializer and adds each value to the list in the
>>>>>>>> aggregator.
>>>>>>>> Then I think I'll have to provide a serde for the changed output
>>>>>>>> type of that method.
>>>>>>>> I was looking at
>>>>>>>>
>>>>>>>> https://stackoverflow.com/questions/46365884/issue-with-arraylist-serde-in-kafka-streams-api
>>>>>>>>
>>>>>>>> but that seems geared more towards a list of longs and already uses
>>>>>>>> longSerde.
>>>>>>>> I'm currently trying to implement another Avro model that has a
>>>>>>>> field of type array so I can use the regular Avro serializer for
>>>>>>>> this. Should I create my own serdes instead, or is this the right
>>>>>>>> way?
>>>>>>>>
>>>>>>>> Thank you in advance
>>>>>>>>
>>>>>>>> --
>>>>>>>> Alessandro Tagliapietra
>>>>>>>>
>>>>>>>> On Mon, Apr 15, 2019 at 3:42 PM Alessandro Tagliapietra <
>>>>>>>> tagliapietra.alessan...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Thank you Bruno and Matthias,
>>>>>>>>>
>>>>>>>>> I've modified the transformer to implement the
>>>>>>>>> ValueTransformerWithKey interface and everything is working fine.
>>>>>>>>> I now have to window the data and manually aggregate each window's
>>>>>>>>> data, since I have to compute some averages and sums of differences.
>>>>>>>>> So far I'm just having some issues with message types, since I'm
>>>>>>>>> changing the data type when aggregating the window, but I think
>>>>>>>>> it's an easy problem.
>>>>>>>>>
>>>>>>>>> Thank you again
>>>>>>>>> Best
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Alessandro Tagliapietra
>>>>>>>>>
>>>>>>>>> On Sun, Apr 14, 2019 at 11:26 AM Bruno Cadonna <br...@confluent.io> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Alessandro,
>>>>>>>>>>
>>>>>>>>>> the `TransformerSupplier` is internally wrapped with a
>>>>>>>>>> `ProcessorSupplier`, so the statement
>>>>>>>>>>
>>>>>>>>>> `transform` is essentially equivalent to adding the Transformer
>>>>>>>>>> via Topology#addProcessor() to your processor topology
>>>>>>>>>>
>>>>>>>>>> is correct.
>>>>>>>>>>
>>>>>>>>>> If you do not change the key, you should definitely use one of
>>>>>>>>>> the overloads of `transformValues` to avoid internal data
>>>>>>>>>> redistribution. In your case the overload with
>>>>>>>>>> `ValueTransformerWithKeySupplier`, as suggested by Matthias,
>>>>>>>>>> would fit.
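>>>>>>>>>>
>>>>>>>>>> For illustration, a minimal, untested sketch of such a transformer
>>>>>>>>>> ("last-seen" is a made-up store name; note that `transformValues`
>>>>>>>>>> still forwards null values, so you drop them with a filter
>>>>>>>>>> afterwards):
>>>>>>>>>>
>>>>>>>>>> public class TimestampFilter
>>>>>>>>>>         implements ValueTransformerWithKey<String, Metric, Metric> {
>>>>>>>>>>
>>>>>>>>>>     private KeyValueStore<String, Long> store;
>>>>>>>>>>
>>>>>>>>>>     @SuppressWarnings("unchecked")
>>>>>>>>>>     @Override
>>>>>>>>>>     public void init(final ProcessorContext context) {
>>>>>>>>>>         store = (KeyValueStore<String, Long>) context.getStateStore("last-seen");
>>>>>>>>>>     }
>>>>>>>>>>
>>>>>>>>>>     @Override
>>>>>>>>>>     public Metric transform(final String sensorId, final Metric value) {
>>>>>>>>>>         final Long last = store.get(sensorId);  // read-only access to the key
>>>>>>>>>>         if (last != null && value.getTimestamp() <= last) {
>>>>>>>>>>             return null;                        // out of order -> mark for dropping
>>>>>>>>>>         }
>>>>>>>>>>         store.put(sensorId, value.getTimestamp());
>>>>>>>>>>         return value;
>>>>>>>>>>     }
>>>>>>>>>>
>>>>>>>>>>     @Override
>>>>>>>>>>     public void close() {}
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> // wiring: register the store, then use the overload taking store names
>>>>>>>>>> builder.addStateStore(Stores.keyValueStoreBuilder(
>>>>>>>>>>     Stores.persistentKeyValueStore("last-seen"),
>>>>>>>>>>     Serdes.String(), Serdes.Long()));
>>>>>>>>>> stream.transformValues(TimestampFilter::new, "last-seen")
>>>>>>>>>>       .filter((key, value) -> value != null);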
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Bruno
>>>>>>>>>>
>>>>>>>>>> On Sat, Apr 13, 2019 at 12:51 PM Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>>>>>>
>>>>>>>>>>> There is also `ValueTransformerWithKey` that gives you read-only
>>>>>>>>>>> access to the key.
>>>>>>>>>>>
>>>>>>>>>>> -Matthias
>>>>>>>>>>>
>>>>>>>>>>> On 4/12/19 5:34 PM, Alessandro Tagliapietra wrote:
>>>>>>>>>>>> Hi Bruno,
>>>>>>>>>>>>
>>>>>>>>>>>> Thank you for the quick answer.
>>>>>>>>>>>>
>>>>>>>>>>>> I'm actually trying to do that, since it seems there is really
>>>>>>>>>>>> no way to have it use `Processor<K, V>`.
>>>>>>>>>>>> I just wanted (if that would've made any sense) to use the
>>>>>>>>>>>> Processor in both DSL and non-DSL pipelines.
>>>>>>>>>>>>
>>>>>>>>>>>> Anyway, regarding `transformValues()`, I don't think I can use
>>>>>>>>>>>> it, as I need the message key, since that is the discriminating
>>>>>>>>>>>> value for the filter (I want to exclude old values per sensor
>>>>>>>>>>>> ID, so per message key).
>>>>>>>>>>>>
>>>>>>>>>>>> Right now I have this
>>>>>>>>>>>>
>>>>>>>>>>>> https://gist.github.com/alex88/7d229698546971452c3efc862fb4d3fd#file-timestampfiltertransformer-java
>>>>>>>>>>>>
>>>>>>>>>>>> and I'm using it with `transform()`.
>>>>>>>>>>>>
>>>>>>>>>>>> One thing I've found confusing is this
>>>>>>>>>>>>
>>>>>>>>>>>> https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#streams-developer-guide-dsl-process
>>>>>>>>>>>>
>>>>>>>>>>>>> transform is essentially equivalent to adding the Transformer
>>>>>>>>>>>>> via Topology#addProcessor() to your processor topology
>>>>>>>>>>>>> <https://docs.confluent.io/current/streams/concepts.html#streams-concepts-processor-topology>.
>>>>>>>>>>>>
>>>>>>>>>>>> Is it? Doesn't `transform` need a TransformerSupplier, while
>>>>>>>>>>>> `addProcessor` uses a ProcessorSupplier?
>>>>>>>>>>>>
>>>>>>>>>>>> Thank you again for your help
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> Alessandro Tagliapietra
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Apr 12, 2019 at 5:04 PM Bruno Cadonna <br...@confluent.io> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Alessandro,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Have you considered using `transform()` (actually, in your
>>>>>>>>>>>>> case you should use `transformValues()`) instead of
>>>>>>>>>>>>> `process()`? `transform()` and `transformValues()` are
>>>>>>>>>>>>> stateful operations similar to `process()`, but they return a
>>>>>>>>>>>>> `KStream`. On a `KStream` you can then apply a windowed
>>>>>>>>>>>>> aggregation.
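>>>>>>>>>>>>>
>>>>>>>>>>>>> The shape of such a topology would be roughly the following (a
>>>>>>>>>>>>> sketch with made-up names, not code from your project):
>>>>>>>>>>>>>
>>>>>>>>>>>>> builder.stream("sensor-metrics", Consumed.with(Serdes.String(), metricSerde))
>>>>>>>>>>>>>     .transformValues(MyFilter::new, "my-store")  // stateful, but still a KStream
>>>>>>>>>>>>>     .filter((key, value) -> value != null)       // drop what the filter nulled out
>>>>>>>>>>>>>     .groupByKey()
>>>>>>>>>>>>>     .windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
>>>>>>>>>>>>>     .count();  // or aggregate(...) for your averages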
>>>>>>>>>>>>>
>>>>>>>>>>>>> Hope that helps.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best,
>>>>>>>>>>>>> Bruno
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Apr 12, 2019 at 4:31 PM Alessandro Tagliapietra <
>>>>>>>>>>>>> tagliapietra.alessan...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi there,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I'm just starting with Kafka and I'm trying to create a
>>>>>>>>>>>>>> stream processor that, in multiple stages:
>>>>>>>>>>>>>> - filters messages using a kv store so that only messages
>>>>>>>>>>>>>>   with a higher timestamp get processed
>>>>>>>>>>>>>> - aggregates the message metrics by minute, giving e.g. the
>>>>>>>>>>>>>>   average of those metrics in that minute
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The message is simple: the key is the sensor ID and the value
>>>>>>>>>>>>>> is e.g. { timestamp: UNIX_TIMESTAMP, speed: INT }.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I've started by creating a processor that uses the kv store
>>>>>>>>>>>>>> to filter old messages:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> https://gist.github.com/alex88/7d229698546971452c3efc862fb4d3fd#file-timestampfilter-java
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Then I was trying to implement windowing. I saw very nice
>>>>>>>>>>>>>> windowing examples for the DSL but none for the Processor API
>>>>>>>>>>>>>> (only a small reference to the windowed store); can someone
>>>>>>>>>>>>>> point me in the right direction?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Now, since I wasn't able to find any example, I tried to use
>>>>>>>>>>>>>> the DSL, but I haven't found a way to use my processor with
>>>>>>>>>>>>>> it. I saw this
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> https://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#applying-processors-and-transformers-processor-api-integration
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> but it mostly explains transformers, not processors. I also
>>>>>>>>>>>>>> saw after that the example usage of the processor, but
>>>>>>>>>>>>>> `.process(...)` returns void, so I cannot get a KStream from
>>>>>>>>>>>>>> a processor?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thank you all in advance
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> --
>>>>>>>>>>>>>> Alessandro Tagliapietra