I would also suggest using Java's Instant since it will be compatible with
many more date/time libraries without forcing onto users the need to go
through an artificial millis/nanos conversion layer to Java's Instant.
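
To make the "conversion layer" concrete, here is a minimal sketch of what users end up writing when the timestamp is exposed as a raw long. The class and method names are hypothetical (this is not a Beam API); only the java.time and java.lang.Math calls are real.

```java
import java.time.Instant;

/** Hypothetical helper, not part of Beam: conversions forced by a raw-long timestamp. */
final class InstantConversions {
  private static final long NANOS_PER_SECOND = 1_000_000_000L;

  private InstantConversions() {}

  /** Instant -> nanoseconds since epoch; throws ArithmeticException on overflow. */
  static long toEpochNanos(Instant t) {
    return Math.addExact(
        Math.multiplyExact(t.getEpochSecond(), NANOS_PER_SECOND), t.getNano());
  }

  /** Nanoseconds since epoch -> Instant; floorDiv/floorMod keep pre-epoch values correct. */
  static Instant fromEpochNanos(long nanos) {
    return Instant.ofEpochSecond(
        Math.floorDiv(nanos, NANOS_PER_SECOND), Math.floorMod(nanos, NANOS_PER_SECOND));
  }

  /** Lossy: drops everything below one millisecond. */
  static Instant truncateToMillis(Instant t) {
    return Instant.ofEpochMilli(t.toEpochMilli());
  }
}
```

If the SDK handed out Instant directly, none of this would be user code.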

On Tue, Oct 29, 2019 at 5:06 PM Robert Bradshaw <[email protected]> wrote:

> On Tue, Oct 29, 2019 at 4:20 PM Kenneth Knowles <[email protected]> wrote:
> >
> > Point (1) is compelling. Solutions to the "minus epsilon" seem a bit
> complex. On the other hand, an opaque and abstract Timestamp type (in each
> SDK) going forward seems like a Pretty Good Idea (tm). Would you really
> have to go floating point? Could you just have a distinguished
> representation for non-inclusive upper/lower bounds? These could be at the
> same reduced resolution as timestamps in element metadata, since that is
> all they are compared against.
>
> If I were coming up with an abstract, opaque representation of
> Timestamp (and Duration) for Beam, I would explicitly include the
> "minus epsilon" concept. One could still do arithmetic with these.
> This would make any conversion to standard datetime libraries lossy
> though.
>
> > Point (2) is also good, though it seems like something that could be
> cleverly engineered and/or we just provide one implementation and it is
> easy to make your own for finer granularity, since a WindowFn separately
> receives the Timestamp (here I'm pretending it is abstract and opaque and
> likely approximate) and the original element with whatever precision the
> original data included.
>
> Yes, but I don't see how a generic WindowFn would reach into the
> (arbitrary) element and pull out this original data. One of the
> benefits of the Beam model is that the WindowFn does not have to
> depend on the element type.
>
> > Point (3) the model/runner owns the timestamp metadata so I feel fine
> about it being approximated as long as any original user data is still
> present. I don't recall seeing a compelling case where the timestamp
> metadata that the runner tracks and understands is required to be exactly
> the same as a user value (assuming users understand this distinction, which
> is another issue that I would separate from whether it is technically
> feasible).
>
> As we provide the ability to designate user data as the runner
> timestamp against which to window, and promote the runner timestamp
> back to user data (people are going to want to get DateTime or Instant
> objects out of it), it seems tricky to explain to users that one or
> both of these operations may be lossy (and, in addition, I don't think
> there's a consistently safe direction to round).
>
> > The more I think about the very real problems you point out, the more I
> think that our backwards-incompatible move should be to our own abstract
> Timestamp type, putting the design decision behind a minimal interface. If
> we see a concrete design for that data type, we might be inspired how to
> support more possibilities.
> >
> > As for the rest of the speculation... moving to nanos immediately helps
> users so I am now +1 on just doing it, or moving ahead with an abstract
> data type under the assumption that it will basically be nanos under the
> hood.
>
> If the fact that it's stored as nanos under the hood leaks out (and I
> have trouble seeing how it won't) I'd lean towards just using them
> directly (e.g. Java Instant) rather than wrapping it.
>
> > Having a cleverly resolution-independent system is interesting and maybe
> extremely future proof but maybe preparing for a very distant future that
> may never come.
> >
> > Kenn
> >
> > On Fri, Oct 18, 2019 at 11:35 AM Robert Bradshaw <[email protected]>
> wrote:
> >>
> >> TL;DR: We should just settle on nanosecond precision ubiquitously for
> timestamp/windowing in Beam.
> >>
> >>
> >> Re-visiting this discussion in light of cross-language transforms and
> runners, and trying to tighten up testing. I've spent some more time
> thinking about how we could make these operations granularity-agnostic, but
> just can't find a good solution. In particular, the sticklers seem to be:
> >>
> >> (1) Windows are half-open intervals, and the timestamp associated with
> a window coming out of a GBK is (by default) as large as possible but must
> live in that window. (Otherwise WindowInto + GBK + WindowInto would have
> the unfortunate effect of moving aggregate values into subsequent windows,
> which is clearly not the intent.) In other words, the timestamp of a
> grouped value is basically End(Window) - epsilon. Unless we choose a
> representation able to encode "minus epsilon" we must agree on a
> granularity.
> >>
> >> (2) Unless we want to have multiple variants of all our WindowFns (e.g.
> FixedWindowMillis, FixedWindowMicros, FixedWindowNanos) we must agree on a
> granularity with which to parameterize these well-known operations. There
> are cases (e.g. side input window mapping, merging) where these Fns may be
> used downstream in contexts other than where they are applied/defined.
> >>
> >> (3) Reification of the timestamp into user-visible data, and the other
> way around, require a choice of precision to expose to the user. This means
> that the timestamp is actual data, and truncating/rounding cannot be done
> implicitly. Also round trip of reification and application of timestamps
> should hopefully be idempotent no matter the SDK.
> >>
> >> The closest I've come is possibly parameterizing the timestamp type,
> where encoding, decoding (including pulling the end out of a window?),
> comparison (against each other and a watermark), "minus epsilon", etc could
> be UDFs. Possibly we'd need the full set of arithmetic operations to
> implement FixedWindows on an unknown timestamp type. Reification would
> simply be disallowed (or return an opaque rather than SDK-native type) if
> the SDK did not know that window type. The fact that one might need
> comparison between timestamps of different types, or (lossless) coercion
> from one type to another, means that timestamp types need to know about
> each other, or another entity needs to know about the full cross-product,
> unless there is a common base-type (at which point we might as well always
> choose that).
> >>
> >> An intermediate solution is to settle on floating (decimal) point
> representation, plus a "minus-epsilon" bit. It wouldn't quite solve the
> mapping through SDK-native types (which could require rounding or errors or
> a new opaque type, and few date libraries could faithfully expose the minus
> epsilon part). It might also be more expensive (compute and storage), and
> would not allow us to use the protobuf timestamp/duration fields (or any
> standard date/time libraries).
> >>
> >> Unless we can come up with a clean solution to the issues above
> shortly, I think we should fix a precision and move forward. If this makes
> sense to everyone, then we can start talking about the specific choice of
> precision and a migration path (possibly only for portability).
> >>
> >>
> >> For reference, the manipulations we do on timestamps are:
> >>
> >> WindowInto: Timestamp -> Window
> >> TimestampCombine: Window, [Timestamp] -> Timestamp
> >>     End(Window)
> >>     Min(Timestamps)
> >>     Max(Timestamps)
> >> PastEndOfWindow: Watermark, Window -> {True, False}
> >>
> >> [SideInput]WindowMappingFn: Window -> Window
> >>     WindowInto(End(Window))
> >>
> >> GetTimestamp: Timestamp -> SDK Native Object
> >> EmitAtTimestamp: SDK Native Object -> Timestamp
> >>
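
For readers following along, the manipulations listed above can be sketched over plain epoch-nanosecond longs once a single granularity is fixed. This is only an illustration: the class and method names are hypothetical, it is not Beam's FixedWindows, and the pastEndOfWindow comparison is an assumed reading of the semantics.

```java
/** Hypothetical sketch (not Beam code) of fixed windowing over epoch-nanosecond longs. */
final class FixedWindowNanosSketch {
  private FixedWindowNanosSketch() {}

  /** WindowInto: inclusive start of the half-open window [start, start + size). */
  static long windowStart(long timestampNanos, long sizeNanos) {
    // floorMod keeps pre-epoch (negative) timestamps in the correct window.
    return timestampNanos - Math.floorMod(timestampNanos, sizeNanos);
  }

  /** End(Window): the exclusive upper bound of the window. */
  static long windowEnd(long startNanos, long sizeNanos) {
    return Math.addExact(startNanos, sizeNanos);
  }

  /**
   * "End(Window) - epsilon": the largest timestamp still inside the window.
   * Epsilon is one unit of the agreed granularity, here one nanosecond.
   */
  static long maxTimestamp(long startNanos, long sizeNanos) {
    return windowEnd(startNanos, sizeNanos) - 1;
  }

  /** PastEndOfWindow, assuming the window is past once the watermark reaches its end. */
  static boolean pastEndOfWindow(long watermarkNanos, long startNanos, long sizeNanos) {
    return watermarkNanos >= windowEnd(startNanos, sizeNanos);
  }
}
```

The property point (1) cares about is that windowStart(maxTimestamp(start, size), size) equals start, so WindowInto + GBK + WindowInto keeps the aggregate in the same window. That only holds because both sides agree on what "minus one" means.
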
> >>
> >>
> >>
> >>
> >>
> >> On Fri, May 10, 2019 at 1:33 PM Robert Bradshaw <[email protected]>
> wrote:
> >>>
> >>> On Thu, May 9, 2019 at 9:32 AM PM Kenneth Knowles <[email protected]>
> wrote:
> >>>
> >>> > From: Robert Bradshaw <[email protected]>
> >>> > Date: Wed, May 8, 2019 at 3:00 PM
> >>> > To: dev
> >>> >
> >>> >> From: Kenneth Knowles <[email protected]>
> >>> >> Date: Wed, May 8, 2019 at 6:50 PM
> >>> >> To: dev
> >>> >>
> >>> >> >> The end-of-window, for firing, can be approximate, but it seems
> it
> >>> >> >> should be exact for timestamp assignment of the result (and
> similarly
> >>> >> >> with the other timestamp combiners).
> >>> >> >
> >>> >> > I was thinking that the window itself should be stored as exact
> data, while just the firing itself is approximated, since it already is,
> because of watermarks and timers.
> >>> >>
> >>> >> I think this works where we can compare encoded windows, but some
> >>> >> portable interpretation of windows is required for runner-side
> >>> >> implementation of merging windows (for example).
> >>> >
> >>> > But in this case, you've recognized the URN of the WindowFn anyhow,
> so you understand its windows. Remembering that IntervalWindow is just one
> choice, and that windows themselves are totally user-defined and that
> merging logic is completely arbitrary per WindowFn (we probably should have
> some restrictions, but see https://issues.apache.org/jira/browse/BEAM-654).
> So I file this use case in the "runner knows everything about the WindowFn
> and Window type and window encoding anyhow".
> >>>
> >>> Being able to merge common windows in the runner is just an
> >>> optimization, but an important one (especially for bootstrapping
> >>> SDKs). However, this is not just about runner to SDK, but SDK to SDK
> >>> as well (where a user from one SDK may want to inspect the windows
> >>> produced by another). Having MillisIntervalWindow,
> >>> MicrosIntervalWindow, NanosIntervalWindow, etc. isn't a path that I
> >>> think is worth going down.
> >>>
> >>> Yes, we need to solve the "extract the endpoint of an unknown encoded
> >>> window" problem as well, possibly similar to what we do with length
> >>> prefix coders, possibly a restriction on window encodings themselves.
> >>>
> >>> >> There may also be issues if windows (or timestamps) are assigned to
> a
> >>> >> high precision in one SDK, then inspected/acted on in another SDK,
> and
> >>> >> then passed back to the original SDK where the truncation would be
> >>> >> visible.
> >>> >
> >>> > This is pretty interesting and complex. But again, a window is just
> data. An SDK has to know how to deserialize it to operate on it. Unless we
> do actually standardize some aspects of it. I don't believe BoundedWindow
> encoding has a defined way to get the timestamp without decoding the
> window, does it? I thought we had basically defaulted to all IntervalWindows.
> But I am not following that closely.
> >>> >
> >>> >> > You raise a good point that min/max timestamp combiners require
> actually understanding the higher-precision timestamp. I can think of a
> couple things to do. One is the old "standardize all 3 or 4 precisions we
> need" and the other is that combiners other than EOW exist primarily to
> hold the watermark, and that hold does not require the original precision.
> Still, neither of these is that satisfying.
> >>> >>
> >>> >> In the current model, the output timestamp is user-visible.
> >>> >
> >>> > But as long as the watermark hold is less, it is safe. It requires
> knowing the coarse-precision lower bound of the timestamps of the input.
> And there may be situations where you also want the coarse upper bound. But
> you do know that these are at most one millisecond apart (assuming the
> runner is in millis) so perhaps no storage overhead. But a lot of
> complexity and chances for off by ones. And this is pretty hand-wavy.
> >>>
> >>> Yeah. A different SDK may (implicitly or explicitly) ask for the
> >>> timestamp of the (transitive) output of a GBK, for which an
> >>> approximation (either way) is undesirable.
> >>>
> >>> >> >> > A correction: Java *now* uses nanoseconds [1]. It uses the
> same breakdown as proto (int64 seconds since epoch + int32 nanos within
> second). It has legacy classes that use milliseconds, and Joda itself now
> encourages moving back to Java's new Instant type. Nanoseconds should
> complicate the arithmetic only for the one person authoring the date
> library, which they have already done.
> >>> >> >>
> >>> >> >> The encoding and decoding need to be done in a
> language-consistent way
> >>> >> >> as well.
> >>> >> >
> >>> >> > I honestly am not sure what you mean by "language-consistent"
> here.
> >>> >>
> >>> >> If we want to make reading and writing of timestamps, windows
> >>> >> cross-language, we can't rely on language-specific libraries to do
> the
> >>> >> encoding.
> >>> >>
> >>> >> >> Also, most date libraries don't provide division, etc. operators, so
> >>> >> >> we have to do that as well. Not that it should be *that* hard.
> >>> >> >
> >>> >> > If the libraries dedicated to time handling haven't found it
> needful, is there a specific reason you raise this? We do some simple math
> to find the window things fall into; is that it?
> >>> >>
> >>> >> Yes. E.g.
> >>> >>
> >>> >>
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/FixedWindows.java#L77
> >>> >>
> >>> >> would be a lot messier if there were no mapping from date libraries to
> raw
> >>> >> ints that we can do arithmetic on. Writing this with the (seconds,
> >>> >> nanos) representation is painful. But I suppose we'd only have to do
> >>> >> it once per SDK.
> >>> >
> >>> >
> >>> > Yea I think that arithmetic is not so bad. But this raises the issue
> of writing a *generic* WindowFn where its idea of timestamp granularity
> (the WindowFn owns the window type and encoding) may not match the user
> data coming in. So you need to apply the approximation function to provide
> type-correct input to the WindowFn. That's kind of exciting and weird and
> perhaps unsolvable, except by choosing a concrete granularity.
> >>> >
> >>> > Kenn
> >>> >
> >>> >>
> >>> >>
> >>> >> >> >> It would also be really nice to clean up the infinite-future
> being the
> >>> >> >> >> somewhat arbitrary max micros rounded to millis, and
> >>> >> >> >> end-of-global-window being infinite-future minus 1 hour
> (IIRC), etc.
> >>> >> >> >> as well as the ugly logic in Python to cope with millis-micros
> >>> >> >> >> conversion.
> >>> >> >> >
> >>> >> >> > I actually don't have a problem with this. If you are trying
> to keep the representation compact, not add bytes on top of instants, then
> you just have to choose magic numbers, right?
> >>> >> >>
> >>> >> >> It's not about compactness, it's the (historically-derived?)
> >>> >> >> arbitrariness of the numbers.
> >>> >> >
> >>> >> > What I mean is that the only reason to fit them into an integer
> at all is compactness. Otherwise, you could use a proper disjoint union
> representing your intent directly, and all fiddling goes away, like
> `Timestamp ::= PosInf | NegInf | EndOfGlobalWindow | ActualTime(Instant)`.
> It costs a couple of bits.
> >>> >>
> >>> >> The other cost is not being able to use standard libraries to
> >>> >> represent all of your timestamps.
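
A rough sketch of the disjoint-union representation quoted above, written in plain Java. Everything here is hypothetical (class name, constants, and in particular the assumed ordering NegInf < ActualTime < EndOfGlobalWindow < PosInf); it is not an existing Beam type.

```java
import java.time.Instant;
import java.util.Objects;

/** Hypothetical tagged-union timestamp: sentinels are explicit instead of magic numbers. */
final class TimestampSketch implements Comparable<TimestampSketch> {
  /** Declaration order doubles as the (assumed) sort order between kinds. */
  enum Kind { NEG_INF, ACTUAL_TIME, END_OF_GLOBAL_WINDOW, POS_INF }

  static final TimestampSketch NEG_INF = new TimestampSketch(Kind.NEG_INF, null);
  static final TimestampSketch END_OF_GLOBAL_WINDOW =
      new TimestampSketch(Kind.END_OF_GLOBAL_WINDOW, null);
  static final TimestampSketch POS_INF = new TimestampSketch(Kind.POS_INF, null);

  private final Kind kind;
  private final Instant instant; // non-null only for ACTUAL_TIME

  private TimestampSketch(Kind kind, Instant instant) {
    this.kind = kind;
    this.instant = instant;
  }

  /** Wraps a real point in time. */
  static TimestampSketch of(Instant instant) {
    return new TimestampSketch(Kind.ACTUAL_TIME, Objects.requireNonNull(instant));
  }

  @Override
  public int compareTo(TimestampSketch other) {
    int byKind = kind.compareTo(other.kind);
    if (byKind != 0 || kind != Kind.ACTUAL_TIME) {
      return byKind; // different kinds, or the same sentinel
    }
    return instant.compareTo(other.instant);
  }
}
```

This also shows the cost raised in the reply: only the ACTUAL_TIME case maps onto a standard library type, so callers cannot treat every timestamp as an Instant.
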
> >>> >>
> >>> >> >> For example, the bounds are chosen to
> >>> >> >> fit within 64-bit micros despite milliseconds being the "chosen"
> >>> >> >> granularity, and care was taken that
> >>> >> >>
> >>> >> >>     WindowInto(Global) | GBK | WindowInto(Minute) | GBK
> >>> >> >>
> >>> >> >> works, but
> >>> >> >>
> >>> >> >>     WindowInto(Global) | GBK | WindowInto(Day) | GBK
> >>> >> >>
> >>> >> >> may produce elements with timestamps greater than MaxTimestamp.
> >>> >> >>
> >>> >> >> >
> >>> >> >> > Kenn
> >>> >> >> >
> >>> >> >> > [1]
> https://docs.oracle.com/javase/8/docs/api/java/time/Instant.html
> >>> >> >> >
> >>> >> >> >>
> >>> >> >> >> > On Wed, Apr 17, 2019 at 3:13 PM Robert Burke <
> [email protected]> wrote:
> >>> >> >> >> >>
> >>> >> >> >> >> +1 for plan B. Nanosecond precision on windowing seems...
> a little much for a system that's aggregating data over time. Even for
> processing say particle super collider data, they'd get away with
> artificially increasing the granularity in batch settings.
> >>> >> >> >> >>
> >>> >> >> >> >> Now if they were streaming... they'd probably want
> femtoseconds anyway.
> >>> >> >> >> >> The point is, we should see if users demand it before
> adding in the necessary work.
> >>> >> >> >> >>
> >>> >> >> >> >> On Wed, 17 Apr 2019 at 14:26, Chamikara Jayalath <
> [email protected]> wrote:
> >>> >> >> >> >>>
> >>> >> >> >> >>> +1 for plan B as well. I think it's important to make
> timestamp precision consistent now without introducing surprising behaviors
> for existing users. But we should move towards a higher granularity
> timestamp precision in the long run to support use-cases that Beam users
> otherwise might miss out (on a runner that supports such precision).
> >>> >> >> >> >>>
> >>> >> >> >> >>> - Cham
> >>> >> >> >> >>>
> >>> >> >> >> >>> On Wed, Apr 17, 2019 at 1:35 PM Lukasz Cwik <
> [email protected]> wrote:
> >>> >> >> >> >>>>
> >>> >> >> >> >>>> I also like Plan B because in the cross language case,
> the pipeline would not work since every party (Runners & SDKs) would have
> to be aware of the new beam:coder:windowed_value:v2 coder. Plan A has the
> property where if the SDK/Runner wasn't updated then it may start
> truncating the timestamps unexpectedly.
> >>> >> >> >> >>>>
> >>> >> >> >> >>>> On Wed, Apr 17, 2019 at 1:24 PM Lukasz Cwik <
> [email protected]> wrote:
> >>> >> >> >> >>>>>
> >>> >> >> >> >>>>> Kenn, this discussion is about the precision of the
> timestamp in the user data. As you had mentioned, Runners need not have the
> same granularity of user data as long as they correctly round the timestamp
> to guarantee that triggers are executed correctly but the user data should
> have the same precision across SDKs otherwise user data timestamps will be
> truncated in cross language scenarios.
> >>> >> >> >> >>>>>
> >>> >> >> >> >>>>> Based on the systems that were listed, either
> microsecond or nanosecond would make sense. The issue with changing the
> precision is that all Beam runners except for possibly Beam Python on
> Dataflow are using millisecond precision since they are all using the same
> Java Runner windowing/trigger logic.
> >>> >> >> >> >>>>>
> >>> >> >> >> >>>>> Plan A: Swap precision to nanosecond
> >>> >> >> >> >>>>> 1) Change the Python SDK to only expose millisecond
> precision timestamps (do now)
> >>> >> >> >> >>>>> 2) Change the user data encoding to support nanosecond
> precision (do now)
> >>> >> >> >> >>>>> 3) Swap runner libraries to be nanosecond precision
> aware updating all window/triggering logic (do later)
> >>> >> >> >> >>>>> 4) Swap SDKs to expose nanosecond precision (do later)
> >>> >> >> >> >>>>>
> >>> >> >> >> >>>>> Plan B:
> >>> >> >> >> >>>>> 1) Change the Python SDK to only expose millisecond
> precision timestamps and keep the data encoding as is (do now)
> >>> >> >> >> >>>>> (We could add greater precision later to plan B by
> creating a new version beam:coder:windowed_value:v2 which would be
> nanosecond and would require runners to correctly perform internal
> conversions for windowing/triggering.)
> >>> >> >> >> >>>>>
> >>> >> >> >> >>>>> I think we should go with Plan B and when users request
> greater precision we can make that an explicit effort. What do people think?
> >>> >> >> >> >>>>>
> >>> >> >> >> >>>>>
> >>> >> >> >> >>>>>
> >>> >> >> >> >>>>> On Wed, Apr 17, 2019 at 5:43 AM Maximilian Michels <
> [email protected]> wrote:
> >>> >> >> >> >>>>>>
> >>> >> >> >> >>>>>> Hi,
> >>> >> >> >> >>>>>>
> >>> >> >> >> >>>>>> Thanks for taking care of this issue in the Python
> SDK, Thomas!
> >>> >> >> >> >>>>>>
> >>> >> >> >> >>>>>> It would be nice to have a uniform precision for
> timestamps but, as Kenn
> >>> >> >> >> >>>>>> pointed out, timestamps are extracted from systems
> that have different
> >>> >> >> >> >>>>>> precision.
> >>> >> >> >> >>>>>>
> >>> >> >> >> >>>>>> To add to the list: Flink - milliseconds
> >>> >> >> >> >>>>>>
> >>> >> >> >> >>>>>> After all, it doesn't matter as long as there is
> sufficient precision
> >>> >> >> >> >>>>>> and conversions are done correctly.
> >>> >> >> >> >>>>>>
> >>> >> >> >> >>>>>> I think we could improve the situation by at least
> adding a
> >>> >> >> >> >>>>>> "milliseconds" constructor to the Python SDK's
> Timestamp.
> >>> >> >> >> >>>>>>
> >>> >> >> >> >>>>>> Cheers,
> >>> >> >> >> >>>>>> Max
> >>> >> >> >> >>>>>>
> >>> >> >> >> >>>>>> On 17.04.19 04:13, Kenneth Knowles wrote:
> >>> >> >> >> >>>>>> > I am not so sure this is a good idea. Here are some
> systems and their
> >>> >> >> >> >>>>>> > precision:
> >>> >> >> >> >>>>>> >
> >>> >> >> >> >>>>>> > Arrow - microseconds
> >>> >> >> >> >>>>>> > BigQuery - microseconds
> >>> >> >> >> >>>>>> > New Java instant - nanoseconds
> >>> >> >> >> >>>>>> > Firestore - microseconds
> >>> >> >> >> >>>>>> > Protobuf - nanoseconds
> >>> >> >> >> >>>>>> > Dataflow backend - microseconds
> >>> >> >> >> >>>>>> > Postgresql - microseconds
> >>> >> >> >> >>>>>> > Pubsub publish time - nanoseconds
> >>> >> >> >> >>>>>> > MSSQL datetime2 - 100 nanoseconds (original datetime
> about 3 millis)
> >>> >> >> >> >>>>>> > Cassandra - milliseconds
> >>> >> >> >> >>>>>> >
> >>> >> >> >> >>>>>> > IMO it is important to be able to treat any of these
> as a Beam
> >>> >> >> >> >>>>>> > timestamp, even though they aren't all streaming.
> Who knows when we
> >>> >> >> >> >>>>>> > might be ingesting a streamed changelog, or using
> them for reprocessing
> >>> >> >> >> >>>>>> > an archived stream. I think for this purpose we
> either should
> >>> >> >> >> >>>>>> > standardize on nanoseconds or make the runner's
> resolution independent
> >>> >> >> >> >>>>>> > of the data representation.
> >>> >> >> >> >>>>>> >
> >>> >> >> >> >>>>>> > I've had some offline conversations about this. I
> think we can have
> >>> >> >> >> >>>>>> > higher-than-runner precision in the user data, and
> allow WindowFns and
> >>> >> >> >> >>>>>> > DoFns to operate on this higher-than-runner
> precision data, and still
> >>> >> >> >> >>>>>> > have consistent watermark treatment. Watermarks are
> just bounds, after all.
> >>> >> >> >> >>>>>> >
> >>> >> >> >> >>>>>> > Kenn
> >>> >> >> >> >>>>>> >
> >>> >> >> >> >>>>>> > On Tue, Apr 16, 2019 at 6:48 PM Thomas Weise <
> [email protected]
> >>> >> >> >> >>>>>> > <mailto:[email protected]>> wrote:
> >>> >> >> >> >>>>>> >
> >>> >> >> >> >>>>>> >     The Python SDK currently uses timestamps in
> microsecond resolution
> >>> >> >> >> >>>>>> >     while Java SDK, as most would probably expect,
> uses milliseconds.
> >>> >> >> >> >>>>>> >
> >>> >> >> >> >>>>>> >     This causes a few difficulties with portability
> (Python coders need
> >>> >> >> >> >>>>>> >     to convert to millis for WindowedValue and
> Timers, which is related
> >>> >> >> >> >>>>>> >     to a bug I'm looking into:
> >>> >> >> >> >>>>>> >
> >>> >> >> >> >>>>>> >     https://issues.apache.org/jira/browse/BEAM-7035
> >>> >> >> >> >>>>>> >
> >>> >> >> >> >>>>>> >     As Luke pointed out, the issue was previously
> discussed:
> >>> >> >> >> >>>>>> >
> >>> >> >> >> >>>>>> >     https://issues.apache.org/jira/browse/BEAM-1524
> >>> >> >> >> >>>>>> >
> >>> >> >> >> >>>>>> >     I'm not privy to the reasons why we decided to
> go with micros in
> >>> >> >> >> >>>>>> >     the first place, but would it be too big of a change
> or impractical for
> >>> >> >> >> >>>>>> >     other reasons to switch Python SDK to millis
> before it gets more users?
> >>> >> >> >> >>>>>> >
> >>> >> >> >> >>>>>> >     Thanks,
> >>> >> >> >> >>>>>> >     Thomas
> >>> >> >> >> >>>>>> >
>
