[ 
https://issues.apache.org/jira/browse/KAFKA-7079?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hashan Gayasri Udugahapattuwa updated KAFKA-7079:
-------------------------------------------------
    Description: 
ValueTransformer#transform does not pass the key

The KStream#transformValues(ValueTransformerWithKeySupplier ..... method is not 
documented. This might lead people to use workarounds or fall back to using 
Transformer. This is very likely if the user is using a wrapper API (e.g. for 
Scala), as the user would be checking the documentation more than the API 
functions available in the code.

----
*Original issue (as it might be useful as a business requirement)*

ValueTransformer's transform method doesn't pass the key to user code. 
Reporting this as a bug since it currently requires workarounds.

Context:

I'm currently in the process of converting two stateful "*aggregate*" DSL 
operations to the Processor API, since the state of those operations is 
relatively large and serializing and deserializing it via Kryo takes over 99% 
of CPU time (when profiled). 

Since DSL aggregations use state stores of [Bytes, Array[Byte]] even when 
using the in-memory state store, it seems like the only way to reduce the 
serialization/deserialization overhead is to convert heavy aggregates to 
*transform*s.
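
The reasoning above can be illustrated with a minimal sketch. It does not use Kafka Streams or Kryo: plain HashMaps stand in for the state stores, Java serialization stands in for Kryo, and the names StoreOverheadSketch, Aggregate, serdeCallsWithByteStore and serdeCallsWithObjectStore are hypothetical. It only counts how often an aggregate is serialized or deserialized when every update goes through a byte-array store, compared with a store that keeps the object itself.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class StoreOverheadSketch {
    // Hypothetical aggregate; stands in for a large user-defined state object.
    static final class Aggregate implements Serializable {
        private static final long serialVersionUID = 1L;
        final List<Long> samples = new ArrayList<>();
    }

    // Counts every serialize/deserialize call made since the last reset.
    static int serdeCalls = 0;

    // Java serialization is used here only as a stand-in for Kryo.
    static byte[] serialize(Aggregate agg) {
        serdeCalls++;
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(agg);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static Aggregate deserialize(byte[] data) {
        serdeCalls++;
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return (Aggregate) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // Byte-array store: each update reads bytes, deserializes, mutates, re-serializes.
    static int serdeCallsWithByteStore(int updates) {
        serdeCalls = 0;
        Map<String, byte[]> store = new HashMap<>();
        for (int i = 0; i < updates; i++) {
            byte[] existing = store.get("k");
            Aggregate agg = (existing == null) ? new Aggregate() : deserialize(existing);
            agg.samples.add((long) i);
            store.put("k", serialize(agg));
        }
        return serdeCalls;
    }

    // Object store: the aggregate is mutated in place, with no serialization at all.
    static int serdeCallsWithObjectStore(int updates) {
        serdeCalls = 0;
        Map<String, Aggregate> store = new HashMap<>();
        for (int i = 0; i < updates; i++) {
            store.computeIfAbsent("k", k -> new Aggregate()).samples.add((long) i);
        }
        return serdeCalls;
    }

    public static void main(String[] args) {
        System.out.println(serdeCallsWithByteStore(1000));   // prints 1999
        System.out.println(serdeCallsWithObjectStore(1000)); // prints 0
    }
}
```

With 1000 updates to one key, the byte-array path performs 1999 serde calls (one serialize on the first update, then a deserialize plus a serialize on each later one), and each call handles the whole, growing aggregate. The object path performs none.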

In my case, *ValueTransformer* seems to be the right option. However, since 
ValueTransformer's _transform_ method only exposes the _value_, I'd either have 
to pre-process the records and add the key to the value, or use *Transformer* 
instead (which is not my intent).

As the internal _*InternalValueTransformerWithKey*_ already has the 
readOnlyKey, it seems like a good idea to pass the key to the transform method 
as well, especially since in a stateful transformation the state store 
generally has to be queried by the key.
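
A minimal sketch of what a key-aware value transformation looks like in a stateful case. To keep it runnable without a Kafka dependency, the KeyedValueTransformer interface below is a hypothetical stand-in that only mirrors the transform(readOnlyKey, value) shape of ValueTransformerWithKey, and a HashMap stands in for the state store. RunningTotal and KeyAwareTransformSketch are likewise made-up names.

```java
import java.util.HashMap;
import java.util.Map;

class KeyAwareTransformSketch {
    // Hypothetical stand-in: the key is visible to user code, but only a new
    // value is returned, so the record key itself cannot be changed.
    interface KeyedValueTransformer<K, V, VR> {
        VR transform(K readOnlyKey, V value);
    }

    // Stateful example: keeps a running total per key.
    static final class RunningTotal implements KeyedValueTransformer<String, Long, Long> {
        // Stand-in for a state store; it is looked up by the record key.
        private final Map<String, Long> store = new HashMap<>();

        @Override
        public Long transform(String readOnlyKey, Long value) {
            long updated = store.getOrDefault(readOnlyKey, 0L) + value;
            store.put(readOnlyKey, updated);
            return updated;
        }
    }

    public static void main(String[] args) {
        RunningTotal totals = new RunningTotal();
        System.out.println(totals.transform("a", 5L)); // prints 5
        System.out.println(totals.transform("b", 2L)); // prints 2
        System.out.println(totals.transform("a", 3L)); // prints 8
    }
}
```

Without the key argument, the same logic would need the key copied into every value beforehand, which is the workaround described above.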

        Summary: ValueTransformerWithKeySupplier is not mentioned in the 
documentation  (was: ValueTransformer#transform does not pass the key)

> ValueTransformerWithKeySupplier is not mentioned in the documentation
> ---------------------------------------------------------------------
>
>                 Key: KAFKA-7079
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7079
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.1.0
>         Environment: Fedora 27
>            Reporter: Hashan Gayasri Udugahapattuwa
>            Priority: Major



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)