TL;DR: We should just settle on nanosecond precision ubiquitously for
timestamp/windowing in Beam.


Re-visiting this discussion in light of cross-language transforms and
runners, and trying to tighten up testing. I've spent some more time
thinking about how we could make these operations granularity-agnostic, but
just can't find a good solution. In particular, the sticking points seem to be:

(1) Windows are half-open intervals, and the timestamp associated with a
window coming out of a GBK is (by default) as large as possible but must
live in that window. (Otherwise WindowInto + GBK + WindowInto would have
the unfortunate effect of moving aggregate values into subsequent windows,
which is clearly not the intent.) In other words, the timestamp of a
grouped value is basically End(Window) - epsilon. Unless we choose a
representation able to encode "minus epsilon" we must agree on a
granularity.

(2) Unless we want to have multiple variants of all our WindowFns (e.g.
FixedWindowMillis, FixedWindowMicros, FixedWindowNanos) we must agree on a
granularity with which to parameterize these well-known operations. There
are cases (e.g. side input window mapping, merging) where these Fns may be
used downstream in contexts other than where they are applied/defined.

(3) Reification of the timestamp into user-visible data, and the other way
around, requires a choice of precision to expose to the user. This means
that the timestamp is actual data, and truncating/rounding cannot be done
implicitly. Also, a round trip of reification and application of timestamps
should ideally be idempotent no matter the SDK.
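
To make point (1) concrete, here is a minimal sketch that assumes
timestamps are plain integers counted in nanoseconds. The names
(fixed_window, max_timestamp) are hypothetical and not Beam APIs. It shows
that End(Window) - epsilon is only well defined once the unit of epsilon is
fixed:

```python
NANOS_PER_SECOND = 1_000_000_000
NANOS_PER_MILLI = 1_000_000
MINUTE = 60 * NANOS_PER_SECOND


def fixed_window(ts, size):
    """Return the half-open window [start, end) of width `size` holding ts."""
    start = ts - ts % size
    return (start, start + size)


def max_timestamp(window, granularity=1):
    """Largest representable timestamp that is still inside the window."""
    _, end = window
    return end - granularity


w = fixed_window(125 * NANOS_PER_SECOND, MINUTE)  # element at t = 125 s
out_ts = max_timestamp(w)                         # End(Window) - 1 ns

# WindowInto + GBK + WindowInto keeps the aggregate in its original window.
assert fixed_window(out_ts, MINUTE) == w
# End(Window) itself would land in the next window.
assert fixed_window(w[1], MINUTE) != w
# A millisecond-granularity SDK computes a different "minus epsilon".
assert max_timestamp(w, NANOS_PER_MILLI) != out_ts
```

The last assertion is the crux: two parties that disagree on granularity
also disagree on the output timestamp of the same grouped value.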

The closest I've come is possibly parameterizing the timestamp type, where
encoding, decoding (including pulling the end out of a window?), comparison
(against each other and a watermark), "minus epsilon", etc could be UDFs.
Possibly we'd need the full set of arithmetic operations to implement
FixedWindows on an unknown timestamp type. Reification would simply be
disallowed (or return an opaque rather than SDK-native type) if the SDK
did not know that window type. The fact that one might need comparison
between timestamps of different types, or (lossless) coercion from one type
to another, means that timestamp types need to know about each other, or
another entity needs to know about the full cross-product, unless there is
a common base-type (at which point we might as well always choose that).
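
As a rough illustration of what parameterizing the timestamp type could
look like, the sketch below defines a hypothetical interface carrying the
operations listed above (encode, decode, comparison, "minus epsilon"). None
of these class or method names exist in Beam, and the 8-byte big-endian
encoding is an assumption made only for this example:

```python
from abc import ABC, abstractmethod


class TimestampType(ABC):
    """Hypothetical pluggable timestamp type; not a Beam interface."""

    @abstractmethod
    def encode(self, ts) -> bytes: ...

    @abstractmethod
    def decode(self, data: bytes): ...

    @abstractmethod
    def compare(self, a, b) -> int: ...

    @abstractmethod
    def minus_epsilon(self, ts): ...


class NanosTimestamps(TimestampType):
    """One concrete instance: signed 64-bit nanoseconds since the epoch."""

    def encode(self, ts):
        return ts.to_bytes(8, "big", signed=True)

    def decode(self, data):
        return int.from_bytes(data, "big", signed=True)

    def compare(self, a, b):
        return (a > b) - (a < b)

    def minus_epsilon(self, ts):
        return ts - 1


def end_of_window_timestamp(ts_type, encoded_end):
    """Compute End(Window) - epsilon without knowing the concrete type."""
    end = ts_type.decode(encoded_end)
    return ts_type.encode(ts_type.minus_epsilon(end))
```

This covers the end-of-window case only. Comparing a NanosTimestamps value
with one from a second implementation would still need a conversion that
neither class knows about, which is the cross-product problem described
above.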

An intermediate solution is to settle on floating (decimal) point
representation, plus a "minus-epsilon" bit. It wouldn't quite solve the
mapping through SDK-native types (which could require rounding or errors or
a new opaque type, and few date libraries could faithfully expose the minus
epsilon part). It might also be more expensive (compute and storage), and
would not allow us to use the protobuf timestamp/duration fields (or any
standard date/time libraries).
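
A small sketch of the "decimal value plus minus-epsilon bit" idea follows.
The EpsTimestamp class is hypothetical and exists only to show the ordering
such a representation would need: (t, minus-epsilon) sorts strictly below t
but above every other value less than t, at any precision.

```python
from dataclasses import dataclass
from decimal import Decimal


@dataclass(frozen=True)
class EpsTimestamp:
    """Hypothetical timestamp: decimal seconds plus a minus-epsilon flag."""

    seconds: Decimal
    minus_epsilon: bool = False

    def _key(self):
        # For equal seconds, the minus-epsilon variant sorts first.
        return (self.seconds, 0 if self.minus_epsilon else 1)

    def __lt__(self, other):
        return self._key() < other._key()


end = Decimal("180")
grouped = EpsTimestamp(end, minus_epsilon=True)  # End(Window) - epsilon

assert grouped < EpsTimestamp(end)
assert EpsTimestamp(Decimal("179.999999999")) < grouped
```

The flag is exactly the part that a standard date/time type has no field
for, so it would be lost as soon as the value is handed to such a type.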

Unless we can come up with a clean solution to the issues above shortly, I
think we should fix a precision and move forward. If this makes sense to
everyone, then we can start talking about the specific choice of precision
and a migration path (possibly only for portability).


For reference, the manipulations we do on timestamps are:

WindowInto: Timestamp -> Window
TimestampCombine: Window, [Timestamp] -> Timestamp
    End(Window)
    Min(Timestamps)
    Max(Timestamps)
PastEndOfWindow: Watermark, Window -> {True, False}

[SideInput]WindowMappingFn: Window -> Window
    WindowInto(End(Window))

GetTimestamp: Timestamp -> SDK Native Object
EmitAtTimestamp: SDK Native Object -> Timestamp
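
To illustrate the last pair (GetTimestamp / EmitAtTimestamp), here is a
sketch that assumes the internal timestamp is integer nanoseconds and that
the SDK-native object is Python's datetime, which resolves microseconds at
best. The function names to_native and from_native are hypothetical:

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_native(nanos):
    """GetTimestamp: sub-microsecond digits are dropped by datetime."""
    return EPOCH + timedelta(microseconds=nanos // 1000)


def from_native(dt):
    """EmitAtTimestamp: convert a datetime back to integer nanoseconds."""
    delta = dt - EPOCH
    whole_seconds = delta.days * 86_400 + delta.seconds
    return whole_seconds * 1_000_000_000 + delta.microseconds * 1000


ts = 1_557_500_000_123_456_789
round_tripped = from_native(to_native(ts))

# The first round trip silently loses the last three digits...
assert round_tripped == ts - 789
# ...and only after that loss does the round trip become stable.
assert from_native(to_native(round_tripped)) == round_tripped
```

An SDK whose native type is coarser than the agreed precision would have
to either expose a wider type or make this truncation explicit to the user.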






On Fri, May 10, 2019 at 1:33 PM Robert Bradshaw <[email protected]> wrote:

> On Thu, May 9, 2019 at 9:32 AM PM Kenneth Knowles <[email protected]> wrote:
>
> > From: Robert Bradshaw <[email protected]>
> > Date: Wed, May 8, 2019 at 3:00 PM
> > To: dev
> >
> >> From: Kenneth Knowles <[email protected]>
> >> Date: Wed, May 8, 2019 at 6:50 PM
> >> To: dev
> >>
> >> >> The end-of-window, for firing, can be approximate, but it seems it
> >> >> should be exact for timestamp assignment of the result (and similarly
> >> >> with the other timestamp combiners).
> >> >
> >> > I was thinking that the window itself should be stored as exact data,
> while just the firing itself is approximated, since it already is, because
> of watermarks and timers.
> >>
> >> I think this works where we can compare encoded windows, but some
> >> portable interpretation of windows is required for runner-side
> >> implementation of merging windows (for example).
> >
> > But in this case, you've recognized the URN of the WindowFn anyhow, so
> you understand its windows. Remembering that IntervalWindow is just one
> choice, and that windows themselves are totally user-defined and that
> merging logic is completely arbitrary per WindowFn (we probably should have
> some restrictions, but see https://issues.apache.org/jira/browse/BEAM-654).
> So I file this use case in the "runner knows everything about the WindowFn
> and Window type and window encoding anyhow".
>
> Being able to merge common windows in the runner is just an
> optimization, but an important one (especially for bootstrapping
> SDKs). However, this is not just about runner to SDK, but SDK to SDK
> as well (where a user from one SDK may want to inspect the windows
> produced by another). Having MillisIntervalWindow,
> MicrosIntervalWindow, NanosIntervalWindow, etc. isn't a path that I
> think is worth going down.
>
> Yes, we need to solve the "extract the endpoint of an unknown encoded
> window" problem as well, possibly similar to what we do with length
> prefix coders, possibly a restriction on window encodings themselves.
>
> >> There may also be issues if windows (or timestamps) are assigned to a
> >> high precision in one SDK, then inspected/acted on in another SDK, and
> >> then passed back to the original SDK where the truncation would be
> >> visible.
> >
> > This is pretty interesting and complex. But again, a window is just
> data. An SDK has to know how to deserialize it to operate on it. Unless we
> do actually standardize some aspects of it. I don't believe BoundedWindow
> encoding has a defined way to get the timestamp without decoding the
> window, does it? I thought we had basically defaulted to all IntervalWindows.
> But I am not following that closely.
> >
> >> > You raise a good point that min/max timestamp combiners require
> actually understanding the higher-precision timestamp. I can think of a
> couple things to do. One is the old "standardize all 3 or 4 precisions we
> need" and the other is that combiners other than EOW exist primarily to
> hold the watermark, and that hold does not require the original precision.
> Still, neither of these is that satisfying.
> >>
> >> In the current model, the output timestamp is user-visible.
> >
> > But as long as the watermark hold is less, it is safe. It requires
> knowing the coarse-precision lower bound of the timestamps of the input.
> And there may be situations where you also want the coarse upper bound. But
> you do know that these are at most one millisecond apart (assuming the
> runner is in millis) so perhaps no storage overhead. But a lot of
> complexity and chances for off by ones. And this is pretty hand-wavy.
>
> Yeah. A different SDK may (implicitly or explicitly) ask for the
> timestamp of the (transitive) output of a GBK, for which an
> approximation (either way) is undesirable.
>
> >> >> > A correction: Java *now* uses nanoseconds [1]. It uses the same
> breakdown as proto (int64 seconds since epoch + int32 nanos within second).
> It has legacy classes that use milliseconds, and Joda itself now encourages
> moving back to Java's new Instant type. Nanoseconds should complicate the
> arithmetic only for the one person authoring the date library, which they
> have already done.
> >> >>
> >> >> The encoding and decoding need to be done in a language-consistent
> way
> >> >> as well.
> >> >
> >> > I honestly am not sure what you mean by "language-consistent" here.
> >>
> >> If we want to make reading and writing of timestamps, windows
> >> cross-language, we can't rely on language-specific libraries to do the
> >> encoding.
> >>
> >> >> Also, most date libraries don't provide division, etc. operators, so
> >> >> we have to do that as well. Not that it should be *that* hard.
> >> >
> >> > If the libraries dedicated to time handling haven't found it needful,
> is there a specific reason you raise this? We do some simple math to find
> the window things fall into; is that it?
> >>
> >> Yes. E.g.
> >>
> >>
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/FixedWindows.java#L77
> >>
> >> would be a lot messier if there were no mapping from date libraries to raw
> >> ints that we can do arithmetic on. Writing this with the (seconds,
> >> nanos) representation is painful. But I suppose we'd only have to do
> >> it once per SDK.
> >
> >
> > Yea I think that arithmetic is not so bad. But this raises the issue of
> writing a *generic* WindowFn where its idea of timestamp granularity (the
> WindowFn owns the window type and encoding) may not match the user data
> coming in. So you need to apply the approximation function to provide
> type-correct input to the WindowFn. That's kind of exciting and weird and
> perhaps unsolvable, except by choosing a concrete granularity.
> >
> > Kenn
> >
> >>
> >>
> >> >> >> It would also be really nice to clean up the infinite-future
> being the
> >> >> >> somewhat arbitrary max micros rounded to millis, and
> >> >> >> end-of-global-window being infinite-future minus 1 hour (IIRC),
> etc.
> >> >> >> as well as the ugly logic in Python to cope with millis-micros
> >> >> >> conversion.
> >> >> >
> >> >> > I actually don't have a problem with this. If you are trying to
> keep the representation compact, not add bytes on top of instants, then you
> just have to choose magic numbers, right?
> >> >>
> >> >> It's not about compactness, it's the (historically-derived?)
> >> >> arbitrariness of the numbers.
> >> >
> >> > What I mean is that the only reason to fit them into an integer at
> all is compactness. Otherwise, you could use a proper disjoint union
> representing your intent directly, and all fiddling goes away, like
> `Timestamp ::= PosInf | NegInf | EndOfGlobalWindow | ActualTime(Instant)`.
> It costs a couple of bits.
> >>
> >> The other cost is not being able to use standard libraries to
> >> represent all of your timestamps.
> >>
> >> >> For example, the bounds are chosen to
> >> >> fit within 64-bit micros despite milliseconds being the "chosen"
> >> >> granularity, and care was taken that
> >> >>
> >> >>     WindowInto(Global) | GBK | WindowInto(Minute) | GBK
> >> >>
> >> >> works, but
> >> >>
> >> >>     WindowInto(Global) | GBK | WindowInto(Day) | GBK
> >> >>
> >> >> may produce elements with timestamps greater than MaxTimestamp.
> >> >>
> >> >> >
> >> >> > Kenn
> >> >> >
> >> >> > [1]
> https://docs.oracle.com/javase/8/docs/api/java/time/Instant.html
> >> >> >
> >> >> >>
> >> >> >> > On Wed, Apr 17, 2019 at 3:13 PM Robert Burke <
> [email protected]> wrote:
> >> >> >> >>
> >> >> >> >> +1 for plan B. Nanosecond precision on windowing seems... a
> little much for a system that's aggregating data over time. Even for
> processing say particle super collider data, they'd get away with
> artificially increasing the granularity in batch settings.
> >> >> >> >>
> >> >> >> >> Now if they were streaming... they'd probably want
> femtoseconds anyway.
> >> >> >> >> The point is, we should see if users demand it before adding
> in the necessary work.
> >> >> >> >>
> >> >> >> >> On Wed, 17 Apr 2019 at 14:26, Chamikara Jayalath <
> [email protected]> wrote:
> >> >> >> >>>
> >> >> >> >>> +1 for plan B as well. I think it's important to make
> timestamp precision consistent now without introducing surprising behaviors
> for existing users. But we should move towards a higher granularity
> timestamp precision in the long run to support use-cases that Beam users
> otherwise might miss out (on a runner that supports such precision).
> >> >> >> >>>
> >> >> >> >>> - Cham
> >> >> >> >>>
> >> >> >> >>> On Wed, Apr 17, 2019 at 1:35 PM Lukasz Cwik <[email protected]>
> wrote:
> >> >> >> >>>>
> >> >> >> >>>> I also like Plan B because in the cross language case, the
> pipeline would not work since every party (Runners & SDKs) would have to be
> aware of the new beam:coder:windowed_value:v2 coder. Plan A has the
> property where if the SDK/Runner wasn't updated then it may start
> truncating the timestamps unexpectedly.
> >> >> >> >>>>
> >> >> >> >>>> On Wed, Apr 17, 2019 at 1:24 PM Lukasz Cwik <
> [email protected]> wrote:
> >> >> >> >>>>>
> >> >> >> >>>>> Kenn, this discussion is about the precision of the
> timestamp in the user data. As you had mentioned, Runners need not have the
> same granularity of user data as long as they correctly round the timestamp
> to guarantee that triggers are executed correctly but the user data should
> have the same precision across SDKs otherwise user data timestamps will be
> truncated in cross language scenarios.
> >> >> >> >>>>>
> >> >> >> >>>>> Based on the systems that were listed, either microsecond
> or nanosecond would make sense. The issue with changing the precision is
> that all Beam runners except for possibly Beam Python on Dataflow are using
> millisecond precision since they are all using the same Java Runner
> windowing/trigger logic.
> >> >> >> >>>>>
> >> >> >> >>>>> Plan A: Swap precision to nanosecond
> >> >> >> >>>>> 1) Change the Python SDK to only expose millisecond
> precision timestamps (do now)
> >> >> >> >>>>> 2) Change the user data encoding to support nanosecond
> precision (do now)
> >> >> >> >>>>> 3) Swap runner libraries to be nanosecond precision aware
> updating all window/triggering logic (do later)
> >> >> >> >>>>> 4) Swap SDKs to expose nanosecond precision (do later)
> >> >> >> >>>>>
> >> >> >> >>>>> Plan B:
> >> >> >> >>>>> 1) Change the Python SDK to only expose millisecond
> precision timestamps and keep the data encoding as is (do now)
> >> >> >> >>>>> (We could add greater precision later to plan B by creating
> a new version beam:coder:windowed_value:v2 which would be nanosecond and
> would require runners to correctly perform internal conversions for
> windowing/triggering.)
> >> >> >> >>>>>
> >> >> >> >>>>> I think we should go with Plan B and when users request
> greater precision we can make that an explicit effort. What do people think?
> >> >> >> >>>>>
> >> >> >> >>>>>
> >> >> >> >>>>>
> >> >> >> >>>>> On Wed, Apr 17, 2019 at 5:43 AM Maximilian Michels <
> [email protected]> wrote:
> >> >> >> >>>>>>
> >> >> >> >>>>>> Hi,
> >> >> >> >>>>>>
> >> >> >> >>>>>> Thanks for taking care of this issue in the Python SDK,
> Thomas!
> >> >> >> >>>>>>
> >> >> >> >>>>>> It would be nice to have a uniform precision for
> timestamps but, as Kenn
> >> >> >> >>>>>> pointed out, timestamps are extracted from systems that
> have different
> >> >> >> >>>>>> precision.
> >> >> >> >>>>>>
> >> >> >> >>>>>> To add to the list: Flink - milliseconds
> >> >> >> >>>>>>
> >> >> >> >>>>>> After all, it doesn't matter as long as there is
> sufficient precision
> >> >> >> >>>>>> and conversions are done correctly.
> >> >> >> >>>>>>
> >> >> >> >>>>>> I think we could improve the situation by at least adding a
> >> >> >> >>>>>> "milliseconds" constructor to the Python SDK's Timestamp.
> >> >> >> >>>>>>
> >> >> >> >>>>>> Cheers,
> >> >> >> >>>>>> Max
> >> >> >> >>>>>>
> >> >> >> >>>>>> On 17.04.19 04:13, Kenneth Knowles wrote:
> >> >> >> >>>>>> > I am not so sure this is a good idea. Here are some
> systems and their
> >> >> >> >>>>>> > precision:
> >> >> >> >>>>>> >
> >> >> >> >>>>>> > Arrow - microseconds
> >> >> >> >>>>>> > BigQuery - microseconds
> >> >> >> >>>>>> > New Java instant - nanoseconds
> >> >> >> >>>>>> > Firestore - microseconds
> >> >> >> >>>>>> > Protobuf - nanoseconds
> >> >> >> >>>>>> > Dataflow backend - microseconds
> >> >> >> >>>>>> > Postgresql - microseconds
> >> >> >> >>>>>> > Pubsub publish time - nanoseconds
> >> >> >> >>>>>> > MSSQL datetime2 - 100 nanoseconds (original datetime
> about 3 millis)
> >> >> >> >>>>>> > Cassandra - milliseconds
> >> >> >> >>>>>> >
> >> >> >> >>>>>> > IMO it is important to be able to treat any of these as
> a Beam
> >> >> >> >>>>>> > timestamp, even though they aren't all streaming. Who
> knows when we
> >> >> >> >>>>>> > might be ingesting a streamed changelog, or using them
> for reprocessing
> >> >> >> >>>>>> > an archived stream. I think for this purpose we either
> should
> >> >> >> >>>>>> > standardize on nanoseconds or make the runner's
> resolution independent
> >> >> >> >>>>>> > of the data representation.
> >> >> >> >>>>>> >
> >> >> >> >>>>>> > I've had some offline conversations about this. I think
> we can have
> >> >> >> >>>>>> > higher-than-runner precision in the user data, and allow
> WindowFns and
> >> >> >> >>>>>> > DoFns to operate on this higher-than-runner precision
> data, and still
> >> >> >> >>>>>> > have consistent watermark treatment. Watermarks are just
> bounds, after all.
> >> >> >> >>>>>> >
> >> >> >> >>>>>> > Kenn
> >> >> >> >>>>>> >
> >> >> >> >>>>>> > On Tue, Apr 16, 2019 at 6:48 PM Thomas Weise <
> [email protected]
> >> >> >> >>>>>> > <mailto:[email protected]>> wrote:
> >> >> >> >>>>>> >
> >> >> >> >>>>>> >     The Python SDK currently uses timestamps in
> microsecond resolution
> >> >> >> >>>>>> >     while Java SDK, as most would probably expect, uses
> milliseconds.
> >> >> >> >>>>>> >
> >> >> >> >>>>>> >     This causes a few difficulties with portability
> (Python coders need
> >> >> >> >>>>>> >     to convert to millis for WindowedValue and Timers,
> which is related
> >> >> >> >>>>>> >     to a bug I'm looking into:
> >> >> >> >>>>>> >
> >> >> >> >>>>>> >     https://issues.apache.org/jira/browse/BEAM-7035
> >> >> >> >>>>>> >
> >> >> >> >>>>>> >     As Luke pointed out, the issue was previously
> discussed:
> >> >> >> >>>>>> >
> >> >> >> >>>>>> >     https://issues.apache.org/jira/browse/BEAM-1524
> >> >> >> >>>>>> >
> >> >> >> >>>>>> >     I'm not privy to the reasons why we decided to go
> with micros in
> >> >> >> >>>>>> >     the first place, but would it be too big of a change or
> impractical for
> >> >> >> >>>>>> >     other reasons to switch Python SDK to millis before
> it gets more users?
> >> >> >> >>>>>> >
> >> >> >> >>>>>> >     Thanks,
> >> >> >> >>>>>> >     Thomas
> >> >> >> >>>>>> >
>
