.keyBy() on ConnectedStream

2017-01-26 Thread Matt
Hi all,

What's the purpose of .keyBy() on ConnectedStream? How does it affect
.map() and .flatMap()?

I'm not finding a way to group stream elements based on a key, something
like a Window on a normal Stream, but for a ConnectedStream.

Regards,
Matt


Re: .keyBy() on ConnectedStream

2017-01-27 Thread Timo Walther

Hi Matt,

The keyBy() on ConnectedStream takes two parameters: one specifies the key
of the left stream, the other the key of the right stream. Records with the
same key end up in the same CoMapFunction/CoFlatMapFunction instance. If you
want to group both streams on a common key, you can use .union() instead of
.connect() (this requires both streams to have the same element type).
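
To illustrate the routing guarantee, here is a minimal plain-Java sketch. It
is not Flink code: the class ConnectedKeyRouting, its methods, and the sample
records are hypothetical, and hash-modulo stands in for Flink's real key-group
assignment. What it shows is the idea behind keyBy(keySelector1, keySelector2):
each input has its own key extractor, and equal keys from either input map to
the same parallel instance.

```java
import java.util.function.Function;

// Plain-Java model of key-based routing for two connected inputs.
// Not Flink API: all names here are hypothetical.
final class ConnectedKeyRouting {

    // Picks a parallel instance for a key. Hash-modulo is a simplification of
    // what Flink does, but keeps the same guarantee: equal keys always map to
    // the same instance.
    static int instanceFor(Object key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Routes a record from either input using that input's own key extractor.
    static <T, K> int route(T record, Function<T, K> keySelector, int parallelism) {
        return instanceFor(keySelector.apply(record), parallelism);
    }

    public static void main(String[] args) {
        int parallelism = 4;
        // Left input: "sensorId:reading" strings. Right input: bare sensor ids.
        Function<String, String> leftKey = s -> s.split(":")[0];
        Function<String, String> rightKey = s -> s;

        int left = route("sensor-7:21.5", leftKey, parallelism);
        int right = route("sensor-7", rightKey, parallelism);
        System.out.println(left == right); // prints "true"
    }
}
```

The two inputs may have different record types; only the extracted key type
has to match, which is why the key is specified once per input.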


I hope that helps.

Timo


Re: .keyBy() on ConnectedStream

2017-01-28 Thread Matt
Aha, ok, got it!

I just realized that this ConnectedStream I was talking about (A) depends
on another ConnectedStream (B), which depends on the first one (A). So it's
even trickier than I first thought.

For instance (simplified):

predictionStream = input
  .connect(statsStream)
  .keyBy(...)
  .flatMap(CoFlatMapFunction {
    flatMap1(obj, output) {
      p = prediction(obj)
      output.collect(p)
    }
    flatMap2(stat, output) {
      updateModel(stat)
    }
  })

statsStream = input2
  .connect(predictionStream)
  .keyBy(...)
  .flatMap(CoFlatMapFunction {
    flatMap1(obj2, output) {
      s = getStats(obj2, p)
      output.collect(s)
    }
    flatMap2(prediction, output) {
      p = prediction
    }
  })

I'm guessing this should be possible. One way would be to add a sink on
statsStream that writes the elements to Kafka, and to read from that topic
in predictionStream instead of initializing it with a reference to
statsStream. I would rather avoid writing to Kafka unnecessarily.

Is there any other way to achieve this?

Thanks,
Matt



Re: .keyBy() on ConnectedStream

2017-01-28 Thread Matt
I'll create a new thread with my last message, since it's not directly
related to the original question here.



How to use keyBy on ConnectedStream?

2018-05-10 Thread Ishwara Varnasi
Hello,
I am using ConnectedStream to process two different types of messages with a
CoFlatMap. However, I would like to use keyBy on the ConnectedStream so that
messages with the same value of a certain property are always sent to the
same CoFlatMap instance. I tried keyBy on the ConnectedStream and was
surprised to see that the return type is not grouped.

ConnectedStreams<MessageType1, MessageType2> connect =
    myDataStream1.connect(myDataStreamOther);
connect = connect.keyBy("property1", "property2");

// property1 is a valid property of MessageType1 and property2 is a
// valid property of MessageType2

However, I get the following exception:

Caused by: org.apache.flink.api.common.InvalidProgramException: This
type (GenericType) cannot be used as key.

How can I use keyBy with ConnectedStream and ensure that grouped messages are
handled by the same CoFlatMap instance?

thanks
Ishwara Varnasi


Re: How to use keyBy on ConnectedStream?

2018-05-10 Thread Xingcan Cui
Hi Ishwara,

The `keyBy()` method automatically ensures that records with the same key are
processed by the same CoFlatMap instance.

As for the exception, I suppose the types `MessageType1` and `MessageType2` are
meant to be POJOs, which must follow some rules [1].
Also, make sure that (1) `property1` and `property2` are not arrays and (2) their
types override the `hashCode()` method [2].
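
The two conditions above can be checked in plain Java. This is only a sketch
under assumptions: `DeviceKey` and `KeyChecks` are hypothetical names and
nothing here calls Flink. It shows why arrays make poor keys (they inherit
identity-based `equals()`/`hashCode()` from `Object`) and what a key type with
content-based `hashCode()` looks like.

```java
import java.util.Arrays;
import java.util.Objects;

final class KeyChecks {
    // True when two separately built values would be treated as the same key.
    static boolean sameKey(Object a, Object b) {
        return a.equals(b) && a.hashCode() == b.hashCode();
    }

    public static void main(String[] args) {
        // Content-based equals()/hashCode(): equal ids give equal keys.
        System.out.println(sameKey(new DeviceKey("d1"), new DeviceKey("d1"))); // true
        // Arrays compare by identity, so equal contents are still different keys.
        System.out.println(sameKey(new int[] {1, 2}, new int[] {1, 2})); // false
        // The contents themselves are equal when compared explicitly.
        System.out.println(Arrays.equals(new int[] {1, 2}, new int[] {1, 2})); // true
    }
}

// Hypothetical key type whose equals() and hashCode() depend only on its id.
final class DeviceKey {
    private final String id;

    DeviceKey(String id) {
        this.id = Objects.requireNonNull(id);
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof DeviceKey && ((DeviceKey) o).id.equals(id);
    }

    @Override
    public int hashCode() {
        return Objects.hash(id);
    }
}
```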

Hope that helps,
Xingcan

[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/types_serialization.html#rules-for-pojo-types
[2] https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/#datastream-transformations
