Just a side note. There is currently work in progress on
https://issues.apache.org/jira/browse/KAFKA-3729 that should fix the
configuration problem for Serdes.

-Matthias

On 4/19/19 9:12 PM, Alessandro Tagliapietra wrote:
> Hi Bruno,
> thanks a lot for checking the code, regarding the SpecificAvroSerde I've
> found that using
> 
> final Serde<InputList> valueSpecificAvroSerde = new SpecificAvroSerde<>();
> final Map<String, String> serdeConfig =
> Collections.singletonMap("schema.registry.url", "http://localhost:8081");
> valueSpecificAvroSerde.configure(serdeConfig, false);
> 
> and then in aggregate()
> 
> Materialized.with(Serdes.String(), valueSpecificAvroSerde)
> 
> fixed the issue.
> 
> Thanks in advance for the windowing help, very appreciated.
> In the meantime I'll try to make some progress on the rest.
> 
> Have a great weekend
> 
> --
> Alessandro Tagliapietra
> 
> 
> On Fri, Apr 19, 2019 at 2:09 PM Bruno Cadonna <br...@confluent.io> wrote:
> 
>> Hi Alessandro,
>>
>> I had a look at your code. Regarding your question whether you use the
>> SpecificAvroSerde correctly, take a look at the following documentation:
>>
>> https://docs.confluent.io/current/streams/developer-guide/datatypes.html
>>
>> I haven't had the time yet to take a closer look at your problems with the
>> aggregation. I will have a look next week.
>>
>> Have a nice weekend,
>> Bruno
>>
>> On Wed, Apr 17, 2019 at 4:43 PM Alessandro Tagliapietra <
>> tagliapietra.alessan...@gmail.com> wrote:
>>
>>> So I've started with a new app with the archetype:generate as in
>>> https://kafka.apache.org/22/documentation/streams/tutorial
>>>
>>> I've pushed a sample repo here: https://github.com/alex88/kafka-test
>>> The avro schemas are a Metric with 2 fields: timestamp and production
>> and a
>>> MetricList with a list of records (Metric) to be able to manually do the
>>> aggregation.
>>> Right now the aggregation is simple just for the purpose of the sample
>> repo
>>> and to easily see if we're getting wrong values.
>>>
>>> What I wanted to achieve is:
>>>  - have a custom generator that generates 1 message per second with
>>> production = 1 with 1 or more separate message keys which in my case are
>>> the sensor IDs generating the data
>>>  - a filter that removes out of order messages by having a state that
>>> stores key (sensorID) -> last timestamp
>>>  - a window operation that for this example just sums the values in each
>> 10
>>> seconds windows
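
The filter and window steps listed above can be sketched in plain Java,
without any Kafka dependency. This is only an illustration of the intended
logic, not the code from the sample repo: the class name WindowSketch, its
methods, and the in-memory maps standing in for the state stores are all
assumptions made for this example.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: plain Java, no Kafka dependency.
class WindowSketch {
    static final long WINDOW_MS = 10_000L; // 10-second tumbling windows

    // Last accepted timestamp per sensor ID (stands in for the key-value state store).
    private final Map<String, Long> lastTimestamp = new HashMap<>();
    // Sum of production per "sensorId@windowStart" (stands in for the window store).
    private final Map<String, Long> windowSums = new HashMap<>();

    // Start of the window that contains the given timestamp (milliseconds).
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % WINDOW_MS);
    }

    // Accepts a message only if its timestamp is newer than the last one seen for this key.
    boolean accept(String sensorId, long timestampMs) {
        Long last = lastTimestamp.get(sensorId);
        if (last != null && timestampMs <= last) {
            return false; // out of order or duplicate: drop it
        }
        lastTimestamp.put(sensorId, timestampMs);
        return true;
    }

    // Filters the message, then adds its value to the sum of its window.
    void process(String sensorId, long timestampMs, long production) {
        if (accept(sensorId, timestampMs)) {
            windowSums.merge(sensorId + "@" + windowStart(timestampMs), production, Long::sum);
        }
    }

    long sumFor(String sensorId, long windowStartMs) {
        return windowSums.getOrDefault(sensorId + "@" + windowStartMs, 0L);
    }

    public static void main(String[] args) {
        WindowSketch sketch = new WindowSketch();
        // One message per second for timestamps 160000..169000, production = 1 each.
        for (long ts = 160_000L; ts < 170_000L; ts += 1_000L) {
            sketch.process("S1", ts, 1L);
        }
        sketch.process("S1", 165_000L, 1L); // out of order, dropped by the filter
        System.out.println(sketch.sumFor("S1", 160_000L)); // prints 10
    }
}
```

Because both maps live only in memory, this sketch loses its state on
restart. In a real Kafka Streams application the equivalent state would be
kept in state stores.
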
>>>
>>> To show where I'm having issues I've setup multiple branches for the
>> repo:
>>>  - *issue-01 <https://github.com/alex88/kafka-test/tree/issue-01>* is
>> the
>>> one I had initially "Failed to flush state store
>>> KSTREAM-AGGREGATE-STATE-STORE-0000000003" that I tried to solve using
>>>
>>>
>> https://stackoverflow.com/questions/55186727/kafka-streams-2-1-1-class-cast-while-flushing-timed-aggregation-to-store
>>>  - *issue-02 <https://github.com/alex88/kafka-test/tree/issue-02>* is
>> the
>>> one after I've tried to solve above problem with the materializer (maybe
>>> the SpecificAvroSerde is wrong?)
>>>  - *issue-03 <https://github.com/alex88/kafka-test/tree/issue-03>* after
>>> fixing issue-02 (by using groupByKey(Grouped.with(Serdes.String(), new
>>> SpecificAvroSerde<>()))) everything seems to be working, if you let both
>>> the producer and stream running, you'll see that the stream receives 10
>>> messages (with the timestamp incrementing 1 second for each message) like
>>> this:
>>>
>>> S1 with filtered metric{"timestamp": 160000, "production": 1}
>>> S1 with filtered metric{"timestamp": 161000, "production": 1}
>>> S1 with filtered metric{"timestamp": 162000, "production": 1}
>>> S1 with filtered metric{"timestamp": 163000, "production": 1}
>>> S1 with filtered metric{"timestamp": 164000, "production": 1}
>>> S1 with filtered metric{"timestamp": 165000, "production": 1}
>>> S1 with filtered metric{"timestamp": 166000, "production": 1}
>>> S1 with filtered metric{"timestamp": 167000, "production": 1}
>>> S1 with filtered metric{"timestamp": 168000, "production": 1}
>>> S1 with filtered metric{"timestamp": 169000, "production": 1}
>>>
>>> and at the 10 seconds interval something like:
>>>
>>> S1 with computed metric {"timestamp": 160000, "production": 10}
>>> S1 with computed metric {"timestamp": 170000, "production": 10}
>>> S1 with computed metric {"timestamp": 180000, "production": 10}
>>>
>>> and so on...
>>> Now there are two problems, after stopping and restarting the stream
>>> processor (by sending SIGINT via IntelliJ since I start the class main
>> with
>>> it) it happens:
>>>  - sometimes the aggregated count is wrong, if I have it start windowing
>>> for 7 seconds (e.g. seconds 11-17), restart the stream, after restart it
>>> might just emit a value for the new 3 missing seconds (seconds 18-20) and
>>> the aggregated value is 3 not 10
>>>  - sometimes the window outputs twice, in the example where I restart the
>>> stream processor I might get as output
>>>
>>> S1 with filtered metric{"timestamp": 154000, "production": 1}
>>> S1 with computed metric {"timestamp": 160000, "production": 5}
>>> S1 with filtered metric{"timestamp": 155000, "production": 1}
>>> S1 with filtered metric{"timestamp": 156000, "production": 1}
>>> S1 with filtered metric{"timestamp": 157000, "production": 1}
>>> S1 with filtered metric{"timestamp": 158000, "production": 1}
>>> S1 with filtered metric{"timestamp": 159000, "production": 1}
>>> S1 with filtered metric{"timestamp": 160000, "production": 1}
>>> S1 with filtered metric{"timestamp": 161000, "production": 1}
>>> S1 with computed metric {"timestamp": 160000, "production": 10}
>>> S1 with filtered metric{"timestamp": 162000, "production": 1}
>>>
>>> as you can see, window for timestamp 160000 is duplicated
>>>
>>> Is this because the window state isn't persisted across restarts?
>>> My ultimate goal is to have the window part emit only once and resume
>>> processing across restarts, while avoiding processing out of order data
>>> (that's the purpose of the TimestampIncrementalFilter)
>>>
>>> Thank you in advance
>>>
>>> --
>>> Alessandro Tagliapietra
>>>
>>>
>>> On Tue, Apr 16, 2019 at 9:48 PM Alessandro Tagliapietra <
>>> tagliapietra.alessan...@gmail.com> wrote:
>>>
>>>> Hi Bruno,
>>>>
>>>> I'm using the confluent docker images 5.2.1, so kafka 2.2.
>>>> Anyway I'll try to make a small reproduction repo with all the
>> different
>>>> cases soon.
>>>>
>>>> Thank you
>>>>
>>>> --
>>>> Alessandro Tagliapietra
>>>>
>>>>
>>>> On Tue, Apr 16, 2019 at 1:02 PM Bruno Cadonna <br...@confluent.io>
>>> wrote:
>>>>
>>>>> Hi Alessandro,
>>>>>
>>>>> What version of Kafka do you use?
>>>>>
>>>>> Could you please give a more detailed example for the issues with the
>>> two
>>>>> keys you see?
>>>>>
>>>>> Could the following bug be related to the duplicates you see?
>>>>>
>>>>>
>>>>>
>>>
>> https://issues.apache.org/jira/browse/KAFKA-7895
>>>>>
>>>>> How do you restart the processor?
>>>>>
>>>>> Best,
>>>>> Bruno
>>>>>
>>>>> On Mon, Apr 15, 2019 at 11:02 PM Alessandro Tagliapietra <
>>>>> tagliapietra.alessan...@gmail.com> wrote:
>>>>>
>>>>>> Thank you Bruno,
>>>>>>
>>>>>> I'll look into those, however average is just a simple thing I'm
>>> trying
>>>>>> right now just to get an initial windowing flow working.
>>>>>> In the future I'll probably still need the actual values for other
>>>>>> calculations. We won't have more than 60 elements per window for
>> sure.
>>>>>>
>>>>>> So far to not manually serialize/deserialize the array list I've
>>>>> created an
>>>>>> Avro model with an array field containing the values.
>>>>>> I had issues with suppress as explained here
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>
>> https://stackoverflow.com/questions/55699096/kafka-aggregate-with-materialized-with-specific-avro-serve-gives-nullpointerexce/55699198#55699198
>>>>>>
>>>>>> but I got that working.
>>>>>> So far everything seems to be working, except a couple things:
>>>>>>  - if I generate data with 1 key, I correctly get a value each 10
>>>>> seconds,
>>>>>> if I later start generating data with another key (while key 1 is
>>> still
>>>>>> generating) the windowing emits a value only after the timestamp of
>>> key
>>>>> 2
>>>>>> reaches the last generated window
>>>>>>  - while generating data, if I restart the processor as soon as it
>>>>> starts
>>>>>> it sometimes generates 2 aggregates for the same window even if I'm
>>>>> using
>>>>>> the suppress
>>>>>>
>>>>>> Anyway, I'll look into your link and try to find out the cause of
>>> these
>>>>>> issues, probably starting from scratch with a simpler example
>>>>>>
>>>>>> Thank you for your help!
>>>>>>
>>>>>> --
>>>>>> Alessandro Tagliapietra
>>>>>>
>>>>>> On Mon, Apr 15, 2019 at 10:08 PM Bruno Cadonna <br...@confluent.io>
>>>>> wrote:
>>>>>>
>>>>>>> Hi Alessandro,
>>>>>>>
>>>>>>> Have a look at this Kafka Usage Pattern for computing averages
>>> without
>>>>>>> using an ArrayList.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>
>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Stream+Usage+Patterns#KafkaStreamUsagePatterns-Howtocomputean(windowed)average
>>>>>>> ?
>>>>>>>
>>>>>>> The advantage of this pattern over the ArrayList approach is the
>>>>> reduced
>>>>>>> space needed to compute the aggregate. Note that you will still
>> need
>>>>> to
>>>>>>> implement a SerDe. However, the SerDe should be a bit easier to
>>>>> implement
>>>>>>> than a SerDe for an ArrayList.
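
The idea behind that pattern can be sketched in plain Java: instead of
collecting every value of the window in a list, the aggregate carries only a
count and a sum, from which the average is derived. The names AverageSketch
and CountAndSum are hypothetical, and the sketch deliberately leaves out the
Kafka Streams wiring and the SerDe.

```java
// Hypothetical sketch: plain Java, no Kafka dependency.
class AverageSketch {

    // Immutable aggregate holding just enough state to compute an average.
    static final class CountAndSum {
        final long count;
        final double sum;

        CountAndSum(long count, double sum) {
            this.count = count;
            this.sum = sum;
        }

        // Plays the role of the aggregator: fold one more value into the aggregate.
        CountAndSum add(double value) {
            return new CountAndSum(count + 1, sum + value);
        }

        // Average of all values folded in so far; 0.0 for an empty aggregate.
        double average() {
            return count == 0 ? 0.0 : sum / count;
        }
    }

    // Plays the role of initializer plus aggregator applied over one window's values.
    static CountAndSum aggregate(double[] values) {
        CountAndSum acc = new CountAndSum(0L, 0.0);
        for (double v : values) {
            acc = acc.add(v);
        }
        return acc;
    }

    public static void main(String[] args) {
        CountAndSum acc = aggregate(new double[] {1.0, 2.0, 3.0, 6.0});
        System.out.println(acc.count + " values, average " + acc.average());
    }
}
```

The aggregate stays at two numbers no matter how many values a window
holds, which is where the space saving comes from. The trade-off is that
the individual values are no longer available for other calculations.
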
>>>>>>>
>>>>>>> Hope that helps.
>>>>>>>
>>>>>>> Best,
>>>>>>> Bruno
>>>>>>>
>>>>>>> On Mon, Apr 15, 2019 at 4:57 PM Alessandro Tagliapietra <
>>>>>>> tagliapietra.alessan...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Sorry but it seemed harder than I thought,
>>>>>>>>
>>>>>>>> to have the custom aggregation working I need to get an
>> ArrayList
>>> of
>>>>>> all
>>>>>>>> the values in the window, so far my aggregate DSL method creates
>>> an
>>>>>>>> ArrayList on the initializer and adds each value to the list in
>>> the
>>>>>>>> aggregator.
>>>>>>>> Then I think I'll have to provide a serde to change the output
>>>>> type of
>>>>>>>> that method.
>>>>>>>> I was looking at
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>
>> https://stackoverflow.com/questions/46365884/issue-with-arraylist-serde-in-kafka-streams-api
>>>>>>>> but
>>>>>>>> that seems more towards a list of longs and already uses
>>> longSerde.
>>>>>>>> I'm currently trying to implement another avro model that has a
>>>>> field
>>>>>> of
>>>>>>>> type array so I can use the regular avro serializer to implement
>>>>> this.
>>>>>>>> Should I create my own serdes instead or is this the right way?
>>>>>>>>
>>>>>>>> Thank you in advance
>>>>>>>>
>>>>>>>> --
>>>>>>>> Alessandro Tagliapietra
>>>>>>>>
>>>>>>>> On Mon, Apr 15, 2019 at 3:42 PM Alessandro Tagliapietra <
>>>>>>>> tagliapietra.alessan...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Thank you Bruno and Matthias,
>>>>>>>>>
>>>>>>>>> I've modified the transformer to implement the
>>>>>> ValueTransformerWithKey
>>>>>>>>> interface and everything is working fine.
>>>>>>>>> I've now to window the data and manually aggregate each window
>>>>> data
>>>>>>> since
>>>>>>>>> I've to do some averages and sum of differences.
>>>>>>>>> So far I'm just having some issues with message types since
>> I'm
>>>>>>> changing
>>>>>>>>> the data type when aggregating the window but I think it's an
>>> easy
>>>>>>>> problem.
>>>>>>>>>
>>>>>>>>> Thank you again
>>>>>>>>> Best
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Alessandro Tagliapietra
>>>>>>>>>
>>>>>>>>> On Sun, Apr 14, 2019 at 11:26 AM Bruno Cadonna <
>>>>> br...@confluent.io>
>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Alessandro,
>>>>>>>>>>
>>>>>>>>>> the `TransformSupplier` is internally wrapped with a
>>>>>>>> `ProcessorSupplier`,
>>>>>>>>>> so the statement
>>>>>>>>>>
>>>>>>>>>> `transform` is essentially equivalent to adding the
>> Transformer
>>>>> via
>>>>>>>>>> Topology#addProcessor() to your processor topology
>>>>>>>>>>
>>>>>>>>>> is correct.
>>>>>>>>>>
>>>>>>>>>> If you do not change the key, you should definitely use one
>> of
>>>>> the
>>>>>>>>>> overloads of `transformValues` to avoid internal data
>>>>>> redistribution.
>>>>>>> In
>>>>>>>>>> your case the overload with `ValueTransformerWithKeySupplier`
>>> as
>>>>>>>> suggested
>>>>>>>>>> by Matthias would fit.
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Bruno
>>>>>>>>>>
>>>>>>>>>> On Sat, Apr 13, 2019 at 12:51 PM Matthias J. Sax <
>>>>>>> matth...@confluent.io
>>>>>>>>>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> There is also `ValueTransformerWithKey` that gives you
>>>>> read-only
>>>>>>> access
>>>>>>>>>>> to the key.
>>>>>>>>>>>
>>>>>>>>>>> -Matthias
>>>>>>>>>>>
>>>>>>>>>>> On 4/12/19 5:34 PM, Alessandro Tagliapietra wrote:
>>>>>>>>>>>> Hi Bruno,
>>>>>>>>>>>>
>>>>>>>>>>>> Thank you for the quick answer.
>>>>>>>>>>>>
>>>>>>>>>>>> I'm actually trying to do that since it seems there is
>>>>> really no
>>>>>>> way
>>>>>>>>>> to
>>>>>>>>>>>> have it use `Processor<K, V>`.
>>>>>>>>>>>> I just wanted (if that would've made any sense) to use
>> the
>>>>>>> Processor
>>>>>>>>>> in
>>>>>>>>>>>> both DSL and non-DSL pipelines.
>>>>>>>>>>>>
>>>>>>>>>>>> Anyway, regarding `transformValues()` I don't think I can
>>>>> use it
>>>>>>> as
>>>>>>>> I
>>>>>>>>>>> need
>>>>>>>>>>>> the message key since that is the discriminating value
>> for
>>>>> the
>>>>>>>> filter
>>>>>>>>>> (I
>>>>>>>>>>>> want to exclude old values per sensor ID so per message
>>> key)
>>>>>>>>>>>>
>>>>>>>>>>>> Right now I've this
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>
>> https://gist.github.com/alex88/7d229698546971452c3efc862fb4d3fd#file-timestampfiltertransformer-java
>>>>>>>>>>>> and
>>>>>>>>>>>> I'm using it with `transform()`.
>>>>>>>>>>>>
>>>>>>>>>>>> One thing I've found confusing is this
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>
>> https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#streams-developer-guide-dsl-process
>>>>>>>>>>>>
>>>>>>>>>>>> transform is essentially equivalent to adding the
>>> Transformer
>>>>>> via
>>>>>>>>>>>>> Topology#addProcessor() to your processor topology
>>>>>>>>>>>>> <
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>
>> https://docs.confluent.io/current/streams/concepts.html#streams-concepts-processor-topology
>>>>>>>>>>>>
>>>>>>>>>>>>> .
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> is it? Doesn't `transform` need a TransformSupplier while
>>>>>>>>>> `addProcessor`
>>>>>>>>>>>> uses a ProcessorSupplier?
>>>>>>>>>>>>
>>>>>>>>>>>> Thank you again for your help
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> Alessandro Tagliapietra
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Apr 12, 2019 at 5:04 PM Bruno Cadonna <
>>>>>> br...@confluent.io
>>>>>>>>
>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Alessandro,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Have you considered using `transform()` (actually in
>> your
>>>>> case
>>>>>>> you
>>>>>>>>>>> should
>>>>>>>>>>>>> use `transformValues()`) instead of `.process()`?
>>>>> `transform()`
>>>>>>> and
>>>>>>>>>>>>> `transformValues()` are stateful operations similar to
>>>>>> `.process`
>>>>>>>> but
>>>>>>>>>>> they
>>>>>>>>>>>>> return a `KStream`. On a `KStream` you can then apply a
>>>>>> windowed
>>>>>>>>>>>>> aggregation.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Hope that helps.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best,
>>>>>>>>>>>>> Bruno
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Apr 12, 2019 at 4:31 PM Alessandro Tagliapietra
>> <
>>>>>>>>>>>>> tagliapietra.alessan...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi there,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I'm just starting with Kafka and I'm trying to create a
>>>>> stream
>>>>>>>>>>> processor
>>>>>>>>>>>>>> that in multiple stages:
>>>>>>>>>>>>>>  - filters messages using a kv store so that only
>>> messages
>>>>>> with
>>>>>>>>>> higher
>>>>>>>>>>>>>> timestamp get processed
>>>>>>>>>>>>>>  - aggregates the message metrics by minute giving e.g.
>>> the
>>>>>> avg
>>>>>>> of
>>>>>>>>>>> those
>>>>>>>>>>>>>> metrics in that minute
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The message is simple, the key is the sensor ID and the
>>>>> value
>>>>>> is
>>>>>>>>>> e.g. {
>>>>>>>>>>>>>> timestamp: UNIX_TIMESTAMP, speed: INT }.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I've started by creating a processor to use the kv
>> store
>>>>> and
>>>>>>>> filter
>>>>>>>>>> old
>>>>>>>>>>>>>> messages:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>
>> https://gist.github.com/alex88/7d229698546971452c3efc862fb4d3fd#file-timestampfilter-java
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Then I was trying to implement windowing, I saw very
>> nice
>>>>>>>> windowing
>>>>>>>>>>>>>> examples for the DSL but none for the Processor API
>>> (only a
>>>>>>> small
>>>>>>>>>>>>> reference
>>>>>>>>>>>>>> to the windowed store), can someone point me in the
>> right
>>>>>>>> direction?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Now, since I wasn't able to find any example I tried to
>>> use
>>>>>> the
>>>>>>>> DSL
>>>>>>>>>> but
>>>>>>>>>>>>>> haven't found a way to use my processor with it, I saw
>>> this
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>
>> https://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#applying-processors-and-transformers-processor-api-integration
>>>>>>>>>>>>>> but
>>>>>>>>>>>>>> it explains mostly transformers not processors. I also
>>> saw
>>>>>> after
>>>>>>>>>> that
>>>>>>>>>>> the
>>>>>>>>>>>>>> example usage of the processor but `.process(...)`
>>> returns
>>>>>> void,
>>>>>>>> so
>>>>>>>>>> I
>>>>>>>>>>>>>> cannot have a KStream from a processor?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thank you all in advance
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> --
>>>>>>>>>>>>>> Alessandro Tagliapietra
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
> 
