There is also `ValueTransformerWithKey`, which gives you read-only access
to the key.
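
To illustrate the idea, here is a minimal, dependency-free sketch of the
per-key timestamp filter. `Reading` and `TimestampFilter` are made-up names,
a `HashMap` stands in for the key-value store, and the
`transform(readOnlyKey, value)` method only mirrors the shape of
`ValueTransformerWithKey#transform`; it is not the Kafka Streams interface
itself. It also assumes the timestamp is in seconds.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical value type: the thread only says the value carries a timestamp and a speed.
final class Reading {
    final long timestamp; // UNIX timestamp, assumed to be in seconds
    final int speed;

    Reading(long timestamp, int speed) {
        this.timestamp = timestamp;
        this.speed = speed;
    }
}

// Plain-Java stand-in for a ValueTransformerWithKey<String, Reading, Reading>.
final class TimestampFilter {
    // Stand-in for the key-value state store: last accepted timestamp per sensor ID.
    private final Map<String, Long> lastSeen = new HashMap<>();

    // Same shape as transform(readOnlyKey, value): the key is read but never changed.
    Reading transform(String readOnlyKey, Reading value) {
        Long previous = lastSeen.get(readOnlyKey);
        if (previous != null && value.timestamp <= previous) {
            return null; // stale reading for this sensor; nulls are dropped in a later step
        }
        lastSeen.put(readOnlyKey, value.timestamp);
        return value;
    }
}
```

In a real topology the same logic would live in a class implementing
`ValueTransformerWithKey`, and the map would be a state store obtained in
`init()`. As far as I know, `transformValues()` still forwards a record when
the transformer returns `null`, so it would need to be followed by something
like `filter((key, value) -> value != null)`.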

-Matthias

On 4/12/19 5:34 PM, Alessandro Tagliapietra wrote:
> Hi Bruno,
> 
> Thank you for the quick answer.
> 
> I'm actually trying to do that since it seems there is really no way to
> have it use `Processor<K, V>`.
> I just wanted (if that would've made any sense) to use the Processor in
> both DSL and non-DSL pipelines.
> 
> Anyway, regarding `transformValues()`, I don't think I can use it, as I need
> the message key since that is the discriminating value for the filter (I
> want to exclude old values per sensor ID, so per message key).
> 
> Right now I have this
> https://gist.github.com/alex88/7d229698546971452c3efc862fb4d3fd#file-timestampfiltertransformer-java
> and I'm using it with `transform()`.
> 
> One thing I've found confusing is this
> https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#streams-developer-guide-dsl-process
> 
>> transform is essentially equivalent to adding the Transformer via
>> Topology#addProcessor() to your processor topology
>> <https://docs.confluent.io/current/streams/concepts.html#streams-concepts-processor-topology>.
> 
> Is it? Doesn't `transform` need a `TransformerSupplier` while `addProcessor`
> uses a `ProcessorSupplier`?
> 
> Thank you again for your help
> 
> --
> Alessandro Tagliapietra
> 
> 
> On Fri, Apr 12, 2019 at 5:04 PM Bruno Cadonna <br...@confluent.io> wrote:
> 
>> Hi Alessandro,
>>
>> Have you considered using `transform()` (actually in your case you should
>> use `transformValues()`) instead of `.process()`? `transform()` and
>> `transformValues()` are stateful operations similar to `.process` but they
>> return a `KStream`. On a `KStream` you can then apply a windowed
>> aggregation.
>>
>> Hope that helps.
>>
>> Best,
>> Bruno
>>
>>
>> On Fri, Apr 12, 2019 at 4:31 PM Alessandro Tagliapietra <
>> tagliapietra.alessan...@gmail.com> wrote:
>>
>>> Hi there,
>>>
>>> I'm just starting with Kafka and I'm trying to create a stream processor
>>> that works in multiple stages:
>>>  - filters messages using a kv store so that only messages with a higher
>>> timestamp get processed
>>>  - aggregates the message metrics by minute, giving e.g. the avg of those
>>> metrics in that minute
>>>
>>> The message is simple, the key is the sensor ID and the value is e.g. {
>>> timestamp: UNIX_TIMESTAMP, speed: INT }.
>>>
>>> I've started by creating a processor to use the kv store and filter old
>>> messages:
>>>
>>> https://gist.github.com/alex88/7d229698546971452c3efc862fb4d3fd#file-timestampfilter-java
>>>
>>> Then I was trying to implement windowing. I saw very nice windowing
>>> examples for the DSL but none for the Processor API (only a small
>>> reference to the windowed store). Can someone point me in the right
>>> direction?
>>>
>>> Now, since I wasn't able to find any example, I tried to use the DSL but
>>> haven't found a way to use my processor with it. I saw this
>>>
>>> https://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#applying-processors-and-transformers-processor-api-integration
>>>
>>> but it mostly explains transformers, not processors. After that I also saw
>>> the example usage of the processor, but `.process(...)` returns void, so I
>>> cannot get a KStream from a processor?
>>>
>>> Thank you all in advance
>>>
>>> --
>>> Alessandro Tagliapietra
>>>
>>
> 
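
For the second stage in the original question (averaging metrics per minute),
the arithmetic behind a one-minute tumbling window can be sketched in plain
Java. This is only an illustration under assumptions: `MinuteAverager` is a
made-up helper for a single sensor, timestamps are taken to be non-negative
UNIX seconds, and nothing here uses the Kafka Streams API.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: averages the speed readings of one sensor in one-minute tumbling windows.
final class MinuteAverager {
    private static final long WINDOW_SECONDS = 60L;

    // Window start (seconds) -> {sum of speeds, number of readings}.
    private final Map<Long, long[]> windows = new TreeMap<>();

    void add(long timestampSeconds, int speed) {
        // Align the timestamp down to the start of its minute.
        long windowStart = timestampSeconds - (timestampSeconds % WINDOW_SECONDS);
        long[] acc = windows.computeIfAbsent(windowStart, k -> new long[2]);
        acc[0] += speed;
        acc[1] += 1;
    }

    // Returns window start -> average speed, ordered by window start.
    Map<Long, Double> averages() {
        Map<Long, Double> result = new TreeMap<>();
        for (Map.Entry<Long, long[]> e : windows.entrySet()) {
            result.put(e.getKey(), (double) e.getValue()[0] / e.getValue()[1]);
        }
        return result;
    }
}
```

In the DSL, the equivalent would be a windowed aggregation on the `KStream`
returned by the transformer, along the lines of `groupByKey()` followed by
`windowedBy(...)` with a one-minute window and an aggregate that keeps a sum
and a count.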
