Re: Invite to comment on the @RequiresStableInput design doc

2018-07-09 Thread Lukasz Cwik
I'm also thinking that it would be best to apply it to the whole transform:
side inputs, main inputs, timers, and any future input constructs.



Re: Invite to comment on the @RequiresStableInput design doc

2018-07-07 Thread Reuven Lax
I think the entire transform. There might be some use case for having only
some inputs stable, but I can't think of any offhand.

BTW, with DataflowRunner all side inputs happen to be stable (though that's
more a side effect of the implementation).

Re: Invite to comment on the @RequiresStableInput design doc

2018-07-04 Thread Etienne Chauchot
Hi,
I sent a comment on the doc, just to check my understanding of the reshuffle
workaround.
Etienne

Re: Invite to comment on the @RequiresStableInput design doc

2018-07-03 Thread Lukasz Cwik
Does it make sense for only some inputs of a transform to be stable, or
should the entire transform require stable inputs?

Re: Invite to comment on the @RequiresStableInput design doc

2018-07-03 Thread Kenneth Knowles
Since we always assume ProcessElement could have arbitrary side effects
(esp. randomization), the state and timers set up by a call to
ProcessElement cannot be considered stable until they are persisted. This
seems very similar in cost to outputting to a downstream
@RequiresStableInput transform, and may even be an identical implementation.

What timers add is a way to loop, which you can't do with an output.

Adding @Pure annotations might help, if the input elements are stable and
ProcessElement is pure.
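A minimal, Beam-free sketch of the point above. This is not Beam code: `StableReplayDemo`, `attempt`, and the in-memory `durableStore` are hypothetical stand-ins for a runner and its persistence. It assumes a retry re-runs processing on the same input element: a non-pure function gives a different result on each retry unless its first result is persisted, while a pure function of stable input gives the same result on every retry without persisting anything.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

public class StableReplayDemo {
  // Hypothetical stand-in for the runner's durable storage.
  static final Map<String, String> durableStore = new HashMap<>();

  // One (re)try of processing `input`. With checkpoint=true, the first result is
  // persisted and later attempts read it back instead of recomputing it.
  static String attempt(String input, Function<String, String> fn, boolean checkpoint) {
    return checkpoint ? durableStore.computeIfAbsent(input, fn) : fn.apply(input);
  }

  public static void main(String[] args) {
    AtomicInteger calls = new AtomicInteger();
    // Not pure: the result changes on every call. A counter stands in for
    // randomization so the demo is reproducible.
    Function<String, String> nonPure = e -> e + "#" + calls.incrementAndGet();
    // Pure: the result depends only on the input element.
    Function<String, String> pure = String::toUpperCase;

    System.out.println(attempt("a", nonPure, false) + " vs " + attempt("a", nonPure, false)); // a#1 vs a#2
    System.out.println(attempt("a", nonPure, true) + " vs " + attempt("a", nonPure, true));   // a#3 vs a#3
    System.out.println(attempt("a", pure, false) + " vs " + attempt("a", pure, false));       // A vs A
  }
}
```

Persisting is what makes the non-pure case replay-safe; the pure case needs no persistence, which is the situation an @Pure-style annotation could let a runner detect.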

Kenn

Re: Invite to comment on the @RequiresStableInput design doc

2018-07-02 Thread Reuven Lax
The common use case for a timer is to read in data that was stored using
the state API in processElement. There is no guarantee that this data is
stable, and I believe no runner currently guarantees it. For example:

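import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;

// Stateful DoFns need keyed (KV) input. ElementCoder and sendToExternalSystem
// are placeholders for the element coder and the call to the external system.
class MyDoFn<K, ElementT> extends DoFn<KV<K, ElementT>, Void> {
  @StateId("bag") private final StateSpec<BagState<ElementT>> buffer =
      StateSpecs.bag(ElementCoder.of());
  @TimerId("timer") private final TimerSpec timerSpec =
      TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

  @ProcessElement public void processElement(@Element KV<K, ElementT> element,
      @StateId("bag") BagState<ElementT> bag, @TimerId("timer") Timer timer) {
    // Buffer the element. Nothing guarantees this write is persisted yet.
    bag.add(element.getValue());
    // Processing-time timer aligned to 30s boundaries, offset by 3s.
    timer.align(Duration.standardSeconds(30)).offset(Duration.standardSeconds(3)).setRelative();
  }

  @OnTimer("timer") public void onTimer(@StateId("bag") BagState<ElementT> bag) {
    // If this callback is retried, bag.read() may return different elements.
    sendToExternalSystem(bag.read());
  }
}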

If you tagged onTimer with @RequiresStableInput, then you could guarantee
that a retried timer would read the same elements out of the bag. Today this
is not guaranteed: the data written to the bag might not even be persisted
yet when the timer fires (for example, the runner might execute both
processElement and onTimer in the same bundle).
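To make this failure mode concrete, here is a small simulation in plain Java (no Beam; every name is hypothetical). It assumes the first attempt buffers `a` and `b`, fires the timer in the same bundle, and then fails before its state is committed, and that by the time of the retry element `c` has also arrived and lands in the retried bundle. The retried timer then reads different bag contents, whereas a timer that reads a committed snapshot sends the same elements every time.

```java
import java.util.ArrayList;
import java.util.List;

public class UnstableTimerDemo {
  // Stand-in for the external system: one entry per onTimer invocation.
  static final List<List<String>> sent = new ArrayList<>();

  // One attempt of a bundle in which processElement buffers `elements` in an
  // uncommitted bag and onTimer then fires in the same bundle.
  static List<String> runBundle(List<String> elements) {
    List<String> bag = new ArrayList<>();
    for (String e : elements) {
      bag.add(e);                          // processElement: bag.add(element)
    }
    List<String> read = List.copyOf(bag);  // onTimer: bag.read()
    sent.add(read);                        // onTimer: sendToExternalSystem(...)
    return read;
  }

  // Stable variant: onTimer reads a bag snapshot committed before it ran.
  static List<String> fireTimerOnCommitted(List<String> committedBag) {
    List<String> read = List.copyOf(committedBag);
    sent.add(read);
    return read;
  }

  public static void main(String[] args) {
    // Attempt 1 fails after sending; assumption: c has arrived by the retry.
    System.out.println(runBundle(List.of("a", "b")));       // [a, b]
    System.out.println(runBundle(List.of("a", "b", "c")));  // [a, b, c]

    // With a committed snapshot, every retry of onTimer sends the same payload.
    List<String> committed = List.of("a", "b");
    System.out.println(fireTimerOnCommitted(committed));    // [a, b]
    System.out.println(fireTimerOnCommitted(committed));    // [a, b]
    System.out.println(sent.size());                        // 4
  }
}
```

The external system receives two different payloads for what is logically one timer firing in the unstable case, which is what makes an exactly-once sink hard to build on top of it.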

This particular example is a simplistic one, of course; you could
accomplish the same thing with triggers. When Raghu worked on the
exactly-once Kafka sink, this was very problematic. The final solution relies
on Kafka-specific details, and is complicated and not portable to other
sinks.

BTW, you can of course just have OnTimer emit its output to another
transform marked with @RequiresStableInput. However, this solution is very
expensive (every element must be persisted to stable storage multiple
times), and we tried hard to avoid doing this in the Kafka sink.

Reuven

Re: Invite to comment on the @RequiresStableInput design doc

2018-07-02 Thread Robert Bradshaw
Could you give an example of such a use case? (I suppose I'm not quite
following what it means for a timer to be unstable...)

Re: Invite to comment on the @RequiresStableInput design doc

2018-07-02 Thread Reuven Lax
One issue: we definitely have some strong use cases where we want this on
ProcessTimer but not on ProcessElement. Since both are on the same DoFn,
I'm not sure how you would represent this as a separate transform.
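A sketch of the per-method granularity described above, in plain Java with reflection. The `RequiresStableInput` annotation here is defined locally for illustration (it is not the Beam annotation), and `MyFn` only mimics a DoFn's shape. The marker sits on the timer callback alone, which a separate transform placed in front of the whole DoFn could not express.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class PerMethodStability {
  // Hypothetical method-level marker, defined locally for this sketch.
  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.METHOD)
  @interface RequiresStableInput {}

  // One DoFn-like class: the element path tolerates unstable input,
  // but the timer callback does not.
  static class MyFn {
    public void processElement(String element) {}

    @RequiresStableInput
    public void onTimer() {}
  }

  // What a runner-side check might do: list the methods needing stable input.
  static List<String> methodsRequiringStableInput(Class<?> fn) {
    return Arrays.stream(fn.getDeclaredMethods())
        .filter(m -> m.isAnnotationPresent(RequiresStableInput.class))
        .map(Method::getName)
        .sorted()
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    System.out.println(methodsRequiringStableInput(MyFn.class)); // [onTimer]
  }
}
```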

On Mon, Jul 2, 2018 at 5:05 PM Robert Bradshaw  wrote:

> Thanks for the writeup.
>
> I'm wondering whether, rather than phrasing this as an annotation on DoFn
> methods that gets plumbed down through the portability representation, it
> would make more sense to introduce a new, primitive "EnsureStableInput"
> transform. Runners whose reshuffle provides stable inputs could use that as
> the implementation, and other runners could provide other suitable
> implementations.
>
>
>
> On Mon, Jul 2, 2018 at 3:26 PM Robin Qiu  wrote:
>
>> Hi everyone,
>>
>> Thanks for your feedback on the doc. I have revamped it according to all
>> of the comments. The major changes I have made are:
>> * The problem description should be more general and accurate now.
>> * I added more background information, such as details about Reshuffle,
>> so it should be easier to understand now.
>> * I made it clear what the scope of my current project is and what could
>> be left to future work.
>> * It now reflects the current progress of my work, and discusses how it
>> should work with the portable pipeline representation (WIP)
>>
>> Also, I forgot to mention last time that this doc may be interesting to
>> those of you interested in Reshuffle, because Reshuffle is currently used
>> as a workaround for the problem described in the doc.
>>
>> More comments are always welcome.
>>
>> Best,
>> Robin
>>
>> On Fri, Jun 15, 2018 at 7:34 AM Kenneth Knowles  wrote:
>>
>>> Thanks for the write up. It is great to see someone pushing this through.
>>>
>>> I wanted to bring Luke's high-level question back to the list for
>>> visibility: what about portability and other SDKs?
>>>
>>> Portability is probably trivial, but the "other SDKs" question is
>>> probably best answered by folks working on them who can have opinions about
>>> how it works in their SDKs' idioms.
>>>
>>> Kenn
>>>
>>


Re: Invite to comment on the @RequiresStableInput design doc

2018-07-02 Thread Robert Bradshaw
Thanks for the writeup.

I'm wondering whether, rather than phrasing this as an annotation on DoFn
methods that gets plumbed down through the portability representation, it
would make more sense to introduce a new primitive "EnsureStableInput"
transform. Runners whose Reshuffle provides stable inputs could use that as
an implementation, and other runners could provide other suitable
implementations.



On Mon, Jul 2, 2018 at 3:26 PM Robin Qiu  wrote:

> Hi everyone,
>
> Thanks for your feedback on the doc. I have revamped it according to all
> of the comments. The major changes I have made are:
> * The problem description should be more general and accurate now.
> * I added more background information, such as details about Reshuffle, so
> it should be easier to understand now.
> * I made it clear what the scope of my current project is and what could
> be left to future work.
> * It now reflects the current progress of my work, and discusses how it
> should work with the portable pipeline representation (WIP)
>
> Also, I forgot to mention last time that this doc may be interesting to
> those of you interested in Reshuffle, because Reshuffle is currently used as
> a workaround for the problem described in the doc.
>
> More comments are always welcome.
>
> Best,
> Robin
>
> On Fri, Jun 15, 2018 at 7:34 AM Kenneth Knowles  wrote:
>
>> Thanks for the write up. It is great to see someone pushing this through.
>>
>> I wanted to bring Luke's high-level question back to the list for
>> visibility: what about portability and other SDKs?
>>
>> Portability is probably trivial, but the "other SDKs" question is
>> probably best answered by folks working on them who can have opinions about
>> how it works in their SDKs idioms.
>>
>> Kenn
>>
>


Re: Invite to comment on the @RequiresStableInput design doc

2018-07-02 Thread Robin Qiu
Hi everyone,

Thanks for your feedback on the doc. I have revamped it according to all of
the comments. The major changes I have made are:
* The problem description should be more general and accurate now.
* I added more background information, such as details about Reshuffle, so
it should be easier to understand now.
* I made it clear what the scope of my current project is and what could be
left to future work.
* It now reflects the current progress of my work, and discusses how it
should work with the portable pipeline representation (WIP)

Also, I forgot to mention last time that this doc may be interesting to
those of you interested in Reshuffle, because Reshuffle is currently used as a
workaround for the problem described in the doc.
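
To make the underlying problem concrete, here is a small, hypothetical JDK-only simulation (not Beam code, and not the design proposed in the doc). A non-deterministic upstream step feeds a side-effecting write that the runner retries. The `checkpoint` map stands in for persisting the upstream output, roughly the role Reshuffle plays on runners where it provides stable input; `StableInputDemo`, `IdempotentSink`, and `distinctWrites` are illustrative names, not part of any API.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Random;
import java.util.Set;

// Hypothetical, JDK-only simulation; not Beam's API or the design in the doc.
public class StableInputDemo {

  // External sink that deduplicates by record ID, i.e. an idempotent write.
  static final class IdempotentSink {
    private final Set<Long> ids = new HashSet<>();

    void write(long id) {
      ids.add(id);
    }

    int distinctRecords() {
      return ids.size();
    }
  }

  // Runs a non-deterministic upstream step followed by a side effect
  // `attempts` times, as a runner might on retries. When `stable` is true,
  // the upstream output is persisted on first computation and replayed on
  // every later attempt.
  static int distinctWrites(int attempts, boolean stable, long seed) {
    Random random = new Random(seed);
    Map<String, Long> checkpoint = new HashMap<>();
    IdempotentSink sink = new IdempotentSink();
    for (int attempt = 0; attempt < attempts; attempt++) {
      long id;
      if (stable) {
        // Stable input: compute once, persist, and read the same value back on retry.
        id = checkpoint.computeIfAbsent("element-0", key -> random.nextLong());
      } else {
        // Unstable input: the upstream step is recomputed on each retry.
        id = random.nextLong();
      }
      sink.write(id);
    }
    return sink.distinctRecords();
  }

  public static void main(String[] args) {
    System.out.println("unstable input, 3 attempts: " + distinctWrites(3, false, 42L) + " records");
    System.out.println("stable input, 3 attempts:   " + distinctWrites(3, true, 42L) + " records");
  }
}
```

In the unstable case the retries produce more than one record even though the sink is idempotent; in the stable case they collapse to exactly one. Idempotent side effects only help when every retry sees the same input, which is the guarantee @RequiresStableInput is meant to express.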

More comments are always welcome.

Best,
Robin

On Fri, Jun 15, 2018 at 7:34 AM Kenneth Knowles  wrote:

> Thanks for the write up. It is great to see someone pushing this through.
>
> I wanted to bring Luke's high-level question back to the list for
> visibility: what about portability and other SDKs?
>
> Portability is probably trivial, but the "other SDKs" question is probably
> best answered by folks working on them who can have opinions about how it
> works in their SDKs' idioms.
>
> Kenn
>


Re: Invite to comment on the @RequiresStableInput design doc

2018-06-15 Thread Kenneth Knowles
Thanks for the write up. It is great to see someone pushing this through.

I wanted to bring Luke's high-level question back to the list for
visibility: what about portability and other SDKs?

Portability is probably trivial, but the "other SDKs" question is probably
best answered by folks working on them who can have opinions about how it
works in their SDKs' idioms.

Kenn


Invite to comment on the @RequiresStableInput design doc

2018-06-14 Thread Robin Qiu
Hello everyone,

I am Robin Qiu. I joined Google and started working on the Beam Java SDK two
months ago.

As my starting project, I am working on supporting the @RequiresStableInput
annotation in runners. Here is a short design doc. Please take a look and
feel free to comment.
https://docs.google.com/document/d/117yRKbbcEdm3eIKB_26BHOJGmHSZl1YNoF0RqWGtqAM/edit?usp=sharing

You can also find the context of the problem in this email thread:
https://lists.apache.org/thread.html/ae3c838df060e47148439d1dad818d5e927b2a25ff00cc4153221dff@%3Cdev.beam.apache.org%3E


Best,
Robin