[
https://issues.apache.org/jira/browse/KAFKA-3801?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15323718#comment-15323718
]
Jeff Klukas commented on KAFKA-3801:
------------------------------------
Let me give more context for the example. We have an application that produces
JSON messages to a Kafka topic interleaved with occasional checkpoint messages
that are of {{Long}} type.
If I want to create a KStream of just the checkpoint messages, I need to filter
out the JSON messages before deserializing. Here's what it looks like:
{{KStream<Long, Long> checkpointStream = builder.stream(Serdes.Long(), Serdes.ByteArray(), inputTopicName)}}
{{.filter((key, bytes) -> bytes.length == 8)}}
{{.mapValues(LongDeserializer::deserialize)}}
I need to pass {{Serdes.ByteArray()}} as the value serde when calling {{stream}},
then do the deserialization in a {{mapValues}} invocation after filtering out
messages of the wrong type.
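
To illustrate the filter-then-deserialize idea outside of Kafka, here is a minimal sketch that uses plain {{java.util.stream}} as a stand-in for a {{KStream}}. The class {{CheckpointFilterSketch}} and its helpers {{decodeLong}} and {{checkpoints}} are hypothetical names introduced for this example, not part of any Kafka API. The sketch assumes checkpoints are encoded as 8 big-endian bytes.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class CheckpointFilterSketch {

    // Hypothetical helper: decodes an 8-byte big-endian payload into a long.
    public static long decodeLong(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong();
    }

    // Keep only payloads that are exactly 8 bytes long, then decode them.
    // This mirrors the filter(...) followed by mapValues(...) in the snippet above.
    public static List<Long> checkpoints(List<byte[]> rawValues) {
        return rawValues.stream()
                .filter(bytes -> bytes != null && bytes.length == Long.BYTES)
                .map(CheckpointFilterSketch::decodeLong)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<byte[]> raw = Arrays.asList(
                "{\"event\":\"click\"}".getBytes(StandardCharsets.UTF_8),
                ByteBuffer.allocate(Long.BYTES).putLong(42L).array(),
                "{\"event\":\"view\"}".getBytes(StandardCharsets.UTF_8),
                ByteBuffer.allocate(Long.BYTES).putLong(43L).array());
        System.out.println(checkpoints(raw)); // prints [42, 43]
    }
}
```

Note that a length check alone would also let through any JSON message that happens to be exactly 8 bytes long, so it only works when the JSON payloads are known never to be that short.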
Another option would be to materialize the stream to a topic after the filter
and then call {{builder.stream(Serdes.Long(), Serdes.Long(), newTopicName)}},
but I'd like to avoid unnecessary materialization.
So in the current scheme, I need to create an instance of {{LongDeserializer}}
separately so that I can then call its {{deserialize}} method in {{mapValues}}.
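
For comparison, the current workaround might look roughly like the sketch below. Because {{Deserializer.deserialize}} takes a topic name as well as the bytes, an instance has to be created and wrapped in a lambda before it fits a one-argument value mapper. {{TopicDeserializer}} and {{LongTopicDeserializer}} are hypothetical stand-ins that only mirror the shape of Kafka's {{Deserializer}} and {{LongDeserializer}}, so that the example runs without the Kafka client on the classpath.

```java
import java.nio.ByteBuffer;
import java.util.function.Function;

public class InstanceDeserializerSketch {

    // Hypothetical stand-in with the same method shape as Kafka's
    // Deserializer<T>#deserialize(String topic, byte[] data).
    public interface TopicDeserializer<T> {
        T deserialize(String topic, byte[] data);
    }

    // Hypothetical stand-in for LongDeserializer: 8 big-endian bytes to a Long.
    public static class LongTopicDeserializer implements TopicDeserializer<Long> {
        @Override
        public Long deserialize(String topic, byte[] data) {
            if (data == null) {
                return null;
            }
            if (data.length != Long.BYTES) {
                throw new IllegalArgumentException("expected 8 bytes, got " + data.length);
            }
            return ByteBuffer.wrap(data).getLong();
        }
    }

    // The instance is created separately and captured by the lambda,
    // which adapts the two-argument method to a one-argument mapper.
    public static Function<byte[], Long> valueMapper(String topic) {
        LongTopicDeserializer deserializer = new LongTopicDeserializer();
        return bytes -> deserializer.deserialize(topic, bytes);
    }

    public static void main(String[] args) {
        byte[] payload = ByteBuffer.allocate(Long.BYTES).putLong(7L).array();
        System.out.println(valueMapper("checkpoints").apply(payload)); // prints 7
    }
}
```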
This situation probably won't occur frequently, so I understand if it's decided
not to bother considering this change.
> Provide static serialize() and deserialize() for use as method references
> -------------------------------------------------------------------------
>
> Key: KAFKA-3801
> URL: https://issues.apache.org/jira/browse/KAFKA-3801
> Project: Kafka
> Issue Type: Improvement
> Components: clients, streams
> Reporter: Jeff Klukas
> Assignee: Guozhang Wang
> Priority: Minor
> Fix For: 0.10.1.0
>
>
> While most calls to {{Serializer.serialize}} and {{Deserializer.deserialize}}
> are abstracted away in Kafka Streams through the use of {{Serdes}} classes,
> there are some instances where developers may want to call them directly. The
> serializers and deserializers for simple types don't require any
> configuration and could be static, but currently it's necessary to create an
> instance to use those methods.
> I'd propose moving serialization logic into a {{public static byte[]
> serialize(? data)}} method and deserialization logic into a {{public static ?
> deserialize(byte[] data)}} method, where {{?}} stands for the type the class
> handles. The existing instance methods would simply call the static versions.
> See a full example for LongSerializer and LongDeserializer here:
> https://github.com/apache/kafka/compare/trunk...jklukas:static-serde-methods?expand=1
> In Java 8, these static methods then become available for method references
> in code like {{kstream.mapValues(LongDeserializer::deserialize)}} without the
> user needing to create an instance of {{LongDeserializer}}.
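
The delegation pattern proposed in the quoted description can be sketched as follows. This is only an illustration under stated assumptions: {{StaticLongSerdeSketch}} is a hypothetical class that combines both directions for brevity, it does not implement Kafka's real interfaces, and it is not the code from the linked branch. It assumes the 8-byte big-endian encoding of a {{Long}}.

```java
import java.nio.ByteBuffer;
import java.util.function.Function;

public class StaticLongSerdeSketch {

    // Static serialization logic: needs no configuration and no instance.
    public static byte[] serialize(Long data) {
        if (data == null) {
            return null;
        }
        return ByteBuffer.allocate(Long.BYTES).putLong(data).array();
    }

    // Static deserialization logic, usable as a one-argument method reference.
    public static Long deserialize(byte[] data) {
        if (data == null) {
            return null;
        }
        if (data.length != Long.BYTES) {
            throw new IllegalArgumentException("expected 8 bytes, got " + data.length);
        }
        return ByteBuffer.wrap(data).getLong();
    }

    // Instance methods keep the topic-aware signatures and simply delegate.
    public byte[] serialize(String topic, Long data) {
        return serialize(data);
    }

    public Long deserialize(String topic, byte[] data) {
        return deserialize(data);
    }

    public static void main(String[] args) {
        // No instance is needed to obtain a mapper function.
        Function<byte[], Long> mapper = StaticLongSerdeSketch::deserialize;
        System.out.println(mapper.apply(serialize(42L))); // prints 42
    }
}
```

The static and instance overloads differ in arity, so a reference such as {{StaticLongSerdeSketch::deserialize}} resolves to the static one-argument method when a {{Function<byte[], Long>}} is expected.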