[jira] [Commented] (KAFKA-3801) Provide static serialize() and deserialize() for use as method references

2016-06-10 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3801?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15325454#comment-15325454
 ] 

Guozhang Wang commented on KAFKA-3801:
--

Before we introduced Kafka Streams, we did not consider adding static serdes, 
since for the producer / consumer a single serde object is sufficient and is 
always required anyway. For Kafka Streams, where multiple serdes may be needed, 
creating one for each such case may be cumbersome, but I thought that in 
practice you won't need too many different ones (for example, you may use 
JSON / Avro / Thrift for most cases, and only for a very few streams would you 
use a different one, such as for primitive types), so they may still be okay.

If we observe common usage patterns where many objects need to be created for 
a single application, we can reconsider whether this optimization makes sense. 
I'm going to close this for now, but feel free to re-open it whenever you want 
to resume the discussion.

> Provide static serialize() and deserialize() for use as method references
> -
>
> Key: KAFKA-3801
> URL: https://issues.apache.org/jira/browse/KAFKA-3801
> Project: Kafka
> Issue Type: Improvement
> Components: clients, streams
> Reporter: Jeff Klukas
> Assignee: Guozhang Wang
> Priority: Minor
> Fix For: 0.10.1.0
>
>
> While most calls to {{Serializer.serialize}} and {{Deserializer.deserialize}} 
> are abstracted away in Kafka Streams through the use of {{Serdes}} classes, 
> there are some instances where developers may want to call them directly. The 
> serializers and deserializers for simple types don't require any 
> configuration and could be static, but currently it's necessary to create an 
> instance to use those methods.
> I'd propose moving serialization logic into a {{static public byte[] 
> serialize(? data)}} method and deserialization logic into a {{static public ? 
> deserialize(byte[] data)}} method. The existing instance methods would simply 
> call the static versions.
> See a full example for LongSerializer and LongDeserializer here:
> https://github.com/apache/kafka/compare/trunk...jklukas:static-serde-methods?expand=1
> In Java 8, these static methods then become available for method references 
> in code like {{kstream.mapValues(LongDeserializer::deserialize)}} without the 
> user needing to create an instance of {{LongDeserializer}}.
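
For illustration, the proposal in the description could be sketched roughly as follows. This is a minimal, self-contained sketch rather than the code from the linked branch: the class names {{StaticLongSerde}} and {{DelegatingLongDeserializer}} are hypothetical stand-ins, the 8-byte big-endian encoding is assumed, and {{IllegalArgumentException}} is used here only to keep the example free of Kafka dependencies.

```java
import java.nio.ByteBuffer;

// Hypothetical stand-in for the proposed static methods; not the actual Kafka classes.
final class StaticLongSerde {
    private StaticLongSerde() {}

    // Encodes a Long as 8 big-endian bytes (ByteBuffer's default order); null maps to null.
    static byte[] serialize(Long data) {
        if (data == null) {
            return null;
        }
        return ByteBuffer.allocate(Long.BYTES).putLong(data).array();
    }

    // Decodes 8 big-endian bytes back into a Long; null maps to null.
    static Long deserialize(byte[] data) {
        if (data == null) {
            return null;
        }
        if (data.length != Long.BYTES) {
            // Assumption: a real implementation would likely throw a Kafka-specific exception.
            throw new IllegalArgumentException("Expected 8 bytes but got " + data.length);
        }
        return ByteBuffer.wrap(data).getLong();
    }
}

// Hypothetical instance-style wrapper: the existing instance method simply
// delegates to the static version, as the description proposes.
final class DelegatingLongDeserializer {
    Long deserialize(String topic, byte[] data) {
        return StaticLongSerde.deserialize(data);
    }
}
```

With something like this in place, {{StaticLongSerde::deserialize}} can be passed wherever a function from {{byte[]}} to {{Long}} is expected, without creating an instance first.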



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3801) Provide static serialize() and deserialize() for use as method references

2016-06-10 Thread Jeff Klukas (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3801?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15325270#comment-15325270
 ] 

Jeff Klukas commented on KAFKA-3801:


I like the concise code that you get from a static method reference: 
{{mapValues(LongDeserializer::deserialize)}} as opposed to {{mapValues(bytes -> 
Serdes.Long().deserializer().deserialize(bytes))}}, but you're correct that 
it's possible to use {{Serdes}} inline here.
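
To make the comparison concrete, here is a small self-contained sketch using plain {{java.util.function.Function}} in place of the {{mapValues}} argument. Everything in it is hypothetical: {{InstanceLongDeserializer}} stands in for an instance-style deserializer and the static {{deserialize}} method stands in for the proposed one. Neither is the Kafka class.

```java
import java.nio.ByteBuffer;
import java.util.function.Function;

final class MethodRefComparison {
    private MethodRefComparison() {}

    // Hypothetical instance-style deserializer: an object must exist before it can be called.
    static final class InstanceLongDeserializer {
        Long deserialize(byte[] data) {
            return ByteBuffer.wrap(data).getLong();
        }
    }

    // Hypothetical static variant, in the spirit of the proposal.
    static Long deserialize(byte[] data) {
        return ByteBuffer.wrap(data).getLong();
    }

    // Lambda form: obtains an instance on each call, then invokes it.
    static final Function<byte[], Long> VIA_LAMBDA =
            bytes -> new InstanceLongDeserializer().deserialize(bytes);

    // Static method reference form: no instance involved.
    static final Function<byte[], Long> VIA_METHOD_REF =
            MethodRefComparison::deserialize;
}
```

Both functions give the same result; the difference is only in how much ceremony appears at the call site. Note that the real {{Deserializer.deserialize}} instance method also takes a topic argument, so a bound reference to an existing instance would not fit a single-argument mapper directly.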

I'm fine with closing this in deference to a more comprehensive solution down 
the line.



[jira] [Commented] (KAFKA-3801) Provide static serialize() and deserialize() for use as method references

2016-06-10 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3801?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15325209#comment-15325209
 ] 

Guozhang Wang commented on KAFKA-3801:
--

I think the general question here, which we are considering, is whether we 
could make it more flexible for users to apply stateless processing operators 
to the raw key-value pair as byte arrays before deserializing (today we ALWAYS 
deserialize first, partly because we need to extract the timestamp). In your 
case, for example, you could then {{branch}} the stream into two streams, one 
for checkpoints and one for data. But this may need to be done as part of a 
more comprehensive story.

As for your case specifically, for now you can still just call 
{{Serdes.Long().deserializer().deserialize()}} in {{mapValues}}.



[jira] [Commented] (KAFKA-3801) Provide static serialize() and deserialize() for use as method references

2016-06-09 Thread Jeff Klukas (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3801?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15323718#comment-15323718
 ] 

Jeff Klukas commented on KAFKA-3801:


Let me give more context for the example. We have an application that produces 
JSON messages to a Kafka topic interleaved with occasional checkpoint messages 
that are of {{Long}} type.

If I want to create a KStream of just the checkpoint messages, I need to filter 
out the JSON messages before deserializing. Here's what it looks like:

{{KStream checkpointStream = builder.stream(Serdes.Long(), Serdes.ByteArray(), inputTopicName)}}
{{.filter((key, bytes) -> bytes.length == 8)}}
{{.mapValues(LongDeserializer::deserialize)}}

I need to use ByteArraySerde when calling {{stream}}, then I do the 
deserialization in a {{mapValues}} invocation after filtering out messages of 
the wrong type.
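
The same filter-then-deserialize idea can be tried out without a Kafka cluster. The sketch below uses {{java.util.stream}} as a stand-in for the KStream pipeline; the class and method names are hypothetical, and the 8-byte big-endian layout of the checkpoint values is an assumption.

```java
import java.nio.ByteBuffer;
import java.util.List;
import java.util.stream.Collectors;

final class CheckpointFilterSketch {
    private CheckpointFilterSketch() {}

    // Keeps only values that are exactly 8 bytes long and decodes them as longs.
    // Everything else (for example, JSON payloads) is dropped before decoding.
    static List<Long> checkpoints(List<byte[]> rawValues) {
        return rawValues.stream()
                .filter(bytes -> bytes != null && bytes.length == Long.BYTES)
                .map(bytes -> ByteBuffer.wrap(bytes).getLong())
                .collect(Collectors.toList());
    }
}
```

One caveat with a length-only check: a JSON message that happens to be exactly 8 bytes, such as {{{"a":12}}}, would pass the filter and be decoded as a long, so a real application may want a stronger discriminator.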

Another option would be to materialize the stream to a topic after the filter 
and then call {{builder.stream(Serdes.Long(), Serdes.Long(), newTopicName)}}, 
but I'd like to avoid unnecessary materialization.

So in the current scheme, I need to create an instance of {{LongDeserializer}} 
separately so that I can then call its {{deserialize}} method in {{mapValues}}.

This situation probably won't occur frequently, so I understand if it's decided 
not to bother considering this change.



[jira] [Commented] (KAFKA-3801) Provide static serialize() and deserialize() for use as method references

2016-06-09 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3801?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15323561#comment-15323561
 ] 

Guozhang Wang commented on KAFKA-3801:
--

Hello [~jeff.klu...@gmail.com], could you present a concrete example in which 
a lot of serde instances need to be created on the fly? Today, for most 
primitive types, users should be able to use {{Serdes.Long()}} etc. to get the 
serde objects directly.

Also, regarding the description specifically, {{KStream.mapValues}} should not 
require a serde object, right?
