ah .. ok .. thanks for the clarification .. for reduce I guess the overload
with Materialized does the same thing ..
regards.
On Tue, Feb 13, 2018 at 2:24 PM, Damian Guy wrote:
> There is an overload `leftJoin(KTable, ValuJoiner, Joined)`
>
> Joined is where you specify the Serde for the KTable
There is an overload `leftJoin(KTable, ValuJoiner, Joined)`
Joined is where you specify the Serde for the KTable and for the resulting
type. We don't need the Serde for the stream at this point as the value has
already been deserialized.
HTH,
Damian
On Tue, 13 Feb 2018 at 05:39 Debasish Ghosh
w
Regarding “has an according overload” I agree. But some operators like
reduce and leftJoin use the serdes implicitly and from the config. So if
the developer is not careful enough to have the default serdes correct then
it results in runtime error.
Also one more confusion on my part is that in con
Each operator that needs to use a Serde, has a an according overload
method that allows you to overwrite the Serde. If you don't overwrite
it, the operator uses the Serde from the config.
> If one gets the default
>> serializer wrong then she gets run time errors in serialization /
>> de-serializa
Thanks a lot for the clear answer.
One of the concerns that I have is that it's not always obvious when the
default serializers are used. e.g. it looks like KGroupedStream#reduce also
uses the default serializer under the hood. If one gets the default
serializer wrong then she gets run time errors
For stream-table-join, only the table is (de)serialized, the stream-side
in only piped through and does lookups into the table.
And when reading the stream
(https://github.com/confluentinc/kafka-streams-examples/blob/3.3.x/src/test/scala/io/confluent/examples/streams/StreamToTableJoinScalaIntegrat
The inputs to the leftJoin are the stream with [String, Long] and the table
with [String, String]. Is the default serializer (I mean from the config)
used for [String, String] ? Then how does the [String, Long] serialization
work ?
I guess the basic issue that I am trying to understand is how the
userClicksJoinRegion is never serialized...
It the result of the join and the join only (de)serializes its input in
the internal stores.
The output it forwarded in-memory to a consecutive map and return
`clicksByRegion` that is [String,Long].
-Matthias
On 2/10/18 1:17 PM, Ted Yu wrote:
> Pleas
Please read the javadoc:
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/Consumed.java
and correlate with the sample code.
Thanks
On Sat, Feb 10, 2018 at 1:10 PM, Debasish Ghosh
wrote:
> Looking at
> https://github.com/confluentinc/kafka-streams-
> exa
Looking at
https://github.com/confluentinc/kafka-streams-examples/blob/3.3.x/src/test/scala/io/confluent/examples/streams/StreamToTableJoinScalaIntegrationTest.scala#L148,
it seems that the leftJoin generates a KStream[String, (String, Long)],
which means the value is a tuple of (String, Long) .. I
If I read the code correctly, the operation on this line prepares the input
for the (stringSerde, stringSerde) specified on line 142:
.leftJoin(userRegionsTable, (clicks: Long, region: String) => (if
(region == null) "UNKNOWN" else region, clicks))
FYI
On Sat, Feb 10, 2018 at 11:00 AM, Deb
Hi -
I was going through this example at
https://github.com/confluentinc/kafka-streams-examples/blob/3.3.x/src/test/scala/io/confluent/examples/streams/StreamToTableJoinScalaIntegrationTest.scala,
especially the leftJoin part
https://github.com/confluentinc/kafka-streams-examples/blob/3.3.x/src/te
12 matches
Mail list logo