Re: Flink multithreading, CoFlatMapFunction, CoProcessFunction, internal state

2017-08-21 Thread Nico Kruber
Hi Chao,
what I meant by "per-record base" was actually supposed to be "per-event base" 
(event = one element of whatever the stream contains). As the API suggests, 
processing happens one event at a time, and this is also what is done 
internally.

Nico

On Thursday, 17 August 2017 05:06:07 CEST Chao Wang wrote:
> Thank you! Nico. That helps me a lot!
> 
> 2a) That really clarifies my understanding about Flink. Yes, I think I
> have used static references, since I invoked a native function
> (implemented through JNI) which I believe only has one instance per
> process. And I guess the reason why those Java synchronization
> mechanisms were in vain is because of separate function objects at
> runtime, which results in separate lock objects. Now I use c++ mutex
> within the native function and it resolves my case.
> 
> BTW, could you elaborate a bit more about what do you mean by
> "per-record base"? what do you mean by a record?
> 
> 3) I do not intend to store the CoProcessFunction.Context. I was just
> wondering that since the document said it is only valid during the
> invocation, for maintaining custom states of my program logic I guess I
> cannot use it.
> 
> 
> Thank you,
> Chao
> 
> On 08/16/2017 03:31 AM, Nico Kruber wrote:
> > Hi Chao,
> > 
> > 1) regarding the difference of CoFlatMapFunction/CoProcessFunction, let me
> > quote the javadoc of the CoProcessFunction:
> > 
> > "Contrary to the {@link CoFlatMapFunction}, this function can also query
> > the time (both event and processing) and set timers, through the provided
> > {@link Context}. When reacting to the firing of set timers the function
> > can emit yet more elements."
> > 
> > So, imho, both deliver a different level of abstraction and control (high-
> > vs. low-level). Also note the different methods available for you to
> > implement.
> > 
> > 2a) In general, Flink calls functions on a per-record base in a serialized
> > fashion per task. For each task at a TaskManager, in case of it having
> > multiple slots, separate function objects are used where you should only
> > get in trouble if you share static references. Otherwise you do not need
> > to worry about thread-safety.
> > 
> > 2b) From what I see in the code (StreamTwoInputProcessor), the same should
> > apply to CoFlatMapFunction and CoProcessFunction so that calls to
> > flatMap1/2 and processElement1/2 are not called in parallel!
> > 
> > 3) why would you want to store the CoProcessFunction.Context?
> > 
> > 
> > Nico
> > 
> > On Monday, 14 August 2017 18:36:38 CEST Chao Wang wrote:
> >> Hi,
> >> 
> >> I'd like to know if CoFlatMapFunction/CoProcessFunction is thread-safe,
> >> and to what extent? What's the difference between the two Functions? and
> >> in general, how does Flink prevent race conditions? Here's my case:
> >> 
> >> I tried to condition on two input streams and produce the third stream
> >> if the condition is met. I implemented CoFlatMapFunction and tried to
> >> monitor a state using a field in the implemented class (I want to
> >> isolate my application from the checkpointing feature, and therefore I
> >> do not use the states as documented here
> >> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html).
> >> The field served as a flag indicating whether there are some
> >> pending data from either input stream, and if yes, processing it along
> >> with the arriving data from the other input stream (the processing
> >> invokes a native function).
> >> 
> >> But then I got double free error and segmentation fault, which I believe
> >> was due to unintentional concurrent access to the native function. Then
> >> I tried to wrap the access into a synchronized method, as well as
> >> explicitly lock/unlock the flatMap1/flatMap2 methods, but all in vain
> >> and the error remained.
> >> 
> >> I considered using CoProcessFunction in my case, but seems to me that it
> >> does not handle customary internal states, stating in the javadoc "The
> >> context [CoProcessFunction.Context] is only valid during the invocation
> >> of this method, do not store it."
> >> 
> >> 
> >> 
> >> Thanks,
> >> Chao





Re: Flink multithreading, CoFlatMapFunction, CoProcessFunction, internal state

2017-08-16 Thread Chao Wang

Thank you! Nico. That helps me a lot!

2a) That really clarifies my understanding of Flink. Yes, I think I 
have used static references, since I invoked a native function 
(implemented through JNI), which I believe has only one instance per 
process. And I guess the Java synchronization mechanisms were in vain 
because there are separate function objects at runtime, which results 
in separate lock objects. Now I use a C++ mutex within the native 
function, and that resolves my case.
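
A minimal sketch of the effect described above, in plain Java without Flink. It assumes two function objects in the same JVM calling one shared, non-thread-safe resource. FakeNative, MyFunction and NativeLockDemo are hypothetical names used only for illustration; FakeNative stands in for the JNI library. A synchronized instance method locks on each object separately, so it does not exclude the other instance, while a class-level lock does.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Stand-in for a native library: one instance per process, not thread-safe.
final class FakeNative {
    static final AtomicInteger inside = new AtomicInteger();
    static final AtomicInteger maxInside = new AtomicInteger();

    static void call() {
        int now = inside.incrementAndGet();
        maxInside.accumulateAndGet(now, Math::max); // record peak concurrency
        try {
            Thread.sleep(2);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        inside.decrementAndGet();
    }
}

// Hypothetical user function; assume one object exists per parallel subtask.
final class MyFunction {
    // One lock object shared by every instance loaded by the same class loader.
    private static final Object NATIVE_LOCK = new Object();

    // Locks on `this`: two instances use two different monitors.
    synchronized void callWithInstanceLock() {
        FakeNative.call();
    }

    // Locks on the shared object: excludes all instances of this class.
    void callWithStaticLock() {
        synchronized (NATIVE_LOCK) {
            FakeNative.call();
        }
    }
}

final class NativeLockDemo {
    // Runs two function objects on two threads and returns the peak number
    // of threads that were inside FakeNative.call() at the same time.
    static int peakConcurrency(boolean useStaticLock) {
        FakeNative.inside.set(0);
        FakeNative.maxInside.set(0);
        CountDownLatch start = new CountDownLatch(1);
        Thread[] threads = new Thread[2];
        for (int i = 0; i < threads.length; i++) {
            MyFunction fn = new MyFunction();
            threads[i] = new Thread(() -> {
                try {
                    start.await();
                } catch (InterruptedException e) {
                    return;
                }
                for (int n = 0; n < 20; n++) {
                    if (useStaticLock) {
                        fn.callWithStaticLock();
                    } else {
                        fn.callWithInstanceLock();
                    }
                }
            });
            threads[i].start();
        }
        start.countDown();
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return FakeNative.maxInside.get();
    }

    public static void main(String[] args) {
        System.out.println("instance lock peak: " + peakConcurrency(false));
        System.out.println("static lock peak:   " + peakConcurrency(true));
    }
}
```

With the static lock the peak is always 1. With the instance lock it will usually be 2, although that depends on thread scheduling. Note that a Java static lock only covers instances loaded by the same class loader, whereas a mutex inside the native library, as used above, covers the whole process.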


BTW, could you elaborate a bit more on what you mean by 
"per-record base"? What do you mean by a record?


3) I do not intend to store the CoProcessFunction.Context. I was just 
wondering: since the documentation says it is only valid during the 
invocation, I guess I cannot use it to maintain custom state for my 
program logic.



Thank you,
Chao


On 08/16/2017 03:31 AM, Nico Kruber wrote:

> Hi Chao,
>
> 1) regarding the difference of CoFlatMapFunction/CoProcessFunction, let me
> quote the javadoc of the CoProcessFunction:
>
> "Contrary to the {@link CoFlatMapFunction}, this function can also query the
> time (both event and processing) and set timers, through the provided {@link
> Context}. When reacting to the firing of set timers the function can emit yet
> more elements."
>
> So, imho, both deliver a different level of abstraction and control (high- vs.
> low-level). Also note the different methods available for you to implement.
>
> 2a) In general, Flink calls functions on a per-record base in a serialized
> fashion per task. For each task at a TaskManager, in case of it having
> multiple slots, separate function objects are used where you should only get
> in trouble if you share static references. Otherwise you do not need to worry
> about thread-safety.
>
> 2b) From what I see in the code (StreamTwoInputProcessor), the same should
> apply to CoFlatMapFunction and CoProcessFunction so that calls to flatMap1/2
> and processElement1/2 are not called in parallel!
>
> 3) why would you want to store the CoProcessFunction.Context?
>
>
> Nico
>
> On Monday, 14 August 2017 18:36:38 CEST Chao Wang wrote:
>
> > Hi,
> >
> > I'd like to know if CoFlatMapFunction/CoProcessFunction is thread-safe,
> > and to what extent? What's the difference between the two Functions? and
> > in general, how does Flink prevent race conditions? Here's my case:
> >
> > I tried to condition on two input streams and produce the third stream
> > if the condition is met. I implemented CoFlatMapFunction and tried to
> > monitor a state using a field in the implemented class (I want to
> > isolate my application from the checkpointing feature, and therefore I
> > do not use the states as documented here
> > https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html).
> > The field served as a flag indicating whether there are some pending
> > data from either input stream, and if yes, processing it along with the
> > arriving data from the other input stream (the processing invokes a native
> > function).
> >
> > But then I got double free error and segmentation fault, which I believe
> > was due to unintentional concurrent access to the native function. Then
> > I tried to wrap the access into a synchronized method, as well as
> > explicitly lock/unlock the flatMap1/flatMap2 methods, but all in vain
> > and the error remained.
> >
> > I considered using CoProcessFunction in my case, but seems to me that it
> > does not handle customary internal states, stating in the javadoc "The
> > context [CoProcessFunction.Context] is only valid during the invocation
> > of this method, do not store it."
> >
> >
> >
> > Thanks,
> > Chao




Re: Flink multithreading, CoFlatMapFunction, CoProcessFunction, internal state

2017-08-16 Thread Nico Kruber
Hi Chao,

1) Regarding the difference between CoFlatMapFunction and CoProcessFunction, 
let me quote the javadoc of the CoProcessFunction:

"Contrary to the {@link CoFlatMapFunction}, this function can also query the 
time (both event and processing) and set timers, through the provided {@link 
Context}. When reacting to the firing of set timers the function can emit yet 
more elements."

So, imho, the two offer different levels of abstraction and control (high- vs. 
low-level). Also note the different methods available for you to implement.

2a) In general, Flink calls functions on a per-record base, serialized per 
task. If a TaskManager has multiple slots, each task there gets its own 
function object, so you should only get into trouble if you share static 
references. Otherwise you do not need to worry about thread-safety.

2b) From what I see in the code (StreamTwoInputProcessor), the same should 
apply to CoFlatMapFunction and CoProcessFunction, so calls to flatMap1/2 
and processElement1/2 are not made in parallel!
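
To illustrate 2a) and 2b), here is a minimal plain-Java sketch, not Flink code. PairingFunction and PairingDemo are hypothetical names; the method names only mirror flatMap1/flatMap2, and a Consumer stands in for Flink's Collector. It assumes, as described above, that both methods of one function object are called one after the other and never in parallel, so ordinary instance fields need no locking.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java stand-in for a two-input function (does not use Flink).
final class PairingFunction {
    // Instance fields: each function object has its own copy,
    // so parallel subtasks do not share them.
    private String pendingLeft;
    private String pendingRight;

    // Called for each element of the first input.
    void flatMap1(String left, Consumer<String> out) {
        if (pendingRight != null) {
            out.accept(left + "+" + pendingRight);
            pendingRight = null;
        } else {
            pendingLeft = left; // wait for an element from the second input
        }
    }

    // Called for each element of the second input.
    void flatMap2(String right, Consumer<String> out) {
        if (pendingLeft != null) {
            out.accept(pendingLeft + "+" + right);
            pendingLeft = null;
        } else {
            pendingRight = right; // wait for an element from the first input
        }
    }
}

final class PairingDemo {
    // Feeds a fixed interleaving of the two inputs into one function object.
    static List<String> run() {
        List<String> results = new ArrayList<>();
        PairingFunction fn = new PairingFunction();
        fn.flatMap1("a1", results::add);
        fn.flatMap2("b1", results::add);
        fn.flatMap2("b2", results::add);
        fn.flatMap1("a2", results::add);
        return results;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [a1+b1, a2+b2]
    }
}
```

Fields like these are not part of Flink's managed state, so they would not be checkpointed or restored after a failure.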

3) Why would you want to store the CoProcessFunction.Context?


Nico

On Monday, 14 August 2017 18:36:38 CEST Chao Wang wrote:
> Hi,
> 
> I'd like to know if CoFlatMapFunction/CoProcessFunction is thread-safe,
> and to what extent? What's the difference between the two Functions? and
> in general, how does Flink prevent race conditions? Here's my case:
> 
> I tried to condition on two input streams and produce the third stream
> if the condition is met. I implemented CoFlatMapFunction and tried to
> monitor a state using a field in the implemented class (I want to
> isolate my application from the checkpointing feature, and therefore I
> do not use the states as documented here
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html).
> The field served as a flag indicating whether there are some pending
> data from either input stream, and if yes, processing it along with the
> arriving data from the other input stream (the processing invokes a native
> function).
> 
> But then I got double free error and segmentation fault, which I believe
> was due to unintentional concurrent access to the native function. Then
> I tried to wrap the access into a synchronized method, as well as
> explicitly lock/unlock the flatMap1/flatMap2 methods, but all in vain
> and the error remained.
> 
> I considered using CoProcessFunction in my case, but seems to me that it
> does not handle customary internal states, stating in the javadoc "The
> context [CoProcessFunction.Context] is only valid during the invocation
> of this method, do not store it."
> 
> 
> 
> Thanks,
> Chao





Flink multithreading, CoFlatMapFunction, CoProcessFunction, internal state

2017-08-14 Thread Chao Wang

Hi,

I'd like to know whether CoFlatMapFunction/CoProcessFunction is thread-safe, 
and to what extent. What's the difference between the two functions? And 
in general, how does Flink prevent race conditions? Here's my case:


I tried to condition on two input streams and produce a third stream 
if the condition is met. I implemented CoFlatMapFunction and tried to 
track state using a field in the implementing class (I want to 
isolate my application from the checkpointing feature, and therefore I 
do not use the states as documented here 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html). 
The field served as a flag indicating whether there is pending 
data from either input stream; if so, that data is processed along with 
the arriving data from the other input stream (the processing invokes a 
native function).


But then I got a double-free error and a segmentation fault, which I 
believe were due to unintentional concurrent access to the native 
function. Then I tried to wrap the access in a synchronized method, as 
well as to explicitly lock/unlock in the flatMap1/flatMap2 methods, but 
all in vain: the error remained.


I considered using CoProcessFunction in my case, but it seems to me that 
it does not handle custom internal state, since the javadoc states "The 
context [CoProcessFunction.Context] is only valid during the invocation 
of this method, do not store it."




Thanks,
Chao