This got pretty long, but I don't want to end it yet, because we don't quite have a solution that will allow a user to treat timestamps from most systems as Beam timestamps.
I'm cutting pieces just to make inline replies easier to read.

On Tue, Apr 23, 2019 at 9:03 AM Robert Bradshaw <[email protected]> wrote:

> On Tue, Apr 23, 2019 at 4:20 PM Kenneth Knowles <[email protected]> wrote:
> > - WindowFn must receive exactly the data that came from the user's
> > data source. So that cannot be rounded.
> > - The user's WindowFn assigns to a window, so it can contain arbitrary
> > precision as it should be grouped as bytes.
> > - End of window, timers, watermark holds, etc., are all treated only
> > as bounds, so can all be rounded based on their use as an upper or
> > lower bound.
> >
> > We already do a lot of this - Pubsub publish timestamps are
> > microsecond precision (you could say our current connector constitutes
> > data loss) as are Windmill timestamps (since these are only combines
> > of Beam timestamps, there is no data loss here). There are undoubtedly
> > some corner cases I've missed, and naively this might look like
> > duplicating timestamps, so that could be an unacceptable performance
> > concern.
>
> If I understand correctly, in this scheme WindowInto assignment is
> parameterized by a function that specifies how to parse/extract the
> timestamp from the data element (maybe just a field specifier for
> schema'd data) rather than store the (exact) timestamp in a standard
> place in the WindowedValue, and the window merging always goes back to
> the SDK rather than the possibility of it being handled runner-side.

This sounds promising. You could also store the extracted approximate
timestamp somewhere, of course.

> Even if the runner doesn't care about interpreting the window, I think
> we'll want to have compatible window representations (and timestamp
> representations, and windowing fns) across SDKs (especially for
> cross-language), which favors choosing a consistent resolution.
>
> The end-of-window, for firing, can be approximate, but it seems it
> should be exact for timestamp assignment of the result (and similarly
> with the other timestamp combiners).

I was thinking that the window itself should be stored as exact data,
while just the firing itself is approximated, since it already is,
because of watermarks and timers. You raise a good point that min/max
timestamp combiners require actually understanding the higher-precision
timestamp. I can think of a couple of things to do. One is the old
"standardize all 3 or 4 precisions we need" and the other is that
combiners other than EOW exist primarily to hold the watermark, and that
hold does not require the original precision. Still, neither of these is
that satisfying.

> > A correction: Java *now* uses nanoseconds [1]. It uses the same
> > breakdown as proto (int64 seconds since epoch + int32 nanos within
> > second). It has legacy classes that use milliseconds, and Joda itself
> > now encourages moving back to Java's new Instant type. Nanoseconds
> > should complicate the arithmetic only for the one person authoring
> > the date library, which they have already done.
>
> The encoding and decoding need to be done in a language-consistent way
> as well.

I honestly am not sure what you mean by "language-consistent" here.

> Also, most date libraries don't have division, etc. operators, so we
> have to do that as well. Not that it should be *that* hard.

If the libraries dedicated to time handling haven't found it needful, is
there a specific reason you raise this? We do some simple math to find
the window things fall into; is that it?
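
To make the "treated only as bounds" idea concrete, here is a minimal
sketch (hypothetical helper names, not Beam code) of narrowing a
nanosecond timestamp for a millisecond-granularity runner, where the
rounding direction follows from whether the value is used as a lower or
upper bound:

```python
NANOS_PER_MILLI = 1_000_000

def floor_to_millis(nanos: int) -> int:
    """Lower bounds (e.g. watermark holds): round down, so the hold is
    never later than the data it protects."""
    return nanos // NANOS_PER_MILLI  # // is floor division, also for negatives

def ceil_to_millis(nanos: int) -> int:
    """Upper bounds (e.g. end-of-window timers): round up, so the timer
    never fires before the true end of the window."""
    return -(-nanos // NANOS_PER_MILLI)

t = 10_000_500  # an element timestamp at 10.0005 ms, expressed in nanos
assert floor_to_millis(t) == 10  # safe as a watermark hold
assert ceil_to_millis(t) == 11   # safe as a firing time
```

The exact timestamp still rides along in the user data; only the
runner-facing bounds are coarsened.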
> >> It would also be really nice to clean up the infinite-future being the
> >> somewhat arbitrary max micros rounded to millis, and
> >> end-of-global-window being infinite-future minus 1 hour (IIRC), etc.,
> >> as well as the ugly logic in Python to cope with millis-micros
> >> conversion.
> >
> > I actually don't have a problem with this. If you are trying to keep
> > the representation compact, not add bytes on top of instants, then you
> > just have to choose magic numbers, right?
>
> It's not about compactness, it's the (historically-derived?)
> arbitrariness of the numbers.

What I mean is that the only reason to fit them into an integer at all is
compactness. Otherwise, you could use a proper disjoint union representing
your intent directly, and all fiddling goes away, like
`Timestamp ::= PosInf | NegInf | EndOfGlobalWindow | ActualTime(Instant)`.
It costs a couple of bits.
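
For concreteness, here is a rough sketch of that disjoint union
(illustrative only, not a proposed Beam API), where the ordering falls
out of the variant tags rather than out of sentinel values:

```python
from dataclasses import dataclass
from functools import total_ordering

@total_ordering
@dataclass(frozen=True)
class Timestamp:
    # Variant tag: NegInf=0 < ActualTime=1 < EndOfGlobalWindow=2 < PosInf=3.
    rank: int
    nanos: int = 0  # only meaningful for the ActualTime variant

    def __lt__(self, other: "Timestamp") -> bool:
        return (self.rank, self.nanos) < (other.rank, other.nanos)

NEG_INF = Timestamp(0)
END_OF_GLOBAL_WINDOW = Timestamp(2)
POS_INF = Timestamp(3)

def actual(nanos: int) -> Timestamp:
    return Timestamp(1, nanos)

# No magic numerals: every real instant sorts below end-of-global-window,
# which sorts below positive infinity.
assert NEG_INF < actual(-10**18) < actual(10**18) < END_OF_GLOBAL_WINDOW < POS_INF
```

The tag is the "couple of bits"; everything else is ordinary instant
comparison.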

Kenn

> For example, the bounds are chosen to fit within 64-bit micros despite
> milliseconds being the "chosen" granularity, and care was taken that
>
>     WindowInto(Global) | GBK | WindowInto(Minute) | GBK
>
> works, but
>
>     WindowInto(Global) | GBK | WindowInto(Day) | GBK
>
> may produce elements with timestamps greater than MaxTimestamp.
> >
> > Kenn
> >
> > [1] https://docs.oracle.com/javase/8/docs/api/java/time/Instant.html
> >
> >>
> >> > On Wed, Apr 17, 2019 at 3:13 PM Robert Burke <[email protected]> wrote:
> >> >>
> >> >> +1 for plan B. Nanosecond precision on windowing seems... a little
> >> >> much for a system that's aggregating data over time. Even for
> >> >> processing, say, particle supercollider data, they'd get away with
> >> >> artificially increasing the granularity in batch settings.
> >> >>
> >> >> Now if they were streaming... they'd probably want femtoseconds
> >> >> anyway. The point is, we should see if users demand it before
> >> >> adding in the necessary work.
> >> >>
> >> >> On Wed, 17 Apr 2019 at 14:26, Chamikara Jayalath
> >> >> <[email protected]> wrote:
> >> >>>
> >> >>> +1 for plan B as well. I think it's important to make timestamp
> >> >>> precision consistent now without introducing surprising behaviors
> >> >>> for existing users. But we should move towards a higher-granularity
> >> >>> timestamp precision in the long run to support use cases that Beam
> >> >>> users might otherwise miss out on (on a runner that supports such
> >> >>> precision).
> >> >>>
> >> >>> - Cham
> >> >>>
> >> >>> On Wed, Apr 17, 2019 at 1:35 PM Lukasz Cwik <[email protected]> wrote:
> >> >>>>
> >> >>>> I also like Plan B because in the cross-language case, the
> >> >>>> pipeline would not work since every party (Runners & SDKs) would
> >> >>>> have to be aware of the new beam:coder:windowed_value:v2 coder.
> >> >>>> Plan A has the property where if the SDK/Runner wasn't updated
> >> >>>> then it may start truncating the timestamps unexpectedly.
> >> >>>>
> >> >>>> On Wed, Apr 17, 2019 at 1:24 PM Lukasz Cwik <[email protected]> wrote:
> >> >>>>>
> >> >>>>> Kenn, this discussion is about the precision of the timestamp
> >> >>>>> in the user data. As you had mentioned, runners need not have
> >> >>>>> the same granularity as the user data as long as they correctly
> >> >>>>> round the timestamp to guarantee that triggers are executed
> >> >>>>> correctly, but the user data should have the same precision
> >> >>>>> across SDKs, otherwise user data timestamps will be truncated
> >> >>>>> in cross-language scenarios.
> >> >>>>>
> >> >>>>> Based on the systems that were listed, either microsecond or
> >> >>>>> nanosecond would make sense. The issue with changing the
> >> >>>>> precision is that all Beam runners except for possibly Beam
> >> >>>>> Python on Dataflow are using millisecond precision, since they
> >> >>>>> are all using the same Java runner windowing/trigger logic.
> >> >>>>>
> >> >>>>> Plan A: Swap precision to nanosecond
> >> >>>>> 1) Change the Python SDK to only expose millisecond precision
> >> >>>>> timestamps (do now)
> >> >>>>> 2) Change the user data encoding to support nanosecond
> >> >>>>> precision (do now)
> >> >>>>> 3) Swap runner libraries to be nanosecond precision aware,
> >> >>>>> updating all window/triggering logic (do later)
> >> >>>>> 4) Swap SDKs to expose nanosecond precision (do later)
> >> >>>>>
> >> >>>>> Plan B:
> >> >>>>> 1) Change the Python SDK to only expose millisecond precision
> >> >>>>> timestamps and keep the data encoding as is (do now)
> >> >>>>> (We could add greater precision later to Plan B by creating a
> >> >>>>> new version, beam:coder:windowed_value:v2, which would be
> >> >>>>> nanosecond and would require runners to correctly perform
> >> >>>>> internal conversions for windowing/triggering.)
> >> >>>>>
> >> >>>>> I think we should go with Plan B, and when users request
> >> >>>>> greater precision we can make that an explicit effort. What do
> >> >>>>> people think?
> >> >>>>>
> >> >>>>> On Wed, Apr 17, 2019 at 5:43 AM Maximilian Michels
> >> >>>>> <[email protected]> wrote:
> >> >>>>>>
> >> >>>>>> Hi,
> >> >>>>>>
> >> >>>>>> Thanks for taking care of this issue in the Python SDK, Thomas!
> >> >>>>>>
> >> >>>>>> It would be nice to have a uniform precision for timestamps
> >> >>>>>> but, as Kenn pointed out, timestamps are extracted from systems
> >> >>>>>> that have different precision.
> >> >>>>>>
> >> >>>>>> To add to the list: Flink - milliseconds
> >> >>>>>>
> >> >>>>>> After all, it doesn't matter as long as there is sufficient
> >> >>>>>> precision and conversions are done correctly.
> >> >>>>>>
> >> >>>>>> I think we could improve the situation by at least adding a
> >> >>>>>> "milliseconds" constructor to the Python SDK's Timestamp.
> >> >>>>>>
> >> >>>>>> Cheers,
> >> >>>>>> Max
> >> >>>>>>
> >> >>>>>> On 17.04.19 04:13, Kenneth Knowles wrote:
> >> >>>>>> > I am not so sure this is a good idea. Here are some systems
> >> >>>>>> > and their precision:
> >> >>>>>> >
> >> >>>>>> > Arrow - microseconds
> >> >>>>>> > BigQuery - microseconds
> >> >>>>>> > New Java Instant - nanoseconds
> >> >>>>>> > Firestore - microseconds
> >> >>>>>> > Protobuf - nanoseconds
> >> >>>>>> > Dataflow backend - microseconds
> >> >>>>>> > Postgresql - microseconds
> >> >>>>>> > Pubsub publish time - nanoseconds
> >> >>>>>> > MSSQL datetime2 - 100 nanoseconds (original datetime about 3 millis)
> >> >>>>>> > Cassandra - milliseconds
> >> >>>>>> >
> >> >>>>>> > IMO it is important to be able to treat any of these as a Beam
> >> >>>>>> > timestamp, even though they aren't all streaming. Who knows
> >> >>>>>> > when we might be ingesting a streamed changelog, or using them
> >> >>>>>> > for reprocessing an archived stream. I think for this purpose
> >> >>>>>> > we should either standardize on nanoseconds or make the
> >> >>>>>> > runner's resolution independent of the data representation.
> >> >>>>>> >
> >> >>>>>> > I've had some offline conversations about this. I think we can
> >> >>>>>> > have higher-than-runner precision in the user data, and allow
> >> >>>>>> > WindowFns and DoFns to operate on this higher-than-runner
> >> >>>>>> > precision data, and still have consistent watermark treatment.
> >> >>>>>> > Watermarks are just bounds, after all.
> >> >>>>>> >
> >> >>>>>> > Kenn
> >> >>>>>> >
> >> >>>>>> > On Tue, Apr 16, 2019 at 6:48 PM Thomas Weise
> >> >>>>>> > <[email protected]> wrote:
> >> >>>>>> >
> >> >>>>>> >     The Python SDK currently uses timestamps in microsecond
> >> >>>>>> >     resolution while the Java SDK, as most would probably
> >> >>>>>> >     expect, uses milliseconds.
> >> >>>>>> >
> >> >>>>>> >     This causes a few difficulties with portability (Python
> >> >>>>>> >     coders need to convert to millis for WindowedValue and
> >> >>>>>> >     Timers), which is related to a bug I'm looking into:
> >> >>>>>> >
> >> >>>>>> >     https://issues.apache.org/jira/browse/BEAM-7035
> >> >>>>>> >
> >> >>>>>> >     As Luke pointed out, the issue was previously discussed:
> >> >>>>>> >
> >> >>>>>> >     https://issues.apache.org/jira/browse/BEAM-1524
> >> >>>>>> >
> >> >>>>>> >     I'm not privy to the reasons why we decided to go with
> >> >>>>>> >     micros in the first place, but would it be too big of a
> >> >>>>>> >     change or impractical for other reasons to switch the
> >> >>>>>> >     Python SDK to millis before it gets more users?
> >> >>>>>> >
> >> >>>>>> >     Thanks,
> >> >>>>>> >     Thomas
> >> >>>>>> >
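
As a side note on the millis/micros conversion Thomas mentions above:
here is a small sketch (hypothetical helpers, not the actual coder code)
of why the rounding direction matters when narrowing. Plain truncation
rounds pre-epoch timestamps toward zero, i.e. upward, which is unsafe for
anything used as a lower bound, while floor division rounds down
consistently:

```python
def micros_to_millis(micros: int) -> int:
    return micros // 1000  # floor division rounds down, also for negatives

def millis_to_micros(millis: int) -> int:
    return millis * 1000   # widening is exact

assert micros_to_millis(-1) == -1      # floor: -0.001 ms becomes -1 ms
assert int(-1 / 1000) == 0             # truncation would round it up to 0
assert micros_to_millis(millis_to_micros(42)) == 42  # widening round-trips
```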
