Re: question on serialization ..

2018-02-13 Thread Debasish Ghosh
ah .. ok .. thanks for the clarification .. for reduce I guess the overload with Materialized does the same thing .. regards. On Tue, Feb 13, 2018 at 2:24 PM, Damian Guy wrote: > There is an overload `leftJoin(KTable, ValueJoiner, Joined)` > > Joined is where you specify the Serde for the KTable

Re: question on serialization ..

2018-02-13 Thread Damian Guy
There is an overload `leftJoin(KTable, ValueJoiner, Joined)`. Joined is where you specify the Serde for the KTable and for the resulting type. We don't need the Serde for the stream at this point, as the value has already been deserialized. HTH, Damian On Tue, 13 Feb 2018 at 05:39 Debasish Ghosh w

Re: question on serialization ..

2018-02-12 Thread Debasish Ghosh
Regarding “has an according overload”, I agree. But some operators like reduce and leftJoin use the serdes implicitly and from the config. So if the developer is not careful enough to get the default serdes right, it results in runtime errors. Also one more confusion on my part is that in con

Re: question on serialization ..

2018-02-12 Thread Matthias J. Sax
Each operator that needs to use a Serde has an according overload method that allows you to override the Serde. If you don't override it, the operator uses the Serde from the config. > If one gets the default >> serializer wrong then she gets run time errors in serialization / >> de-serializa
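A toy model of the fallback rule described above, in plain Java with no Kafka dependency: an operator uses its explicitly supplied Serde if there is one and otherwise falls back to the config default. `Serde`, `STRING`, `LONG` and `resolve` are hypothetical stand-ins rather than Kafka Streams classes, and the `ClassCastException` only approximates the kind of runtime failure a mismatched default serde can cause in a real topology.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

public class SerdeFallbackSketch {
    // Hypothetical minimal serde: a pair of conversion functions (not Kafka's Serde interface).
    record Serde<T>(Function<T, byte[]> serializer, Function<byte[], T> deserializer) {}

    static final Serde<String> STRING = new Serde<String>(
        s -> s.getBytes(StandardCharsets.UTF_8),
        b -> new String(b, StandardCharsets.UTF_8));

    static final Serde<Long> LONG = new Serde<Long>(
        l -> ByteBuffer.allocate(Long.BYTES).putLong(l).array(),
        b -> ByteBuffer.wrap(b).getLong());

    // Use the operator-level serde if one is supplied, otherwise the config default.
    // The unchecked cast is what lets a wrong default slip through until runtime.
    @SuppressWarnings("unchecked")
    static <T> Serde<T> resolve(Serde<T> override, Serde<?> configDefault) {
        return override != null ? override : (Serde<T>) configDefault;
    }

    public static void main(String[] args) {
        Serde<?> configDefault = STRING; // assumed config default: a String value serde

        // Explicit override: Long values round-trip correctly.
        Serde<Long> explicit = resolve(LONG, configDefault);
        long roundTripped = explicit.deserializer().apply(explicit.serializer().apply(42L));
        System.out.println("override: " + roundTripped); // override: 42

        // No override: the String default is used for Long values, and the
        // mismatch only shows up when a value is actually serialized.
        Serde<Long> implicit = resolve(null, configDefault);
        try {
            implicit.serializer().apply(42L);
        } catch (ClassCastException e) {
            System.out.println("default: ClassCastException at runtime");
        }
    }
}
```

Because the fallback is silent, a wrong default compiles fine and only fails once a record actually flows through the operator, which is the concern raised in this thread.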

Re: question on serialization ..

2018-02-12 Thread Debasish Ghosh
Thanks a lot for the clear answer. One of the concerns that I have is that it's not always obvious when the default serializers are used. E.g., it looks like KGroupedStream#reduce also uses the default serializer under the hood. If one gets the default serializer wrong then she gets runtime errors

Re: question on serialization ..

2018-02-11 Thread Matthias J. Sax
For a stream-table join, only the table is (de)serialized; the stream side is only piped through and does lookups into the table. And when reading the stream (https://github.com/confluentinc/kafka-streams-examples/blob/3.3.x/src/test/scala/io/confluent/examples/streams/StreamToTableJoinScalaIntegrat
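A simplified sketch of the stream-table join behavior described above, assuming the table's state store can be modeled as a `HashMap<String, byte[]>`. Only table values are encoded to and decoded from bytes; the stream value is used as an already-deserialized `Long` and is never stored. `StreamTableJoinSketch`, `upsertTable` and `leftJoin` are hypothetical names, not the Kafka Streams API, and the joiner mirrors the one from the linked Scala example (a missing region becomes "UNKNOWN").

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

public class StreamTableJoinSketch {
    // Toy stand-in for the table's state store: values are kept as bytes,
    // so only the table side needs a (de)serializer (here: UTF-8 strings).
    private final Map<String, byte[]> tableStore = new HashMap<>();
    int tableDeserializations = 0;

    void upsertTable(String key, String region) {
        tableStore.put(key, region.getBytes(StandardCharsets.UTF_8)); // table value serialized
    }

    // Stream side: the value arrives as an already-deserialized Long and is only
    // used for the lookup and the joiner; it is never written to the store.
    <R> R leftJoin(String key, Long clicks, BiFunction<Long, String, R> joiner) {
        byte[] raw = tableStore.get(key);
        String region = null;
        if (raw != null) {
            region = new String(raw, StandardCharsets.UTF_8); // table value deserialized
            tableDeserializations++;
        }
        return joiner.apply(clicks, region); // left join: region is null when there is no match
    }

    public static void main(String[] args) {
        StreamTableJoinSketch join = new StreamTableJoinSketch();
        join.upsertTable("alice", "asia");

        // Same shape as the Scala joiner in the example: a missing region becomes "UNKNOWN".
        BiFunction<Long, String, Map.Entry<String, Long>> joiner =
            (clicks, region) -> Map.entry(region == null ? "UNKNOWN" : region, clicks);

        System.out.println(join.leftJoin("alice", 13L, joiner)); // asia=13
        System.out.println(join.leftJoin("bob", 4L, joiner));    // UNKNOWN=4
    }
}
```

The only byte-level conversion in this model is for the table's `String` values, which is why a serde for the stream's `Long` values never enters the picture at the join.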

Re: question on serialization ..

2018-02-10 Thread Debasish Ghosh
The inputs to the leftJoin are the stream with [String, Long] and the table with [String, String]. Is the default serializer (I mean from the config) used for [String, String]? Then how does the [String, Long] serialization work? I guess the basic issue that I am trying to understand is how the

Re: question on serialization ..

2018-02-10 Thread Matthias J. Sax
`userClicksJoinRegion` is never serialized... It is the result of the join, and the join only (de)serializes its input in the internal stores. The output is forwarded in-memory to a consecutive map, which returns `clicksByRegion`, which is [String, Long]. -Matthias On 2/10/18 1:17 PM, Ted Yu wrote: > Pleas
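To illustrate the point that the join result is forwarded in memory, here is a toy Java pipeline in which the intermediate (region, clicks) value is a plain object with no serde, and bytes are only produced at a hypothetical sink that stands in for a topic or store. `RegionWithClicks`, `sinkTopic` and `writeToSink` are illustrative names, not Kafka Streams API; in a real topology, serdes are needed wherever records are written to a topic or a state store.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class InMemoryForwardingSketch {
    // Intermediate join result: passed between steps as a plain object, so no serde is defined for it.
    record RegionWithClicks(String region, long clicks) {}

    // Hypothetical sink standing in for a topic or store: the only place bytes are produced.
    static final List<byte[][]> sinkTopic = new ArrayList<>();

    static void writeToSink(String key, long value) {
        byte[] k = key.getBytes(StandardCharsets.UTF_8);                   // String key encoding
        byte[] v = ByteBuffer.allocate(Long.BYTES).putLong(value).array(); // Long value encoding
        sinkTopic.add(new byte[][] {k, v});
    }

    public static void main(String[] args) {
        // Output of the join step, handed to the next step in memory.
        RegionWithClicks joined = new RegionWithClicks("europe", 7L);

        // The following map step turns it into a [String, Long] pair; still no serialization.
        Function<RegionWithClicks, Map.Entry<String, Long>> toRegionAndClicks =
            r -> Map.entry(r.region(), r.clicks());
        Map.Entry<String, Long> byRegion = toRegionAndClicks.apply(joined);

        // Encoding for [String, Long] is only needed once the record is written out.
        writeToSink(byRegion.getKey(), byRegion.getValue());
        System.out.println(sinkTopic.size()); // 1
    }
}
```

In this model the tuple type never needs a serde at all, which is the distinction drawn in the reply above between the join's internal stores and its in-memory output.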

Re: question on serialization ..

2018-02-10 Thread Ted Yu
Please read the javadoc: https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/Consumed.java and correlate with the sample code. Thanks On Sat, Feb 10, 2018 at 1:10 PM, Debasish Ghosh wrote: > Looking at > https://github.com/confluentinc/kafka-streams- > exa

Re: question on serialization ..

2018-02-10 Thread Debasish Ghosh
Looking at https://github.com/confluentinc/kafka-streams-examples/blob/3.3.x/src/test/scala/io/confluent/examples/streams/StreamToTableJoinScalaIntegrationTest.scala#L148, it seems that the leftJoin generates a KStream[String, (String, Long)], which means the value is a tuple of (String, Long) .. I

Re: question on serialization ..

2018-02-10 Thread Ted Yu
If I read the code correctly, the operation on this line prepares the input for the (stringSerde, stringSerde) specified on line 142: `.leftJoin(userRegionsTable, (clicks: Long, region: String) => (if (region == null) "UNKNOWN" else region, clicks))` FYI On Sat, Feb 10, 2018 at 11:00 AM, Deb

question on serialization ..

2018-02-10 Thread Debasish Ghosh
Hi - I was going through this example at https://github.com/confluentinc/kafka-streams-examples/blob/3.3.x/src/test/scala/io/confluent/examples/streams/StreamToTableJoinScalaIntegrationTest.scala, especially the leftJoin part https://github.com/confluentinc/kafka-streams-examples/blob/3.3.x/src/te