Re: intentional back-pressure (or a poor man's side-input)

2018-05-04 Thread Piotr Nowojski
> running a batch or "bounded stream" job first to generate a "cache state", 
> and then launching the main streaming job, which loads this initial state 
> in open()... not sure how to work out the keying.
> 

This is the recommended workaround for this issue: first start a job to
precompute some values for an initial state, and then pass those values to the
main job as (for example) a startup argument. I think for now it’s the cleanest
and easiest-to-maintain solution. If the initial state is too large, you could
save it on a DFS and load it in the initialization phase of the main job.
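
To make the "load it in the initialization phase" idea concrete, here is a
minimal plain-Java sketch. It deliberately uses no Flink classes: the class
name `CacheBackedEnricher`, the `key=value` file format, and the `open()` and
`process()` methods are all assumptions made for illustration, with `open()`
standing in for the role a rich function's `open()` plays. Every parallel
instance loads the whole file into a local map, which sidesteps the keying
question at the cost of memory.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

/** Plain-Java stand-in (not a Flink class) for a function that preloads a cache. */
class CacheBackedEnricher {
    private final Path cacheFile; // hypothetical file written by the first (batch) job
    private final Map<String, String> cache = new HashMap<>();

    CacheBackedEnricher(Path cacheFile) {
        this.cacheFile = cacheFile;
    }

    /** Plays the role of open(): runs once, before any element is processed. */
    void open() {
        try {
            for (String line : Files.readAllLines(cacheFile)) {
                int sep = line.indexOf('=');
                if (sep > 0) { // skip blank or malformed lines
                    cache.put(line.substring(0, sep), line.substring(sep + 1));
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Enriches one element by looking its key up in the preloaded cache. */
    String process(String key) {
        return key + " -> " + cache.getOrDefault(key, "<unknown>");
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("initial-state", ".txt");
        Files.write(file, Arrays.asList("user1=gold", "user2=silver"));
        CacheBackedEnricher enricher = new CacheBackedEnricher(file);
        enricher.open();
        System.out.println(enricher.process("user1"));
        System.out.println(enricher.process("user9"));
    }
}
```

In a real job the file would live on the DFS and the map might be replaced by
keyed state, in which case the loading would have to respect how keys are
distributed across parallel instances; this sketch does not attempt that.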

Piotrek

> On 3 May 2018, at 19:03, Derek VerLee wrote:
> 
> Thanks for the thoughts Piotr.
> 
> Seems I have a talent for asking (nearly) the same question as someone else 
> at the same time, and checkpointing was raised in that thread as well.
> 
> I guess one way to conceptualize it is that you have a stream job that has
> "phases" and transitions between those phases.  Maybe there would be a new
> type of barrier to indicate a change between phases?  But now I'm way outside
> the bounds of hoping to have a "quick and dirty" version of a proper side
> input implementation.
> 
> I'm chewing on two new ideas now:  using a "union" stream instead of two
> streams, and a custom source backed by two different sources under the hood,
> so the "state machine" logic transitioning from initialization to normal
> operation all happens in the same operator.  Or, running a batch or "bounded
> stream" job first to generate a "cache state", and then launching the main
> streaming job, which loads this initial state in open()... not sure how to
> work out the keying.
> 
> I'll post back if I get anywhere with these ideas.
> 
> On 5/3/18 10:49 AM, Piotr Nowojski wrote:
>> Maybe it could work with Flink 1.5’s credit-based flow control. But you
>> would need a way to express the state “block one input side of the
>> CoProcessFunction”, pass this information up to the input gate, and handle
>> it, probably similarly to how
>> `org.apache.flink.streaming.runtime.io.CachedBufferBlocker` blocks inputs
>> in case of a checkpoint barrier. You cannot just block inside the
>> `processElement1` method.
>> 
>> However, I haven’t thought it through, and there could be some issues
>> regarding checkpointing (what should happen to checkpoint barriers if you
>> are blocking one side of the input? Should this block the checkpoint
>> barrier as well? Should you cancel the checkpoint?).
>> 
>> Piotrek
>> 
>>> On 2 May 2018, at 16:31, Derek VerLee wrote:
>>> 
>>> I was just thinking about letting a CoProcessFunction "block" or cause
>>> back pressure on one of its streams.
>>> Has this been discussed as an option?
>>> Does anyone know a way to effectively accomplish this?
>>> 
>>> I think I could get a lot of mileage out of something like that without
>>> needing a full implementation of FLIP-17 (which I would still eagerly
>>> await).
>>> 
>>> As mentioned on another thread, one could use a ListState to buffer the
>>> main input until the "side input" was sufficiently processed.  However, the
>>> downside of this is that I have no way to control the size of those
>>> buffers, whereas with backpressure, the system will naturally take care of
>>> it.
> 



Re: intentional back-pressure (or a poor man's side-input)

2018-05-03 Thread Derek VerLee

Thanks for the thoughts Piotr.

Seems I have a talent for asking (nearly) the same question as someone else
at the same time, and checkpointing was raised in that thread as well.

I guess one way to conceptualize it is that you have a stream job that has
"phases" and transitions between those phases.  Maybe there would be a new
type of barrier to indicate a change between phases?  But now I'm way outside
the bounds of hoping to have a "quick and dirty" version of a proper side
input implementation.

I'm chewing on two new ideas now:  using a "union" stream instead of two
streams, and a custom source backed by two different sources under the hood,
so the "state machine" logic transitioning from initialization to normal
operation all happens in the same operator.  Or, running a batch or "bounded
stream" job first to generate a "cache state", and then launching the main
streaming job, which loads this initial state in open()... not sure how to
work out the keying.

I'll post back if I get anywhere with these ideas.
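
The first idea (a single "union" stream with the phase logic in one operator)
could look roughly like the plain-Java sketch below. Nothing here is Flink
API: `PhasedOperator`, `Event`, `Kind`, and the `END_OF_SIDE` marker are
hypothetical names, and the `pending` list only stands in for something like
a ListState. The operator stores side-input elements, buffers main-input
elements until the end-of-side marker arrives, then flushes the buffer and
processes main elements directly.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java sketch (not Flink API) of a two-phase operator over a tagged union. */
class PhasedOperator {
    enum Kind { SIDE, END_OF_SIDE, MAIN }

    /** Tagged union element; key and value are unused for END_OF_SIDE. */
    static final class Event {
        final Kind kind;
        final String key;
        final String value;

        Event(Kind kind, String key, String value) {
            this.kind = kind;
            this.key = key;
            this.value = value;
        }
    }

    private final Map<String, String> sideData = new HashMap<>();
    private final List<Event> pending = new ArrayList<>(); // stand-in for ListState
    private final List<String> output = new ArrayList<>();
    private boolean initialized = false;

    void processElement(Event e) {
        switch (e.kind) {
            case SIDE:
                sideData.put(e.key, e.value);
                break;
            case END_OF_SIDE:
                // Phase transition: drain everything buffered during initialization.
                initialized = true;
                for (Event buffered : pending) {
                    emit(buffered);
                }
                pending.clear();
                break;
            case MAIN:
                if (initialized) {
                    emit(e);
                } else {
                    pending.add(e); // unbounded: nothing limits this buffer
                }
                break;
        }
    }

    private void emit(Event e) {
        output.add(e.value + "@" + sideData.getOrDefault(e.key, "?"));
    }

    List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        PhasedOperator op = new PhasedOperator();
        op.processElement(new Event(Kind.MAIN, "k1", "m1"));
        op.processElement(new Event(Kind.SIDE, "k1", "s1"));
        op.processElement(new Event(Kind.END_OF_SIDE, null, null));
        op.processElement(new Event(Kind.MAIN, "k2", "m2"));
        System.out.println(op.output());
    }
}
```

The `pending` list still has the problem raised in the original question: it
grows without bound while the side data is loading.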



On 5/3/18 10:49 AM, Piotr Nowojski wrote:
> Maybe it could work with Flink 1.5’s credit-based flow control. But you
> would need a way to express the state “block one input side of the
> CoProcessFunction”, pass this information up to the input gate, and handle
> it, probably similarly to how
> `org.apache.flink.streaming.runtime.io.CachedBufferBlocker` blocks inputs
> in case of a checkpoint barrier. You cannot just block inside the
> `processElement1` method.
> 
> However, I haven’t thought it through, and there could be some issues
> regarding checkpointing (what should happen to checkpoint barriers if you
> are blocking one side of the input? Should this block the checkpoint
> barrier as well? Should you cancel the checkpoint?).
> 
> Piotrek
> 
>> On 2 May 2018, at 16:31, Derek VerLee wrote:
>> 
>> I was just thinking about letting a CoProcessFunction "block" or cause
>> back pressure on one of its streams.
>> Has this been discussed as an option?
>> Does anyone know a way to effectively accomplish this?
>> 
>> I think I could get a lot of mileage out of something like that without
>> needing a full implementation of FLIP-17 (which I would still eagerly
>> await).
>> 
>> As mentioned on another thread, one could use a ListState to buffer the
>> main input until the "side input" was sufficiently processed.  However, the
>> downside of this is that I have no way to control the size of those
>> buffers, whereas with backpressure, the system will naturally take care of
>> it.


Re: intentional back-pressure (or a poor man's side-input)

2018-05-03 Thread Piotr Nowojski
Maybe it could work with Flink 1.5’s credit-based flow control. But you would
need a way to express the state “block one input side of the
CoProcessFunction”, pass this information up to the input gate, and handle it,
probably similarly to how
`org.apache.flink.streaming.runtime.io.CachedBufferBlocker` blocks inputs in
case of a checkpoint barrier. You cannot just block inside the
`processElement1` method.

However, I haven’t thought it through, and there could be some issues
regarding checkpointing (what should happen to checkpoint barriers if you are
blocking one side of the input? Should this block the checkpoint barrier as
well? Should you cancel the checkpoint?).

Piotrek

> On 2 May 2018, at 16:31, Derek VerLee wrote:
> 
> I was just thinking about letting a CoProcessFunction "block" or cause
> back pressure on one of its streams.
> Has this been discussed as an option?
> Does anyone know a way to effectively accomplish this?
> 
> I think I could get a lot of mileage out of something like that without
> needing a full implementation of FLIP-17 (which I would still eagerly await).
> 
> As mentioned on another thread, one could use a ListState to buffer the main
> input until the "side input" was sufficiently processed.  However, the
> downside of this is that I have no way to control the size of those buffers,
> whereas with backpressure, the system will naturally take care of it.
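
The contrast drawn in the question, between an unbounded buffer and
back-pressure, can be illustrated with a small plain-Java analogy. This is
not how Flink's network stack is implemented; `BoundedBufferDemo` and `fill`
are hypothetical names, and the only point is that a fixed-capacity buffer
forces the producer to stop once the consumer falls behind.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Analogy only: a bounded queue pushes back on its producer when it is full. */
class BoundedBufferDemo {
    /** Tries to enqueue up to `count` elements without blocking; returns how many fit. */
    static int fill(BlockingQueue<Integer> buffer, int count) {
        int accepted = 0;
        for (int i = 0; i < count; i++) {
            if (!buffer.offer(i)) {
                break; // buffer full: the producer has to stop (or wait) here
            }
            accepted++;
        }
        return accepted;
    }

    public static void main(String[] args) {
        BlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(4);
        System.out.println("accepted: " + fill(buffer, 10));           // accepted: 4
        buffer.poll(); // the consumer takes one element, freeing one slot
        System.out.println("accepted after poll: " + fill(buffer, 10)); // accepted after poll: 1
    }
}
```

With an unbounded list in place of the `ArrayBlockingQueue`, every offer
would succeed and memory use would be limited only by how far the producer
runs ahead, which is the concern with buffering the main input in state.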