Robert, it seems like you're for Plan A. Assuming we go forward with nanoseconds, and based upon your analysis in 3), wouldn't that mean we would have to make a breaking change to the Java SDK to swap to nanosecond precision?
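A minimal sketch (illustrative only, not Beam SDK code; assumes joda-time on the classpath) of why such a swap would be user-visible in Java: the Java SDK's timestamps are Joda Instants, which carry milliseconds, so any nanosecond-precision value that passes through them is truncated.

    import java.time.Instant;

    public class JavaSdkPrecisionSketch {
      public static void main(String[] args) {
        // A nanosecond-precision timestamp, as java.time.Instant can carry.
        Instant nanoPrecise = Instant.ofEpochSecond(1_555_555_555L, 123_456_789L);

        // What a Joda-based (millisecond) timestamp can actually hold.
        org.joda.time.Instant asJoda = new org.joda.time.Instant(nanoPrecise.toEpochMilli());

        // Converting back shows the sub-millisecond part is gone.
        Instant roundTripped = Instant.ofEpochMilli(asJoda.getMillis());

        System.out.println(nanoPrecise);   // ...:55.123456789Z
        System.out.println(roundTripped);  // ...:55.123Z
      }
    }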
On Fri, Oct 18, 2019 at 11:35 AM Robert Bradshaw <[email protected]> wrote:
> TL;DR: We should just settle on nanosecond precision ubiquitously for timestamp/windowing in Beam.
>
> Re-visiting this discussion in light of cross-language transforms and runners, and trying to tighten up testing. I've spent some more time thinking about how we could make these operations granularity-agnostic, but just can't find a good solution. In particular, the sticklers seem to be:
>
> (1) Windows are half-open intervals, and the timestamp associated with a window coming out of a GBK is (by default) as large as possible but must live in that window. (Otherwise WindowInto + GBK + WindowInto would have the unfortunate effect of moving aggregate values into subsequent windows, which is clearly not the intent.) In other words, the timestamp of a grouped value is basically End(Window) - epsilon. Unless we choose a representation able to encode "minus epsilon" we must agree on a granularity.
>
> (2) Unless we want to have multiple variants of all our WindowFns (e.g. FixedWindowMillis, FixedWindowMicros, FixedWindowNanos) we must agree on a granularity with which to parameterize these well-known operations. There are cases (e.g. side input window mapping, merging) where these Fns may be used downstream in contexts other than where they are applied/defined.
>
> (3) Reification of the timestamp into user-visible data, and the other way around, require a choice of precision to expose to the user. This means that the timestamp is actual data, and truncating/rounding cannot be done implicitly. Also, the round trip of reification and application of timestamps should hopefully be idempotent no matter the SDK.
>
> The closest I've come is possibly parameterizing the timestamp type, where encoding, decoding (including pulling the end out of a window?), comparison (against each other and a watermark), "minus epsilon", etc. could be UDFs. Possibly we'd need the full set of arithmetic operations to implement FixedWindows on an unknown timestamp type. Reification would simply be disallowed (or return an opaque rather than SDK-native type) if the SDK did not know that window type. The fact that one might need comparison between timestamps of different types, or (lossless) coercion from one type to another, means that timestamp types need to know about each other, or another entity needs to know about the full cross-product, unless there is a common base type (at which point we might as well always choose that).
>
> An intermediate solution is to settle on a floating (decimal) point representation, plus a "minus-epsilon" bit. It wouldn't quite solve the mapping through SDK-native types (which could require rounding or errors or a new opaque type, and few date libraries could faithfully expose the minus-epsilon part). It might also be more expensive (compute and storage), and would not allow us to use the protobuf timestamp/duration fields (or any standard date/time libraries).
>
> Unless we can come up with a clean solution to the issues above shortly, I think we should fix a precision and move forward. If this makes sense to everyone, then we can start talking about the specific choice of precision and a migration path (possibly only for portability).
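A minimal sketch (illustrative, not Beam SDK code) of the granularity problem in point (1): the largest timestamp that still lives inside the half-open window [start, end) is End(Window) minus one unit of whatever granularity was chosen, so two SDKs that disagree on that unit assign different timestamps to the same grouped value.

    import java.time.Duration;
    import java.time.Instant;

    public class MinusEpsilonSketch {
      // Largest timestamp that still falls inside the half-open window
      // [start, end): the window's end minus one "epsilon" of the chosen unit.
      static Instant maxTimestamp(Instant windowEnd, Duration epsilon) {
        return windowEnd.minus(epsilon);
      }

      public static void main(String[] args) {
        Instant end = Instant.parse("2019-10-18T12:00:00Z");
        System.out.println(maxTimestamp(end, Duration.ofMillis(1))); // 2019-10-18T11:59:59.999Z
        System.out.println(maxTimestamp(end, Duration.ofNanos(1)));  // 2019-10-18T11:59:59.999999999Z
      }
    }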
>
> For reference, the manipulations we do on timestamps are:
>
> WindowInto: Timestamp -> Window
> TimestampCombine: Window, [Timestamp] -> Timestamp
> End(Window)
> Min(Timestamps)
> Max(Timestamps)
> PastEndOfWindow: Watermark, Window -> {True, False}
>
> [SideInput]WindowMappingFn: Window -> Window
> WindowInto(End(Window))
>
> GetTimestamp: Timestamp -> SDK Native Object
> EmitAtTimestamp: SDK Native Object -> Timestamp
>
> On Fri, May 10, 2019 at 1:33 PM Robert Bradshaw <[email protected]> wrote:
>
>> On Thu, May 9, 2019 at 9:32 AM Kenneth Knowles <[email protected]> wrote:
>>
>> > From: Robert Bradshaw <[email protected]>
>> > Date: Wed, May 8, 2019 at 3:00 PM
>> > To: dev
>> >
>> >> From: Kenneth Knowles <[email protected]>
>> >> Date: Wed, May 8, 2019 at 6:50 PM
>> >> To: dev
>> >>
>> >> >> The end-of-window, for firing, can be approximate, but it seems it should be exact for timestamp assignment of the result (and similarly with the other timestamp combiners).
>> >> >
>> >> > I was thinking that the window itself should be stored as exact data, while just the firing itself is approximated, since it already is, because of watermarks and timers.
>> >>
>> >> I think this works where we can compare encoded windows, but some portable interpretation of windows is required for runner-side implementation of merging windows (for example).
>> >
>> > But in this case, you've recognized the URN of the WindowFn anyhow, so you understand its windows. Remembering that IntervalWindow is just one choice, and that windows themselves are totally user-defined and that merging logic is completely arbitrary per WindowFn (we probably should have some restrictions, but see https://issues.apache.org/jira/browse/BEAM-654). So I file this use case in the "runner knows everything about the WindowFn and Window type and window encoding anyhow" bucket.
>>
>> Being able to merge common windows in the runner is just an optimization, but an important one (especially for bootstrapping SDKs). However, this is not just about runner to SDK, but SDK to SDK as well (where a user from one SDK may want to inspect the windows produced by another). Having MillisIntervalWindow, MicrosIntervalWindow, NanosIntervalWindow, etc. isn't a path that I think is worth going down.
>>
>> Yes, we need to solve the "extract the endpoint of an unknown encoded window" problem as well, possibly similar to what we do with length prefix coders, possibly a restriction on window encodings themselves.
>>
>> >> There may also be issues if windows (or timestamps) are assigned to a high precision in one SDK, then inspected/acted on in another SDK, and then passed back to the original SDK where the truncation would be visible.
>> >
>> > This is pretty interesting and complex. But again, a window is just data. An SDK has to know how to deserialize it to operate on it. Unless we do actually standardize some aspects of it. I don't believe BoundedWindow encoding has a defined way to get the timestamp without decoding the window, does it? I thought we had basically defaulted to all IntervalWindows. But I am not following that closely.
>> >
>> >> > You raise a good point that min/max timestamp combiners require actually understanding the higher-precision timestamp. I can think of a couple things to do.
One is the old "standardize all 3 or for precisions we >> need" and the other is that combiners other than EOW exist primarily to >> hold the watermark, and that hold does not require the original precision. >> Still, neither of these is that satisfying. >> >> >> >> In the current model, the output timestamp is user-visible. >> > >> > But as long as the watermark hold is less, it is safe. It requires >> knowing the coarse-precision lower bound of the timestamps of the input. >> And there may be situations where you also want the coarse upper bound. But >> you do know that these are at most one millisecond apart (assuming the >> runner is in millis) so perhaps no storage overhead. But a lot of >> complexity and chances for off by ones. And this is pretty hand-wavy. >> >> Yeah. A different SDK may (implicitly or explicitly) ask for the >> timestamp of the (transitive) output of a GBK, for which an >> approximation (either way) is undesirable. >> >> >> >> > A correction: Java *now* uses nanoseconds [1]. It uses the same >> breakdown as proto (int64 seconds since epoch + int32 nanos within second). >> It has legacy classes that use milliseconds, and Joda itself now encourages >> moving back to Java's new Instant type. Nanoseconds should complicate the >> arithmetic only for the one person authoring the date library, which they >> have already done. >> >> >> >> >> >> The encoding and decoding need to be done in a language-consistent >> way >> >> >> as well. >> >> > >> >> > I honestly am not sure what you mean by "language-consistent" here. >> >> >> >> If we want to make reading and writing of timestamps, windows >> >> cross-language, we can't rely on language-specific libraries to do the >> >> encoding. >> >> >> >> >> Also, most date libraries don't division, etc. operators, so >> >> >> we have to do that as well. Not that it should be *that* hard. >> >> > >> >> > If the libraries dedicated to time handling haven't found it >> needful, is there a specific reason you raise this? We do some simple math >> to find the window things fall into; is that it? >> >> >> >> Yes. E.g. >> >> >> >> >> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/FixedWindows.java#L77 >> >> >> >> would be a lot messier if there were no mapping date libraries to raw >> >> ints that we can do arithmetic on. Writing this with the (seconds, >> >> nanos) representation is painful. But I suppose we'd only have to do >> >> it once per SDK. >> > >> > >> > Yea I think that arithmetic is not so bad. But this raises the issue of >> writing a *generic* WindowFn where its idea of timestamp granularity (the >> WindowFn owns the window type and encoding) may not match the user data >> coming in. So you need to apply the approximation function to provide >> type-correct input to the WindowFn. That's kind of exciting and weird and >> perhaps unsolvable, except by choosing a concrete granularity. >> > >> > Kenn >> > >> >> >> >> >> >> >> >> It would also be really nice to clean up the infinite-future >> being the >> >> >> >> somewhat arbitrary max micros rounded to millis, and >> >> >> >> end-of-global-window being infinite-future minus 1 hour (IIRC), >> etc. >> >> >> >> as well as the ugly logic in Python to cope with millis-micros >> >> >> >> conversion. >> >> >> > >> >> >> > I actually don't have a problem with this. If you are trying to >> keep the representation compact, not add bytes on top of instants, then you >> just have to choose magic numbers, right? 
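The FixedWindows arithmetic referenced a few messages up amounts to modular arithmetic on timestamps. A rough sketch, assuming raw epoch-millis longs rather than the SDK's actual types: the computation is trivial when a timestamp is a plain integer, which is exactly what a (seconds, nanos) pair gives up.

    public class FixedWindowMathSketch {
      // Start of the fixed window (of the given size and offset) containing the
      // timestamp; floorMod keeps this correct for timestamps before the epoch.
      static long windowStartMillis(long timestampMillis, long sizeMillis, long offsetMillis) {
        return timestampMillis - Math.floorMod(timestampMillis - offsetMillis, sizeMillis);
      }

      public static void main(String[] args) {
        long ts = 1_571_400_123_456L;                    // some epoch millis
        long start = windowStartMillis(ts, 60_000L, 0L); // 1-minute windows, no offset
        System.out.println("[" + start + ", " + (start + 60_000L) + ")");
      }
    }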
>> >> >> It's not about compactness, it's the (historically-derived?) arbitrariness of the numbers.
>> >> >
>> >> > What I mean is that the only reason to fit them into an integer at all is compactness. Otherwise, you could use a proper disjoint union representing your intent directly, and all fiddling goes away, like `Timestamp ::= PosInf | NegInf | EndOfGlobalWindow | ActualTime(Instant)`. It costs a couple of bits.
>> >>
>> >> The other cost is not being able to use standard libraries to represent all of your timestamps.
>> >>
>> >> >> For example, the bounds are chosen to fit within 64-bit micros despite milliseconds being the "chosen" granularity, and care was taken that
>> >> >>
>> >> >> WindowInto(Global) | GBK | WindowInto(Minute) | GBK
>> >> >>
>> >> >> works, but
>> >> >>
>> >> >> WindowInto(Global) | GBK | WindowInto(Day) | GBK
>> >> >>
>> >> >> may produce elements with timestamps greater than MaxTimestamp.
>> >> >> >
>> >> >> > Kenn
>> >> >> >
>> >> >> > [1] https://docs.oracle.com/javase/8/docs/api/java/time/Instant.html
>> >> >> >
>> >> >> >> > On Wed, Apr 17, 2019 at 3:13 PM Robert Burke <[email protected]> wrote:
>> >> >> >> >>
>> >> >> >> >> +1 for plan B. Nanosecond precision on windowing seems... a little much for a system that's aggregating data over time. Even for processing, say, particle supercollider data, they'd get away with artificially increasing the granularity in batch settings.
>> >> >> >> >>
>> >> >> >> >> Now if they were streaming... they'd probably want femtoseconds anyway. The point is, we should see if users demand it before adding in the necessary work.
>> >> >> >> >>
>> >> >> >> >> On Wed, 17 Apr 2019 at 14:26, Chamikara Jayalath <[email protected]> wrote:
>> >> >> >> >>>
>> >> >> >> >>> +1 for plan B as well. I think it's important to make timestamp precision consistent now without introducing surprising behaviors for existing users. But we should move towards a higher granularity timestamp precision in the long run to support use-cases that Beam users otherwise might miss out on (on a runner that supports such precision).
>> >> >> >> >>>
>> >> >> >> >>> - Cham
>> >> >> >> >>>
>> >> >> >> >>> On Wed, Apr 17, 2019 at 1:35 PM Lukasz Cwik <[email protected]> wrote:
>> >> >> >> >>>>
>> >> >> >> >>>> I also like Plan B because in the cross language case, the pipeline would not work since every party (Runners & SDKs) would have to be aware of the new beam:coder:windowed_value:v2 coder. Plan A has the property where if the SDK/Runner wasn't updated then it may start truncating the timestamps unexpectedly.
>> >> >> >> >>>>
>> >> >> >> >>>> On Wed, Apr 17, 2019 at 1:24 PM Lukasz Cwik <[email protected]> wrote:
>> >> >> >> >>>>>
>> >> >> >> >>>>> Kenn, this discussion is about the precision of the timestamp in the user data. As you had mentioned, Runners need not have the same granularity of user data as long as they correctly round the timestamp to guarantee that triggers are executed correctly, but the user data should have the same precision across SDKs, otherwise user data timestamps will be truncated in cross language scenarios.
>> >> >> >> >>>>>
>> >> >> >> >>>>> Based on the systems that were listed, either microsecond or nanosecond would make sense.
>> >> >> >> >>>>> The issue with changing the precision is that all Beam runners except for possibly Beam Python on Dataflow are using millisecond precision since they are all using the same Java Runner windowing/trigger logic.
>> >> >> >> >>>>>
>> >> >> >> >>>>> Plan A: Swap precision to nanosecond
>> >> >> >> >>>>> 1) Change the Python SDK to only expose millisecond precision timestamps (do now)
>> >> >> >> >>>>> 2) Change the user data encoding to support nanosecond precision (do now)
>> >> >> >> >>>>> 3) Swap runner libraries to be nanosecond precision aware, updating all window/triggering logic (do later)
>> >> >> >> >>>>> 4) Swap SDKs to expose nanosecond precision (do later)
>> >> >> >> >>>>>
>> >> >> >> >>>>> Plan B:
>> >> >> >> >>>>> 1) Change the Python SDK to only expose millisecond precision timestamps and keep the data encoding as is (do now)
>> >> >> >> >>>>> (We could add greater precision later to plan B by creating a new version beam:coder:windowed_value:v2 which would be nanosecond and would require runners to correctly perform internal conversions for windowing/triggering.)
>> >> >> >> >>>>>
>> >> >> >> >>>>> I think we should go with Plan B and when users request greater precision we can make that an explicit effort. What do people think?
>> >> >> >> >>>>>
>> >> >> >> >>>>> On Wed, Apr 17, 2019 at 5:43 AM Maximilian Michels <[email protected]> wrote:
>> >> >> >> >>>>>>
>> >> >> >> >>>>>> Hi,
>> >> >> >> >>>>>>
>> >> >> >> >>>>>> Thanks for taking care of this issue in the Python SDK, Thomas!
>> >> >> >> >>>>>>
>> >> >> >> >>>>>> It would be nice to have a uniform precision for timestamps but, as Kenn pointed out, timestamps are extracted from systems that have different precision.
>> >> >> >> >>>>>>
>> >> >> >> >>>>>> To add to the list: Flink - milliseconds
>> >> >> >> >>>>>>
>> >> >> >> >>>>>> After all, it doesn't matter as long as there is sufficient precision and conversions are done correctly.
>> >> >> >> >>>>>>
>> >> >> >> >>>>>> I think we could improve the situation by at least adding a "milliseconds" constructor to the Python SDK's Timestamp.
>> >> >> >> >>>>>>
>> >> >> >> >>>>>> Cheers,
>> >> >> >> >>>>>> Max
>> >> >> >> >>>>>>
>> >> >> >> >>>>>> On 17.04.19 04:13, Kenneth Knowles wrote:
>> >> >> >> >>>>>> > I am not so sure this is a good idea. Here are some systems and their precision:
>> >> >> >> >>>>>> >
>> >> >> >> >>>>>> > Arrow - microseconds
>> >> >> >> >>>>>> > BigQuery - microseconds
>> >> >> >> >>>>>> > New Java instant - nanoseconds
>> >> >> >> >>>>>> > Firestore - microseconds
>> >> >> >> >>>>>> > Protobuf - nanoseconds
>> >> >> >> >>>>>> > Dataflow backend - microseconds
>> >> >> >> >>>>>> > Postgresql - microseconds
>> >> >> >> >>>>>> > Pubsub publish time - nanoseconds
>> >> >> >> >>>>>> > MSSQL datetime2 - 100 nanoseconds (original datetime about 3 millis)
>> >> >> >> >>>>>> > Cassandra - milliseconds
>> >> >> >> >>>>>> >
>> >> >> >> >>>>>> > IMO it is important to be able to treat any of these as a Beam timestamp, even though they aren't all streaming. Who knows when we might be ingesting a streamed changelog, or using them for reprocessing an archived stream.
>> >> >> >> >>>>>> > I think for this purpose we either should standardize on nanoseconds or make the runner's resolution independent of the data representation.
>> >> >> >> >>>>>> >
>> >> >> >> >>>>>> > I've had some offline conversations about this. I think we can have higher-than-runner precision in the user data, and allow WindowFns and DoFns to operate on this higher-than-runner precision data, and still have consistent watermark treatment. Watermarks are just bounds, after all.
>> >> >> >> >>>>>> >
>> >> >> >> >>>>>> > Kenn
>> >> >> >> >>>>>> >
>> >> >> >> >>>>>> > On Tue, Apr 16, 2019 at 6:48 PM Thomas Weise <[email protected] <mailto:[email protected]>> wrote:
>> >> >> >> >>>>>> >
>> >> >> >> >>>>>> > The Python SDK currently uses timestamps in microsecond resolution while the Java SDK, as most would probably expect, uses milliseconds.
>> >> >> >> >>>>>> >
>> >> >> >> >>>>>> > This causes a few difficulties with portability (Python coders need to convert to millis for WindowedValue and Timers, which is related to a bug I'm looking into:
>> >> >> >> >>>>>> >
>> >> >> >> >>>>>> > https://issues.apache.org/jira/browse/BEAM-7035
>> >> >> >> >>>>>> >
>> >> >> >> >>>>>> > As Luke pointed out, the issue was previously discussed:
>> >> >> >> >>>>>> >
>> >> >> >> >>>>>> > https://issues.apache.org/jira/browse/BEAM-1524
>> >> >> >> >>>>>> >
>> >> >> >> >>>>>> > I'm not privy to the reasons why we decided to go with micros in the first place, but would it be too big of a change or impractical for other reasons to switch the Python SDK to millis before it gets more users?
>> >> >> >> >>>>>> >
>> >> >> >> >>>>>> > Thanks,
>> >> >> >> >>>>>> > Thomas
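For context on the encodings being debated (Plan A step 2, and the millis conversions Thomas mentions), a rough, self-contained sketch of the two timestamp shapes; this is illustrative only and elides the actual windowed_value coder's byte layout and offsets. A single int64 of milliseconds forces finer-grained SDKs to truncate, while a proto-style (int64 seconds, int32 nanos) pair round-trips losslessly.

    import java.nio.ByteBuffer;
    import java.time.Instant;

    public class TimestampEncodingSketch {
      // Millisecond shape: a single int64; anything finer than 1 ms is lost.
      static byte[] encodeAsMillis(Instant t) {
        return ByteBuffer.allocate(8).putLong(t.toEpochMilli()).array();
      }

      static Instant decodeFromMillis(byte[] bytes) {
        return Instant.ofEpochMilli(ByteBuffer.wrap(bytes).getLong());
      }

      // Proto-style shape: (int64 seconds, int32 nanos); lossless down to 1 ns.
      static byte[] encodeAsSecondsNanos(Instant t) {
        return ByteBuffer.allocate(12).putLong(t.getEpochSecond()).putInt(t.getNano()).array();
      }

      static Instant decodeFromSecondsNanos(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        return Instant.ofEpochSecond(buf.getLong(), buf.getInt());
      }

      public static void main(String[] args) {
        Instant t = Instant.ofEpochSecond(1_555_555_555L, 123_456_789L);
        System.out.println(decodeFromMillis(encodeAsMillis(t)));             // sub-millisecond digits dropped
        System.out.println(decodeFromSecondsNanos(encodeAsSecondsNanos(t))); // round-trips exactly
      }
    }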
