I would also suggest using Java's Instant, since it is compatible with many more date/time libraries and does not force users through an artificial millis/nanos conversion layer just to get to Java's Instant.
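For illustration — a minimal sketch (mine, with a made-up class name) of the point: java.time.Instant already carries the (seconds, nanos) breakdown natively, and the only lossy step is dropping down to millisecond-based legacy APIs.

```java
import java.time.Duration;
import java.time.Instant;

public class InstantPrecisionDemo {
  public static void main(String[] args) {
    // java.time.Instant natively stores (epoch seconds, nanos-of-second).
    Instant t = Instant.ofEpochSecond(1_571_961_600L, 123_456_789L);

    // Nanosecond arithmetic needs no artificial conversion layer.
    Instant later = t.plus(Duration.ofNanos(42));
    System.out.println(later.getEpochSecond() + "s " + later.getNano() + "ns");

    // The only lossy step is crossing into millisecond-based legacy APIs.
    long millis = t.toEpochMilli(); // sub-millisecond nanos are truncated here
    System.out.println(millis);
  }
}
```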
On Tue, Oct 29, 2019 at 5:06 PM Robert Bradshaw <[email protected]> wrote:

> On Tue, Oct 29, 2019 at 4:20 PM Kenneth Knowles <[email protected]> wrote:
> > Point (1) is compelling. Solutions to the "minus epsilon" seem a bit complex. On the other hand, an opaque and abstract Timestamp type (in each SDK) going forward seems like a Pretty Good Idea (tm). Would you really have to go floating point? Could you just have a distinguished representation for non-inclusive upper/lower bounds? These could be at the same reduced resolution as timestamps in element metadata, since that is all they are compared against.
> If I were coming up with an abstract, opaque representation of Timestamp (and Duration) for Beam, I would explicitly include the "minus epsilon" concept. One could still do arithmetic with these. This would make any conversion to standard datetime libraries lossy though.
> > Point (2) is also good, though it seems like something that could be cleverly engineered and/or we just provide one implementation and it is easy to make your own for finer granularity, since a WindowFn separately receives the Timestamp (here I'm pretending it is abstract and opaque and likely approximate) and the original element with whatever precision the original data included.
> Yes, but I don't see how a generic WindowFn would reach into the (arbitrary) element and pull out this original data. One of the benefits of the Beam model is that the WindowFn does not have to depend on the element type.
> > Point (3) the model/runner owns the timestamp metadata so I feel fine about it being approximated as long as any original user data is still present. I don't recall seeing a compelling case where the timestamp metadata that the runner tracks and understands is required to be exactly the same as a user value (assuming users understand this distinction, which is another issue that I would separate from whether it is technically feasible).
> As we provide the ability to designate user data as the runner timestamp against which to window, and promote the runner timestamp back to user data (people are going to want to get DateTime or Instant objects out of it), it seems tricky to explain to users that one or both of these operations may be lossy (and, in addition, I don't think there's a consistently safe direction to round).
> > The more I think about the very real problems you point out, the more I think that our backwards-incompatible move should be to our own abstract Timestamp type, putting the design decision behind a minimal interface. If we see a concrete design for that data type, we might be inspired how to support more possibilities.
> > As for the rest of the speculation... moving to nanos immediately helps users so I am now +1 on just doing it, or moving ahead with an abstract data type under the assumption that it will basically be nanos under the hood.
> If the fact that it's stored as nanos under the hood leaks out (and I have trouble seeing how it won't) I'd lean towards just using them directly (e.g. Java Instant) rather than wrapping it.
> > Having a cleverly resolution-independent system is interesting and maybe extremely future proof but maybe preparing for a very distant future that may never come.
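A rough sketch of Robert's "minus epsilon" idea (hypothetical names, not a Beam API): an opaque timestamp plus a flag meaning "just before this value", which keeps comparisons exact but, as he notes, makes conversion to standard date/time types lossy.

```java
import java.time.Instant;

// Hypothetical sketch, not a Beam API: an opaque timestamp carrying an explicit
// "minus epsilon" bit so End(window) - epsilon needs no extra resolution.
final class OpaqueTimestamp implements Comparable<OpaqueTimestamp> {
  private final long nanosSinceEpoch;   // the underlying resolution is an implementation detail
  private final boolean minusEpsilon;   // true means "just before" nanosSinceEpoch

  private OpaqueTimestamp(long nanosSinceEpoch, boolean minusEpsilon) {
    this.nanosSinceEpoch = nanosSinceEpoch;
    this.minusEpsilon = minusEpsilon;
  }

  static OpaqueTimestamp of(long nanosSinceEpoch) {
    return new OpaqueTimestamp(nanosSinceEpoch, false);
  }

  /** The value immediately before this one, without needing finer resolution. */
  OpaqueTimestamp minusEpsilon() {
    return new OpaqueTimestamp(nanosSinceEpoch, true);
  }

  @Override
  public int compareTo(OpaqueTimestamp other) {
    int byValue = Long.compare(nanosSinceEpoch, other.nanosSinceEpoch);
    if (byValue != 0) {
      return byValue;
    }
    return Boolean.compare(other.minusEpsilon, minusEpsilon); // "minus epsilon" sorts first
  }

  /** This is the lossy step: a standard library type cannot carry the epsilon. */
  Instant toInstant() {
    return Instant.ofEpochSecond(0, nanosSinceEpoch);
  }
}
```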
> > Kenn
> > On Fri, Oct 18, 2019 at 11:35 AM Robert Bradshaw <[email protected]> wrote:
> >> TL;DR: We should just settle on nanosecond precision ubiquitously for timestamp/windowing in Beam.
> >> Re-visiting this discussion in light of cross-language transforms and runners, and trying to tighten up testing. I've spent some more time thinking about how we could make these operations granularity-agnostic, but just can't find a good solution. In particular, the sticking points seem to be:
> >> (1) Windows are half-open intervals, and the timestamp associated with a window coming out of a GBK is (by default) as large as possible but must live in that window. (Otherwise WindowInto + GBK + WindowInto would have the unfortunate effect of moving aggregate values into subsequent windows, which is clearly not the intent.) In other words, the timestamp of a grouped value is basically End(Window) - epsilon. Unless we choose a representation able to encode "minus epsilon" we must agree on a granularity.
> >> (2) Unless we want to have multiple variants of all our WindowFns (e.g. FixedWindowMillis, FixedWindowMicros, FixedWindowNanos) we must agree on a granularity with which to parameterize these well-known operations. There are cases (e.g. side input window mapping, merging) where these Fns may be used downstream in contexts other than where they are applied/defined.
> >> (3) Reification of the timestamp into user-visible data, and the other way around, requires a choice of precision to expose to the user. This means that the timestamp is actual data, and truncating/rounding cannot be done implicitly. Also, the round trip of reification and application of timestamps should hopefully be idempotent no matter the SDK.
> >> The closest I've come is possibly parameterizing the timestamp type, where encoding, decoding (including pulling the end out of a window?), comparison (against each other and a watermark), "minus epsilon", etc. could be UDFs. Possibly we'd need the full set of arithmetic operations to implement FixedWindows on an unknown timestamp type. Reification would simply be disallowed (or return an opaque rather than SDK-native type) if the SDK did not know that window type. The fact that one might need comparison between timestamps of different types, or (lossless) coercion from one type to another, means that timestamp types need to know about each other, or another entity needs to know about the full cross-product, unless there is a common base type (at which point we might as well always choose that).
> >> An intermediate solution is to settle on a floating (decimal) point representation, plus a "minus epsilon" bit. It wouldn't quite solve the mapping through SDK-native types (which could require rounding or errors or a new opaque type, and few date libraries could faithfully expose the minus epsilon part). It might also be more expensive (compute and storage), and would not allow us to use the protobuf timestamp/duration fields (or any standard date/time libraries).
> >> Unless we can come up with a clean solution to the issues above shortly, I think we should fix a precision and move forward. If this makes sense to everyone, then we can start talking about the specific choice of precision and a migration path (possibly only for portability).
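To make the "parameterize the timestamp type by UDFs" idea above concrete, here is a hypothetical interface sketch (nothing like this exists in Beam); note how even the Duration parameter smuggles the granularity question back in.

```java
import java.time.Duration;

// Hypothetical sketch of a timestamp domain whose operations are all UDFs.
// This is not a Beam interface; all names here are invented for illustration.
interface TimestampDomain<T> {
  byte[] encode(T timestamp);

  T decode(byte[] encoded);

  // Ordering within the domain, used both between timestamps and against watermarks.
  int compare(T a, T b);

  // The "minus epsilon" operation needed for End(window) output timestamps.
  T minusEpsilon(T timestamp);

  // Arithmetic needed to implement FixedWindows/SlidingWindows generically.
  // Even here, the unit carried by Duration re-introduces a granularity choice.
  T plus(T timestamp, Duration duration);
  T floorTo(T timestamp, Duration granularity);
}
```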
> >> For reference, the manipulations we do on timestamps are:
> >> WindowInto: Timestamp -> Window
> >> TimestampCombine: Window, [Timestamp] -> Timestamp
> >> End(Window)
> >> Min(Timestamps)
> >> Max(Timestamps)
> >> PastEndOfWindow: Watermark, Window -> {True, False}
> >> [SideInput]WindowMappingFn: Window -> Window
> >> WindowInto(End(Window))
> >> GetTimestamp: Timestamp -> SDK Native Object
> >> EmitAtTimestamp: SDK Native Object -> Timestamp
> >> On Fri, May 10, 2019 at 1:33 PM Robert Bradshaw <[email protected]> wrote:
> >>> On Thu, May 9, 2019 at 9:32 AM Kenneth Knowles <[email protected]> wrote:
> >>> > From: Robert Bradshaw <[email protected]>
> >>> > Date: Wed, May 8, 2019 at 3:00 PM
> >>> > To: dev
> >>> >> From: Kenneth Knowles <[email protected]>
> >>> >> Date: Wed, May 8, 2019 at 6:50 PM
> >>> >> To: dev
> >>> >> >> The end-of-window, for firing, can be approximate, but it seems it should be exact for timestamp assignment of the result (and similarly with the other timestamp combiners).
> >>> >> > I was thinking that the window itself should be stored as exact data, while just the firing itself is approximated, since it already is, because of watermarks and timers.
> >>> >> I think this works where we can compare encoded windows, but some portable interpretation of windows is required for runner-side implementation of merging windows (for example).
> >>> > But in this case, you've recognized the URN of the WindowFn anyhow, so you understand its windows. Remembering that IntervalWindow is just one choice, and that windows themselves are totally user-defined and that merging logic is completely arbitrary per WindowFn (we probably should have some restrictions, but see https://issues.apache.org/jira/browse/BEAM-654). So I file this use case in the "runner knows everything about the WindowFn and Window type and window encoding anyhow" bucket.
> >>> Being able to merge common windows in the runner is just an optimization, but an important one (especially for bootstrapping SDKs). However, this is not just about runner to SDK, but SDK to SDK as well (where a user from one SDK may want to inspect the windows produced by another). Having MillisIntervalWindow, MicrosIntervalWindow, NanosIntervalWindow, etc. isn't a path that I think is worth going down.
> >>> Yes, we need to solve the "extract the endpoint of an unknown encoded window" problem as well, possibly similar to what we do with length-prefix coders, possibly a restriction on window encodings themselves.
> >>> >> There may also be issues if windows (or timestamps) are assigned to a high precision in one SDK, then inspected/acted on in another SDK, and then passed back to the original SDK where the truncation would be visible.
> >>> > This is pretty interesting and complex. But again, a window is just data. An SDK has to know how to deserialize it to operate on it. Unless we do actually standardize some aspects of it. I don't believe BoundedWindow encoding has a defined way to get the timestamp without decoding the window, does it? I thought we had basically defaulted to all IntervalWindows. But I am not following that closely.
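One way to picture the "extract the endpoint of an unknown encoded window" idea from the exchange above, by analogy with length-prefix coders — a purely hypothetical convention, not anything BoundedWindow actually specifies:

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

// Hypothetical convention (not an existing Beam coder): prefix every encoded
// custom window with its max timestamp, so a runner or another SDK can read the
// endpoint without knowing how to decode the window payload itself.
final class TimestampPrefixedWindowEncoding {

  static byte[] encode(long maxTimestampMicros, byte[] opaqueWindowPayload) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    out.writeLong(maxTimestampMicros);      // readable by any participant
    out.writeInt(opaqueWindowPayload.length);
    out.write(opaqueWindowPayload);         // only the owning SDK understands this part
    return bytes.toByteArray();
  }

  /** Any participant can pull the endpoint out without decoding the payload. */
  static long maxTimestampMicros(byte[] encodedWindow) {
    return ByteBuffer.wrap(encodedWindow).getLong();
  }
}
```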
> >>> >> > You raise a good point that min/max timestamp combiners require actually understanding the higher-precision timestamp. I can think of a couple things to do. One is the old "standardize all 3 or 4 precisions we need" and the other is that combiners other than EOW exist primarily to hold the watermark, and that hold does not require the original precision. Still, neither of these is that satisfying.
> >>> >> In the current model, the output timestamp is user-visible.
> >>> > But as long as the watermark hold is less, it is safe. It requires knowing the coarse-precision lower bound of the timestamps of the input. And there may be situations where you also want the coarse upper bound. But you do know that these are at most one millisecond apart (assuming the runner is in millis) so perhaps no storage overhead. But a lot of complexity and chances for off-by-ones. And this is pretty hand-wavy.
> >>> Yeah. A different SDK may (implicitly or explicitly) ask for the timestamp of the (transitive) output of a GBK, for which an approximation (either way) is undesirable.
> >>> >> >> > A correction: Java *now* uses nanoseconds [1]. It uses the same breakdown as proto (int64 seconds since epoch + int32 nanos within second). It has legacy classes that use milliseconds, and Joda itself now encourages moving back to Java's new Instant type. Nanoseconds should complicate the arithmetic only for the one person authoring the date library, which they have already done.
> >>> >> >> The encoding and decoding need to be done in a language-consistent way as well.
> >>> >> > I honestly am not sure what you mean by "language-consistent" here.
> >>> >> If we want to make reading and writing of timestamps and windows cross-language, we can't rely on language-specific libraries to do the encoding.
> >>> >> >> Also, most date libraries don't have division, etc. operators, so we have to do that as well. Not that it should be *that* hard.
> >>> >> > If the libraries dedicated to time handling haven't found it needful, is there a specific reason you raise this? We do some simple math to find the window things fall into; is that it?
> >>> >> Yes. E.g.
> >>> >> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/FixedWindows.java#L77
> >>> >> would be a lot messier if there were no mapping from date libraries to raw ints that we can do arithmetic on. Writing this with the (seconds, nanos) representation is painful. But I suppose we'd only have to do it once per SDK.
> >>> > Yea, I think that arithmetic is not so bad. But this raises the issue of writing a *generic* WindowFn where its idea of timestamp granularity (the WindowFn owns the window type and encoding) may not match the user data coming in. So you need to apply the approximation function to provide type-correct input to the WindowFn. That's kind of exciting and weird and perhaps unsolvable, except by choosing a concrete granularity.
> >>> > Kenn
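To make the arithmetic under discussion concrete, here is a rough sketch (not the FixedWindows code linked above) of fixed-window assignment on raw millisecond longs versus a (seconds, nanos) pair — the latter being the "painful" version.

```java
// A sketch, not Beam's FixedWindows: assigning a timestamp to a fixed window,
// first on a single long in one unit, then on a (seconds, nanos) breakdown.
final class FixedWindowAssignment {

  // Easy version: everything is one long in one unit (here, millis).
  static long windowStartMillis(long timestampMillis, long sizeMillis, long offsetMillis) {
    return timestampMillis - Math.floorMod(timestampMillis - offsetMillis, sizeMillis);
  }

  // The same computation on (seconds, nanos) is messier: normalize to a single
  // nano count, do the modular arithmetic, then split back out. Note also that a
  // signed 64-bit nano count only spans roughly +/-292 years around the epoch.
  static long[] windowStartSecondsNanos(long seconds, int nanos, long sizeNanos, long offsetNanos) {
    long totalNanos = Math.addExact(Math.multiplyExact(seconds, 1_000_000_000L), nanos);
    long start = totalNanos - Math.floorMod(totalNanos - offsetNanos, sizeNanos);
    return new long[] {Math.floorDiv(start, 1_000_000_000L), Math.floorMod(start, 1_000_000_000L)};
  }
}
```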
> >>> >> >> >> It would also be really nice to clean up the infinite-future being the somewhat arbitrary max micros rounded to millis, and end-of-global-window being infinite-future minus 1 hour (IIRC), etc., as well as the ugly logic in Python to cope with millis-micros conversion.
> >>> >> >> > I actually don't have a problem with this. If you are trying to keep the representation compact, not add bytes on top of instants, then you just have to choose magic numbers, right?
> >>> >> >> It's not about compactness, it's the (historically-derived?) arbitrariness of the numbers.
> >>> >> > What I mean is that the only reason to fit them into an integer at all is compactness. Otherwise, you could use a proper disjoint union representing your intent directly, and all fiddling goes away, like `Timestamp ::= PosInf | NegInf | EndOfGlobalWindow | ActualTime(Instant)`. It costs a couple of bits.
> >>> >> The other cost is not being able to use standard libraries to represent all of your timestamps.
> >>> >> >> For example, the bounds are chosen to fit within 64-bit micros despite milliseconds being the "chosen" granularity, and care was taken that
> >>> >> >> WindowInto(Global) | GBK | WindowInto(Minute) | GBK
> >>> >> >> works, but
> >>> >> >> WindowInto(Global) | GBK | WindowInto(Day) | GBK
> >>> >> >> may produce elements with timestamps greater than MaxTimestamp.
> >>> >> >> > Kenn
> >>> >> >> > [1] https://docs.oracle.com/javase/8/docs/api/java/time/Instant.html
> >>> >> >> >> > On Wed, Apr 17, 2019 at 3:13 PM Robert Burke <[email protected]> wrote:
> >>> >> >> >> >> +1 for plan B. Nanosecond precision on windowing seems... a little much for a system that's aggregating data over time. Even for processing, say, particle super collider data, they'd get away with artificially increasing the granularity in batch settings.
> >>> >> >> >> >> Now if they were streaming... they'd probably want femtoseconds anyway. The point is, we should see if users demand it before adding in the necessary work.
> >>> >> >> >> >> On Wed, 17 Apr 2019 at 14:26, Chamikara Jayalath <[email protected]> wrote:
> >>> >> >> >> >>> +1 for plan B as well. I think it's important to make timestamp precision consistent now without introducing surprising behaviors for existing users. But we should move towards a higher-granularity timestamp precision in the long run to support use cases that Beam users might otherwise miss out on (on a runner that supports such precision).
> >>> >> >> >> >>> - Cham
> >>> >> >> >> >>> On Wed, Apr 17, 2019 at 1:35 PM Lukasz Cwik <[email protected]> wrote:
> >>> >> >> >> >>>> I also like Plan B because in the cross-language case, the pipeline would not work since every party (Runners & SDKs) would have to be aware of the new beam:coder:windowed_value:v2 coder. Plan A has the property where if the SDK/Runner wasn't updated then it may start truncating the timestamps unexpectedly.
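Kenn's `Timestamp ::= PosInf | NegInf | EndOfGlobalWindow | ActualTime(Instant)` sketch above might look roughly like this in Java (illustrative only, with made-up names and one possible ordering):

```java
import java.time.Instant;
import java.util.Optional;

// Illustrative only: the disjoint union spelled out as a tagged value, so the
// special timestamps are distinct cases rather than magic numbers in a long.
final class TimestampUnion implements Comparable<TimestampUnion> {
  enum Kind { NEG_INF, ACTUAL_TIME, END_OF_GLOBAL_WINDOW, POS_INF }

  private final Kind kind;
  private final Optional<Instant> instant; // present only for ACTUAL_TIME

  private TimestampUnion(Kind kind, Optional<Instant> instant) {
    this.kind = kind;
    this.instant = instant;
  }

  static final TimestampUnion NEG_INF = new TimestampUnion(Kind.NEG_INF, Optional.empty());
  static final TimestampUnion END_OF_GLOBAL_WINDOW =
      new TimestampUnion(Kind.END_OF_GLOBAL_WINDOW, Optional.empty());
  static final TimestampUnion POS_INF = new TimestampUnion(Kind.POS_INF, Optional.empty());

  static TimestampUnion actualTime(Instant instant) {
    return new TimestampUnion(Kind.ACTUAL_TIME, Optional.of(instant));
  }

  // One possible ordering: -inf < any actual time < end-of-global-window < +inf.
  @Override
  public int compareTo(TimestampUnion other) {
    if (kind != other.kind) {
      return kind.compareTo(other.kind);
    }
    return kind == Kind.ACTUAL_TIME ? instant.get().compareTo(other.instant.get()) : 0;
  }
}
```

It costs a couple of bits, as Kenn says, and the "other cost" Robert points out is visible here too: only the ACTUAL_TIME case maps onto a standard library type.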
> >>> >> >> >> >>>> On Wed, Apr 17, 2019 at 1:24 PM Lukasz Cwik <[email protected]> wrote:
> >>> >> >> >> >>>>> Kenn, this discussion is about the precision of the timestamp in the user data. As you had mentioned, Runners need not have the same granularity of user data as long as they correctly round the timestamp to guarantee that triggers are executed correctly, but the user data should have the same precision across SDKs; otherwise user data timestamps will be truncated in cross-language scenarios.
> >>> >> >> >> >>>>> Based on the systems that were listed, either microsecond or nanosecond would make sense. The issue with changing the precision is that all Beam runners except for possibly Beam Python on Dataflow are using millisecond precision since they are all using the same Java Runner windowing/trigger logic.
> >>> >> >> >> >>>>> Plan A: Swap precision to nanosecond
> >>> >> >> >> >>>>> 1) Change the Python SDK to only expose millisecond precision timestamps (do now)
> >>> >> >> >> >>>>> 2) Change the user data encoding to support nanosecond precision (do now)
> >>> >> >> >> >>>>> 3) Swap runner libraries to be nanosecond precision aware, updating all window/triggering logic (do later)
> >>> >> >> >> >>>>> 4) Swap SDKs to expose nanosecond precision (do later)
> >>> >> >> >> >>>>> Plan B:
> >>> >> >> >> >>>>> 1) Change the Python SDK to only expose millisecond precision timestamps and keep the data encoding as is (do now)
> >>> >> >> >> >>>>> (We could add greater precision later to plan B by creating a new version beam:coder:windowed_value:v2 which would be nanosecond and would require runners to correctly perform internal conversions for windowing/triggering.)
> >>> >> >> >> >>>>> I think we should go with Plan B and when users request greater precision we can make that an explicit effort. What do people think?
> >>> >> >> >> >>>>> On Wed, Apr 17, 2019 at 5:43 AM Maximilian Michels <[email protected]> wrote:
> >>> >> >> >> >>>>>> Hi,
> >>> >> >> >> >>>>>> Thanks for taking care of this issue in the Python SDK, Thomas!
> >>> >> >> >> >>>>>> It would be nice to have a uniform precision for timestamps but, as Kenn pointed out, timestamps are extracted from systems that have different precision.
> >>> >> >> >> >>>>>> To add to the list: Flink - milliseconds
> >>> >> >> >> >>>>>> After all, it doesn't matter as long as there is sufficient precision and conversions are done correctly.
> >>> >> >> >> >>>>>> I think we could improve the situation by at least adding a "milliseconds" constructor to the Python SDK's Timestamp.
> >>> >> >> >> >>>>>> Cheers,
> >>> >> >> >> >>>>>> Max
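To picture Luke's point that a coarser-grained runner stays correct as long as it rounds carefully, here is a hedged sketch of one rounding convention (mine, not Beam's spec): floor watermarks and ceil window ends, so triggers can fire a little late but never early.

```java
import java.time.Instant;

// A sketch, not Beam code: one rounding convention under which a runner that
// tracks time in milliseconds stays correct against nanosecond user timestamps.
final class MillisRunnerRounding {

  // A watermark is a lower bound on future element timestamps, so coarsening it
  // must round DOWN; otherwise the runner would claim more progress than it has.
  static long watermarkMillisFloor(Instant watermark) {
    return watermark.toEpochMilli(); // toEpochMilli() drops sub-millisecond nanos
  }

  // A window end (timer deadline) must round UP, so the runner never fires an
  // end-of-window trigger before the true end of the window.
  static long windowEndMillisCeil(Instant windowEnd) {
    long floor = windowEnd.toEpochMilli();
    boolean exact = windowEnd.getNano() % 1_000_000 == 0;
    return exact ? floor : floor + 1;
  }

  // A trigger check done in runner (milli) time can only be true if it is also
  // true in exact (nano) time, at the cost of firing at most ~1 ms late.
  static boolean pastEndOfWindow(Instant watermark, Instant windowEnd) {
    return watermarkMillisFloor(watermark) >= windowEndMillisCeil(windowEnd);
  }
}
```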
> >>> >> >> >> >>>>>> On 17.04.19 04:13, Kenneth Knowles wrote:
> >>> >> >> >> >>>>>> > I am not so sure this is a good idea. Here are some systems and their precision:
> >>> >> >> >> >>>>>> > Arrow - microseconds
> >>> >> >> >> >>>>>> > BigQuery - microseconds
> >>> >> >> >> >>>>>> > New Java instant - nanoseconds
> >>> >> >> >> >>>>>> > Firestore - microseconds
> >>> >> >> >> >>>>>> > Protobuf - nanoseconds
> >>> >> >> >> >>>>>> > Dataflow backend - microseconds
> >>> >> >> >> >>>>>> > Postgresql - microseconds
> >>> >> >> >> >>>>>> > Pubsub publish time - nanoseconds
> >>> >> >> >> >>>>>> > MSSQL datetime2 - 100 nanoseconds (original datetime about 3 millis)
> >>> >> >> >> >>>>>> > Cassandra - milliseconds
> >>> >> >> >> >>>>>> > IMO it is important to be able to treat any of these as a Beam timestamp, even though they aren't all streaming. Who knows when we might be ingesting a streamed changelog, or using them for reprocessing an archived stream. I think for this purpose we either should standardize on nanoseconds or make the runner's resolution independent of the data representation.
> >>> >> >> >> >>>>>> > I've had some offline conversations about this. I think we can have higher-than-runner precision in the user data, and allow WindowFns and DoFns to operate on this higher-than-runner precision data, and still have consistent watermark treatment. Watermarks are just bounds, after all.
> >>> >> >> >> >>>>>> > Kenn
> >>> >> >> >> >>>>>> > On Tue, Apr 16, 2019 at 6:48 PM Thomas Weise <[email protected]> wrote:
> >>> >> >> >> >>>>>> > The Python SDK currently uses timestamps in microsecond resolution while Java SDK, as most would probably expect, uses milliseconds.
> >>> >> >> >> >>>>>> > This causes a few difficulties with portability (Python coders need to convert to millis for WindowedValue and Timers), which is related to a bug I'm looking into:
> >>> >> >> >> >>>>>> > https://issues.apache.org/jira/browse/BEAM-7035
> >>> >> >> >> >>>>>> > As Luke pointed out, the issue was previously discussed:
> >>> >> >> >> >>>>>> > https://issues.apache.org/jira/browse/BEAM-1524
> >>> >> >> >> >>>>>> > I'm not privy to the reasons why we decided to go with micros in the first place, but would it be too big of a change or impractical for other reasons to switch Python SDK to millis before it gets more users?
> >>> >> >> >> >>>>>> > Thanks,
> >>> >> >> >> >>>>>> > Thomas
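As a tiny illustration of the lossy round trip Thomas describes (assumed values, not the actual Python coder code):

```java
// Illustration only: why converting a microsecond timestamp to milliseconds for
// the wire and back again is not idempotent.
final class MicrosMillisRoundTrip {
  public static void main(String[] args) {
    long micros = 1_555_555_555_123_456L;     // some timestamp in microseconds

    long millisOnTheWire = micros / 1000;     // truncation: drops 456 microseconds
    long roundTripped = millisOnTheWire * 1000;

    System.out.println(micros - roundTripped); // prints 456 -- precision silently lost
  }
}
```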
