Re: How to set result value Serdes Class in Kafka stream join

2017-11-16 Thread sy.pan
Get it , thank you Damian


> 在 2017年11月16日,18:55,Damian Guy  写道:
> 
> Hi,
> 
> You don't need to set the serde until you do another operation that
> requires serialization, i.e., if you followed the join with a `to()`,
> `groupBy()` etc, you would pass in the serde to that operation.
> 
> Thanks,
> Damian
> 
> On Thu, 16 Nov 2017 at 10:53 sy.pan  wrote:
> 
>> Hi, all:
>> 
>> Recently I have read kafka streams join document(
>> https://docs.confluent.io/current/streams/developer-guide.html#kafka-streams-dsl
>> <
>> https://docs.confluent.io/current/streams/developer-guide.html#kafka-streams-dsl>).
>> The sample code is pasted below:
>> 
>> import java.util.concurrent.TimeUnit;
>> KStream left = ...;
>> KStream right = ...;
>> // Java 7 example
>> KStream joined = left.join(right,
>>new ValueJoiner() {
>>  @Override
>>  public String apply(Long leftValue, Double rightValue) {
>>return "left=" + leftValue + ", right=" + rightValue;
>>  }
>>},
>>JoinWindows.of(TimeUnit.MINUTES.toMillis(5)),
>>Serdes.String(), /* key */
>>Serdes.Long(),   /* left value */
>>Serdes.Double()  /* right value */
>>  );
>> 
>> so the question is :
>> 
>> 1) which parameter is used for setting result value(returned by
>> ValueJoiner) Serdes class ?
>>the sample code only set key , left value and right value Serdes class.
>> 
>> 2) if ValueJoiner return customer value type,  how to set the result value
>> Serdes class ?
>> 
>> 
>> 
>> 



Re: How to set result value Serdes Class in Kafka stream join

2017-11-16 Thread Damian Guy
Hi,

You don't need to set the serde until you do another operation that
requires serialization, i.e., if you followed the join with a `to()`,
`groupBy()` etc, you would pass in the serde to that operation.

Thanks,
Damian

On Thu, 16 Nov 2017 at 10:53 sy.pan  wrote:

> Hi, all:
>
> Recently I have read kafka streams join document(
> https://docs.confluent.io/current/streams/developer-guide.html#kafka-streams-dsl
> <
> https://docs.confluent.io/current/streams/developer-guide.html#kafka-streams-dsl>).
> The sample code is pasted below:
>
> import java.util.concurrent.TimeUnit;
> KStream left = ...;
> KStream right = ...;
> // Java 7 example
> KStream joined = left.join(right,
> new ValueJoiner() {
>   @Override
>   public String apply(Long leftValue, Double rightValue) {
> return "left=" + leftValue + ", right=" + rightValue;
>   }
> },
> JoinWindows.of(TimeUnit.MINUTES.toMillis(5)),
> Serdes.String(), /* key */
> Serdes.Long(),   /* left value */
> Serdes.Double()  /* right value */
>   );
>
> so the question is :
>
> 1) which parameter is used for setting result value(returned by
> ValueJoiner) Serdes class ?
> the sample code only set key , left value and right value Serdes class.
>
> 2) if ValueJoiner return customer value type,  how to set the result value
> Serdes class ?
>
>
>
>


How to set result value Serdes Class in Kafka stream join

2017-11-16 Thread sy.pan
Hi, all:

Recently I have read kafka streams join 
document(https://docs.confluent.io/current/streams/developer-guide.html#kafka-streams-dsl
 
).
  The sample code is pasted below:

import java.util.concurrent.TimeUnit;
KStream left = ...;
KStream right = ...;
// Java 7 example
KStream joined = left.join(right,
new ValueJoiner() {
  @Override
  public String apply(Long leftValue, Double rightValue) {
return "left=" + leftValue + ", right=" + rightValue;
  }
},
JoinWindows.of(TimeUnit.MINUTES.toMillis(5)),
Serdes.String(), /* key */
Serdes.Long(),   /* left value */
Serdes.Double()  /* right value */
  );

so the question is :

1) which parameter is used for setting result value(returned by ValueJoiner) 
Serdes class ?
the sample code only set key , left value and right value Serdes class.

2) if ValueJoiner return customer value type,  how to set the result value 
Serdes class ?