Re: Definition of Unified model

2019-05-30 Thread Jan Lukavský
That's right, but is there a filesystem that allows files of unbounded size? If there is always an upper size limit, does that mean that you cannot use the order of elements in the file as is? You might need to transfer the offset from one file to another (that's how Kafka does it), but t

Re: Definition of Unified model

2019-05-30 Thread Reuven Lax
Files can grow (depending on the filesystem), and tailing growing files is a valid use case. On Wed, May 29, 2019 at 3:23 PM Jan Lukavský wrote: > > Offsets within a file, unordered between files seems exactly > analogous with offsets within a partition, unordered between partitions, > right? >

Re: Definition of Unified model

2019-05-29 Thread Jan Lukavský
> Offsets within a file, unordered between files seems exactly analogous with offsets within a partition, unordered between partitions, right? Not exactly. The key difference is that partitions in streaming stores are defined (on purpose, and with key impact on this discussion) as unbounde

Re: Definition of Unified model

2019-05-29 Thread Lukasz Cwik
ably about the most consequential >>> design decision in the whole Beam model, around the same level as the >>> decision to use ParDo and GBK as the primitives IMO. >>> >> >>> >> Kenn >>> >> >>> >> On Thu, May 23, 2019

Re: Definition of Unified model

2019-05-29 Thread Robert Bradshaw
ot really. I'm suggesting that some variant of FIFO ordering is >> >>> necessary, which requires either runners natively support FIFO ordering >> >>> or transforms adding some extra sequence number to each record to sort >> >>> by. >> >&

Re: Definition of Unified model

2019-05-29 Thread Robert Bradshaw
On Tue, May 28, 2019 at 12:18 PM Jan Lukavský wrote: > > As I understood it, Kenn was supporting the idea that sequence metadata > is preferable over FIFO. I was trying to point out that it should even > provide the same functionality as FIFO, plus one more important thing - > reproducibility and abilit

Re: Definition of Unified model

2019-05-28 Thread Reuven Lax
19 at 10:17 AM Reuven Lax wrote: >> >>> Not really. I'm suggesting that some variant of FIFO ordering is >> necessary, which requires either runners natively support FIFO ordering or >> transforms adding some extra sequence number to each record to sort by. >> >>&

Re: Definition of Unified model

2019-05-28 Thread Jan Lukavský
>>>> Hi, >>>> yes. It seems that ordering by user supplied UDF makes sense and I will update the design proposal accordingly. >>>> Would that solve the issues you mention? >>>> Jan >>>> -- Original e-mail --

Re: Definition of Unified model

2019-05-28 Thread Reuven Lax
tra sequence number to each record to sort by. > >>> > >>> I still think your proposal is very useful by the way. I'm merely > pointing out that to solve the state-machine problem we probably need > something more. > >>> > >>> Reuven >

Re: Definition of Unified model

2019-05-28 Thread Jan Lukavský
r supplied UDF makes sense and I will update the design proposal accordingly. Would that solve the issues you mention? Jan -- Original e-mail -- From: Reuven Lax To: dev Date: 23. 5. 2019 18:44:38 Subject: Re: Definition of Unified model I'm simply saying that timestamp ordering is in

Re: Definition of Unified model

2019-05-28 Thread Robert Bradshaw
user supplied UDF makes sense and I will >>> update the design proposal accordingly. >>> Would that solve the issues you mention? >>> Jan >>> -- Original e-mail -- >>> From: Reuven Lax >>> To: dev >>> Date: 23. 5. 201

Re: Definition of Unified model

2019-05-24 Thread Kenneth Knowles
nse and I will >> update the design proposal accordingly. >> Would that solve the issues you mention? >> Jan >> -- Original e-mail -- >> From: Reuven Lax >> To: dev >> Date: 23. 5. 2019 18:44:38 >> Subject: Re: Definition of Unified m

Re: Definition of Unified model

2019-05-23 Thread Reuven Lax
ues you mention? > Jan > -- Original e-mail -- > From: Reuven Lax > To: dev > Date: 23. 5. 2019 18:44:38 > Subject: Re: Definition of Unified model > > I'm simply saying that timestamp ordering is insufficient for state > machines. I wasn't prop

Re: Definition of Unified model

2019-05-23 Thread Jan Lukavský
il -- From: Reuven Lax <re...@google.com> To: dev <dev@beam.apache.org> Date: 23. 5. 2019 17:39:12 Subject: Re: Definition of Unified model So an example would be elements of type "startUserSession" and "endUserSession" (website sessions, not Beam se

Re: Definition of Unified model

2019-05-23 Thread Reuven Lax
dní e-mail -- > From: Reuven Lax > To: dev > Date: 23. 5. 2019 17:39:12 > Subject: Re: Definition of Unified model > > So an example would be elements of type "startUserSession" and > "endUserSession" (website sessions, not Beam sessions). Logicall

Re: Definition of Unified model

2019-05-23 Thread Jan Lukavský
:12 Subject: Re: Definition of Unified model So an example would be elements of type "startUserSession" and "endUserSession" (website sessions, not Beam sessions). Logically you may need to process them in the correct order if you have any sort of state-machine lo

Re: Definition of Unified model

2019-05-23 Thread Reuven Lax
So an example would be elements of type "startUserSession" and "endUserSession" (website sessions, not Beam sessions). Logically you may need to process them in the correct order if you have any sort of state-machine logic. However timestamp ordering is never guaranteed to match the logical orderin

Re: Definition of Unified model

2019-05-23 Thread Robert Bradshaw
Good point. The "implementation-specific" way I would do this is window-by-instant, followed by a DoFn that gets all the elements with the same timestamp and sorts/acts accordingly, but this counts on the runner producing windows in timestamp order (likely?) and also the subsequent DoFn getting th

Re: Definition of Unified model

2019-05-23 Thread Reuven Lax
So Jan's example of state machines is quite a valid use case for ordering. However in my experience, timestamp ordering is insufficient for state machines. Elements that cause state transitions might come in with the exact same timestamp, yet still have a necessary ordering. Especially given Beam's
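
The point about equal timestamps can be illustrated with a small sketch in plain Python (not Beam code). It reuses the "startUserSession" and "endUserSession" element types mentioned elsewhere in this thread; the `Event` class, the `seq` field, and `open_sessions` are hypothetical names introduced only for illustration, and the sequence number is assumed to be assigned per key by the source.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Event:
    kind: str       # "startUserSession" or "endUserSession"
    timestamp: int  # event time, e.g. in milliseconds
    seq: int        # hypothetical per-key sequence number set by the source


def open_sessions(events):
    """Tiny state machine: count sessions still open after all events."""
    open_count = 0
    for e in events:
        if e.kind == "startUserSession":
            open_count += 1
        elif e.kind == "endUserSession" and open_count > 0:
            open_count -= 1
        # An end event seen before its start is silently dropped here,
        # which is exactly the failure mode being illustrated.
    return open_count


# Both events carry the same timestamp and happen to arrive end-first.
events = [
    Event("endUserSession", timestamp=100, seq=2),
    Event("startUserSession", timestamp=100, seq=1),
]

# Sorting by timestamp alone cannot repair the order (the sort is stable,
# so the arrival order of the tie is kept).
by_timestamp = sorted(events, key=lambda e: e.timestamp)

# Using the sequence number as a tie-breaker restores the logical order.
by_sequence = sorted(events, key=lambda e: (e.timestamp, e.seq))

print(open_sessions(by_timestamp), open_sessions(by_sequence))
```

With timestamp-only ordering the session appears to stay open, while the tie-broken ordering closes it. Whether such a sequence number comes from the runner (FIFO) or from the data itself is the design question debated in the thread.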

Re: Definition of Unified model

2019-05-23 Thread Jan Lukavský
Hi all, thanks everyone for this discussion. I think I have gathered enough feedback to be able to put down a proposal for changes, which I will do and send to this list for further discussion. There are still doubts remaining about the non-determinism and its relation to output stability vs.

Re: Definition of Unified model

2019-05-22 Thread Maximilian Michels
Someone from Flink might correct me if I'm wrong, but that's my current understanding. In essence your description of how exactly-once works in Flink is correct. The general assumption in Flink is that pipelines must be deterministic and thus produce idempotent writes in the case of failures.

Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-21 Thread Kenneth Knowles
On Tue, May 21, 2019 at 7:02 AM Robert Bradshaw wrote: > Reza: One could provide something like this as a utility class, but > one downside is that it is not scale invariant. It requires a tuning > parameter that, if too small, won't mitigate the problem, but if too > big, greatly increases latency

Re: Definition of Unified model

2019-05-21 Thread Reuven Lax
I don't think this is completely accurate w.r.t Flink sinks. Flink provides a way for sinks to buffer data until a snapshot has been performed, at which point the data going to the sink is persistent. This has the exact same effect as other runners (e.g. Dataflow) that persistently buffer data. Bea

Re: Definition of Unified model

2019-05-21 Thread Lukasz Cwik
On Tue, May 21, 2019 at 7:49 AM Jan Lukavský wrote: > Hi, > > > Actually, I think it is a larger (open) question whether exactly once > is guaranteed by the model or whether runners are allowed to relax that. > I would think, however, that sources correctly implemented should be > idempotent whe

Re: Definition of Unified model

2019-05-21 Thread Jan Lukavský
Hi, > Actually, I think it is a larger (open) question whether exactly once is guaranteed by the model or whether runners are allowed to relax that. I would think, however, that sources correctly implemented should be idempotent when run atop an exactly once infrastructure such as Flink or Da

Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-21 Thread Robert Bradshaw
Reza: One could provide something like this as a utility class, but one downside is that it is not scale invariant. It requires a tuning parameter that, if too small, won't mitigate the problem, but if too big, greatly increases latency. (Possibly one could define a dynamic session-like window to sol

Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-21 Thread Reza Rokni
Hi Jan, It's a super interesting use case you have and has a lot of similarity with complexity that comes up when dealing with time series problems. I wonder if it would be interesting to see if the pattern generalises enough to make some utility classes abstracting the complexity from the user.

Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-21 Thread Jan Lukavský
Hi Reza, I think it probably would provide enough compression. But it would introduce complications and latency for the streaming case. Although I see your point, I was trying to figure out if the Beam model should support these use cases more "natively". Cheers, Jan On 5/21/19 11:03 AM,

Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-21 Thread Jan Lukavský
Hi Robert, > Beam has an exactly-once model. If the data was consumed, state mutated, and outputs written downstream (these three are committed together atomically) it will not be replayed. That does not, of course, solve the non-determinism due to ordering (including the fact that two operat

Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-21 Thread Robert Bradshaw
On Mon, May 20, 2019 at 5:24 PM Jan Lukavský wrote: > > > I don't see batch vs. streaming as part of the model. One can have > microbatch, or even a runner that alternates between different modes. > > Although I understand the motivation of this statement, this project's name is > "Apache Beam: An adva

Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-21 Thread Reza Rokni
In a lot of cases the initial combiner can dramatically reduce the amount of data in this last phase making it tractable for a lot of use cases. I assume in your example the first phase would not provide enough compression? Cheers Reza On Tue, 21 May 2019, 16:47 Jan Lukavský, wrote: > Hi Rez

Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-21 Thread Jan Lukavský
Hi Reza, thanks for the reaction, comments inline. On 5/21/19 1:02 AM, Reza Rokni wrote: Hi, If I have understood the use case correctly, your output is an ordered counter of state changes. One approach which might be worth exploring is outlined below, haven't had a chance to test it so could

Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-21 Thread Jan Lukavský
Hi Kenn, OK, so if we introduce an annotation, we can have a stateful ParDo with sorting; that would perfectly resolve my issues. I still have some doubts, though. Let me explain. The current behavior of stateful ParDo has the following properties: a) might fail in batch, although it runs fine in s

Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-20 Thread Reza Rokni
Hi, If I have understood the use case correctly, your output is an ordered counter of state changes. One approach which might be worth exploring is outlined below, haven't had a chance to test it so could be missing pieces or be plain old wrong (will try and come up with a test example later on

Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-20 Thread Kenneth Knowles
Thanks for the nice small example of a calculation that depends on order. You are right that many state machines have this property. I agree w/ you and Luke that it is convenient for batch processing to sort by event timestamp before running a stateful ParDo. In streaming you could also implement "

Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-20 Thread Jan Lukavský
Yes, the problem will probably arise mostly when keys are not well distributed (or there are too few keys). I'm really not sure if a pure GBK with a trigger can solve this - it might help to have a data-driven trigger. There would still be some doubts, though. The main question is still here - people

Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-20 Thread Lukasz Cwik
It is read all per key and window and not just read all (this still won't scale with hot keys in the global window). The GBK preceding the StatefulParDo will guarantee that you are processing all the values for a specific key and window at any given time. Is there a specific window/trigger that is

Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-20 Thread Jan Lukavský
Hi Lukasz, > Today, if you must have a strict order, you must guarantee that your StatefulParDo implements the necessary "buffering & sorting" into state. Yes, no problem with that. But this whole discussion started because *this doesn't work on batch*. You simply cannot first read everythin
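
The "buffering & sorting" approach quoted above can be sketched in plain Python, independent of any Beam API. `SortingBuffer` and its methods are hypothetical names for illustration; a real stateful ParDo would keep the buffer in per-key state and release it from a timer, which this sketch only imitates with an explicit watermark argument.

```python
import heapq


class SortingBuffer:
    """Buffers (timestamp, value) pairs for one key and releases them in
    timestamp order once the watermark has passed them."""

    def __init__(self):
        self._heap = []
        self._counter = 0  # arrival index: breaks ties, avoids comparing values

    def add(self, timestamp, value):
        heapq.heappush(self._heap, (timestamp, self._counter, value))
        self._counter += 1

    def advance_watermark(self, watermark):
        """Return buffered elements with timestamp <= watermark, sorted."""
        ready = []
        while self._heap and self._heap[0][0] <= watermark:
            timestamp, _, value = heapq.heappop(self._heap)
            ready.append((timestamp, value))
        return ready


buf = SortingBuffer()
buf.add(3, "c")
buf.add(1, "a")
buf.add(2, "b")

first = buf.advance_watermark(2)    # elements at or below the watermark
second = buf.advance_watermark(10)  # the remaining element
print(first, second)
```

In this sketch the buffer stays small as long as the watermark trails the incoming timestamps closely. If the watermark only advances at the end of the input, as may happen in batch execution, every element of a key sits in the buffer at once, which seems to be the scaling concern raised in this message.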

Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-20 Thread Lukasz Cwik
On Mon, May 20, 2019 at 8:24 AM Jan Lukavský wrote: > This discussion raises many really interesting questions for me. :-) > > > I don't see batch vs. streaming as part of the model. One can have > microbatch, or even a runner that alternates between different modes. > > Although I understand mo

Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-20 Thread Jan Lukavský
This discussion raises many really interesting questions for me. :-) > I don't see batch vs. streaming as part of the model. One can have microbatch, or even a runner that alternates between different modes. Although I understand the motivation of this statement, this project's name is "Apache Beam

Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-20 Thread Robert Bradshaw
On Mon, May 20, 2019 at 1:19 PM Jan Lukavský wrote: > > Hi Robert, > > yes, I think you rephrased my point - although no *explicit* guarantees > of ordering are given in either mode, there is *implicit* ordering in > the streaming case that is due to the nature of the processing - the difference > between

Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-20 Thread Jan Lukavský
On 5/20/19 1:39 PM, Reuven Lax wrote: On Mon, May 20, 2019 at 4:19 AM Jan Lukavský wrote: Hi Robert, yes, I think you rephrased my point - although no *explicit* guarantees of ordering are given in either mode, there is *implicit* ordering in str

Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-20 Thread Reuven Lax
On Mon, May 20, 2019 at 4:19 AM Jan Lukavský wrote: > Hi Robert, > > yes, I think you rephrased my point - although no *explicit* guarantees > of ordering are given in either mode, there is *implicit* ordering in > the streaming case that is due to the nature of the processing - the difference > between

Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-20 Thread Jan Lukavský
Hi Robert, yes, I think you rephrased my point - although no *explicit* guarantees of ordering are given in either mode, there is *implicit* ordering in the streaming case that is due to the nature of the processing - the difference between the watermark and the timestamp of elements flowing through the pipel

Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-20 Thread Robert Bradshaw
On Fri, May 17, 2019 at 4:48 PM Jan Lukavský wrote: > > Hi Reuven, > > > How so? AFAIK stateful DoFns work just fine in batch runners. > > Stateful ParDo works in batch as far as the logic inside the state works for > absolutely unbounded out-of-orderness of elements. That basically > (practica

Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-17 Thread Jan Lukavský
@apache.org>> To: dev@beam.apache.org Date: 16. 5. 2019 15:59:59 Subject: Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded) Hi Jan, Thanks for the discussion. Aljoscha alrea

Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-17 Thread Reuven Lax
src/main/java/org/apache/beam/runners/dataflow/BatchStatefulParDoOverrides.java#L64 > > > On Thu, May 16, 2019 at 5:09 PM Jan Lukavský wrote: > >> Hi Max, >> answers inline. >> -- Original e-mail -- >> From: Maximilian Michels >> To: dev@beam.ap

Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-17 Thread Reuven Lax
From: Jan Lukavský Date: Thu, May 16, 2019 at 8:09 AM To: Hi Max, > answers inline. > -- Original e-mail -- > From: Maximilian Michels > To: dev@beam.apache.org > Date: 16. 5. 2019 15:59:59 > Subject: Re: Definition of Unified model

Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-17 Thread Jan Lukavský
e.org Date: 16. 5. 2019 15:59:59 Subject: Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded) Hi Jan, Thanks for the discussion. Aljoscha already gave great answers. Just a couple of remarks: > a) streaming semantics (i.e. what I can express using Transforms) are

Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-17 Thread Jozef Vilcek
ail -- > From: Maximilian Michels > To: dev@beam.apache.org > Date: 16. 5. 2019 15:59:59 > Subject: Re: Definition of Unified model (WAS: Semantics of > PCollection.isBounded) > > Hi Jan, > > Thanks for the discussion. Aljoscha already gave great answers. Just a >

Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-16 Thread Jan Lukavský
Hi Max, answers inline. -- Original e-mail -- From: Maximilian Michels To: dev@beam.apache.org Date: 16. 5. 2019 15:59:59 Subject: Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded) Hi Jan, Thanks for the discussion. Aljoscha already gave great an

Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-16 Thread Jan Lukavský
y understanding of the Beam model. :-O Best, Aljoscha On 16. May 2019, at 13:53, Jan Lukavský wrote: Hi, this is starting to be really exciting. It seems to me that there is either something wrong with my definition of "Unified model" or with how it is implemented inside (at least)

Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-16 Thread Maximilian Michels
definition of "Unified model" or with how it is implemented inside (at least) Direct and Flink Runners. So, first what I see as properties of Unified model: a) streaming semantics (i.e. what I can express using Transforms) are subset of batch semantics - this is true, batch semantics and

Re: Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-16 Thread Aljoscha Krettek
head and I might be wrong in my understanding of the Beam model. :-O Best, Aljoscha > On 16. May 2019, at 13:53, Jan Lukavský wrote: > > Hi, > > this is starting to be really exciting. It seems to me that there is either > something wrong with my definition of "Unified m

Definition of Unified model (WAS: Semantics of PCollection.isBounded)

2019-05-16 Thread Jan Lukavský
Hi, this is starting to be really exciting. It seems to me that there is either something wrong with my definition of "Unified model" or with how it is implemented inside (at least) Direct and Flink Runners. So, first what I see as properties of Unified model:  a) streaming sema