Robert it seems like your for Plan A. Assuming we go forward with
nanosecond and based upon your analysis in 3), wouldn't that mean we would
have to make a breaking change to the Java SDK to swap to nanosecond
precision?


On Fri, Oct 18, 2019 at 11:35 AM Robert Bradshaw <[email protected]>
wrote:

> TL;DR: We should just settle on nanosecond precision ubiquitously for
> timestamp/windowing in Beam.
>
>
> Re-visiting this discussion in light of cross-language transforms and
> runners, and trying to tighten up testing. I've spent some more time
> thinking about how we could make these operations granularity-agnostic, but
> just can't find a good solution. In particular, the sticklers seem to be:
>
> (1) Windows are half-open intervals, and the timestamp associated with a
> window coming out of a GBK is (by default) as large as possible but must
> live in that window. (Otherwise WindowInto + GBK + WindowInto would have
> the unforunate effect of moving aggregate values into subsequent windows,
> which is clearly not the intent.) In other words, the timestamp of a
> grouped value is basically End(Window) - epsilon. Unless we choose a
> representation able to encode "minus epsilon" we must agree on a
> granularity.
>
> (2) Unless we want to have multiple vairants of all our WindowFns (e.g.
> FixedWindowMillis, FixedWindowMicros, FixedWindowNanos) we must agree on a
> granularity with which to parameterize these well-known operations. There
> are cases (e.g. side input window mapping, merging) where these Fns may be
> used downstream in contexts other than where they are applied/defined.
>
> (3) Reification of the timestamp into user-visible data, and the other way
> around, require a choice of precision to expose to the user. This means
> that the timestamp is actual data, and truncating/rounding cannot be done
> implicitly. Also round trip of reification and application of timestamps
> should hopefully be idempotent no matter the SDK.
>
> The closest I've come is possibly parameterizing the timestamp type, where
> encoding, decoding (including pulling the end out of a window?), comparison
> (against each other and a watermark), "minus epsilon", etc could be UDFs.
> Possibly we'd need the full set of arithmetic operations to implement
> FixedWindows on an unknown timestamp type. Reification would simply be
> dis-allowed (or return an opaque rather than SDK-native) type if the SDK
> did not know that window type. The fact that one might need comparison
> between timestamps of different types, or (lossless) coercion from one type
> to another, means that timestamp types need to know about each other, or
> another entity needs to know about the full cross-product, unless there is
> a common base-type (at which point we might as well always choose that).
>
> An intermediate solution is to settle on floating (decimal) point
> representation, plus a "minus-epsiloin" bit. It wouldn't quite solve the
> mapping through SDK-native types (which could require rounding or errors or
> a new opaque type, and few date librarys could faithfully expose the minus
> epsilon part). It might also be more expensive (compute and storage), and
> would not allow us to use the protofuf timestamp/duration fields (or any
> standard date/time libraries).
>
> Unless we can come up with a clean solution to the issues above shortly, I
> think we should fix a precision and move forward. If this makes sense to
> everyone, then we can start talking about the specific choice of precision
> and a migration path (possibly only for portability).
>
>
> For reference, the manipulations we do on timestamps are:
>
> WindowInto: Timestamp -> Window
> TimestampCombine: Window, [Timestamp] -> Timestamp
>     End(Window)
>     Min(Timestamps)
>     Max(Timestamps)
> PastEndOfWindow: Watermark, Window -> {True, False}
>
> [SideInput]WindowMappingFn: Window -> Window
>     WindowInto(End(Window))
>
> GetTimestamp: Timestamp -> SDK Native Object
> EmitAtTimestamp: SDK Native Object -> Timestamp
>
>
>
>
>
>
> On Fri, May 10, 2019 at 1:33 PM Robert Bradshaw <[email protected]>
> wrote:
>
>> On Thu, May 9, 2019 at 9:32 AM PM Kenneth Knowles <[email protected]>
>> wrote:
>>
>> > From: Robert Bradshaw <[email protected]>
>> > Date: Wed, May 8, 2019 at 3:00 PM
>> > To: dev
>> >
>> >> From: Kenneth Knowles <[email protected]>
>> >> Date: Wed, May 8, 2019 at 6:50 PM
>> >> To: dev
>> >>
>> >> >> The end-of-window, for firing, can be approximate, but it seems it
>> >> >> should be exact for timestamp assignment of the result (and
>> similarly
>> >> >> with the other timestamp combiners).
>> >> >
>> >> > I was thinking that the window itself should be stored as exact
>> data, while just the firing itself is approximated, since it already is,
>> because of watermarks and timers.
>> >>
>> >> I think this works where we can compare encoded windows, but some
>> >> portable interpretation of windows is required for runner-side
>> >> implementation of merging windows (for example).
>> >
>> > But in this case, you've recognized the URN of the WindowFn anyhow, so
>> you understand its windows. Remembering that IntervalWindow is just one
>> choice, and that windows themselves are totally user-defined and that
>> merging logic is completely arbitrary per WindowFn (we probably should have
>> some restrictions, but see https://issues.apache.org/jira/browse/BEAM-654).
>> So I file this use case in the "runner knows everything about the WindowFn
>> and Window type and window encoding anyhow".
>>
>> Being able to merge common windows in the runner is just an
>> optimization, but an important one (especially for bootstrapping
>> SDKs). However, this is not just about runner to SDK, but SDK to SDK
>> as well (where a user from one SDK may want to inspect the windows
>> produced by another). Having MillisIntervalWindow,
>> MicrosIntervalWindow, NanosIntervalWindow, etc. isn't a path that I
>> think is worth going down.
>>
>> Yes, we need to solve the "extract the endpoint of an unknown encoded
>> window" problem as well, possibly similar to what we do with length
>> prefix coders, possibly a restriction on window encodings themselves.
>>
>> >> There may also be issues if windows (or timestamps) are assigned to a
>> >> high precision in one SDK, then inspected/acted on in another SDK, and
>> >> then passed back to the original SDK where the truncation would be
>> >> visible.
>> >
>> > This is pretty interesting and complex. But again, a window is just
>> data. An SDK has to know how to deserialize it to operate on it. Unless we
>> do actually standardize some aspects of it. I don't believe BoundedWindow
>> encoding has a defined way to get the timestamp without decoding the
>> window, does it? I thought we had basically default to all InternalWindows.
>> But I am not following that closely.
>> >
>> >> > You raise a good point that min/max timestamp combiners require
>> actually understanding the higher-precision timestamp. I can think of a
>> couple things to do. One is the old "standardize all 3 or for precisions we
>> need" and the other is that combiners other than EOW exist primarily to
>> hold the watermark, and that hold does not require the original precision.
>> Still, neither of these is that satisfying.
>> >>
>> >> In the current model, the output timestamp is user-visible.
>> >
>> > But as long as the watermark hold is less, it is safe. It requires
>> knowing the coarse-precision lower bound of the timestamps of the input.
>> And there may be situations where you also want the coarse upper bound. But
>> you do know that these are at most one millisecond apart (assuming the
>> runner is in millis) so perhaps no storage overhead. But a lot of
>> complexity and chances for off by ones. And this is pretty hand-wavy.
>>
>> Yeah. A different SDK may (implicitly or explicitly) ask for the
>> timestamp of the (transitive) output of a GBK, for which an
>> approximation (either way) is undesirable.
>>
>> >> >> > A correction: Java *now* uses nanoseconds [1]. It uses the same
>> breakdown as proto (int64 seconds since epoch + int32 nanos within second).
>> It has legacy classes that use milliseconds, and Joda itself now encourages
>> moving back to Java's new Instant type. Nanoseconds should complicate the
>> arithmetic only for the one person authoring the date library, which they
>> have already done.
>> >> >>
>> >> >> The encoding and decoding need to be done in a language-consistent
>> way
>> >> >> as well.
>> >> >
>> >> > I honestly am not sure what you mean by "language-consistent" here.
>> >>
>> >> If we want to make reading and writing of timestamps, windows
>> >> cross-language, we can't rely on language-specific libraries to do the
>> >> encoding.
>> >>
>> >> >> Also, most date libraries don't division, etc. operators, so
>> >> >> we have to do that as well. Not that it should be *that* hard.
>> >> >
>> >> > If the libraries dedicated to time handling haven't found it
>> needful, is there a specific reason you raise this? We do some simple math
>> to find the window things fall into; is that it?
>> >>
>> >> Yes. E.g.
>> >>
>> >>
>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/FixedWindows.java#L77
>> >>
>> >> would be a lot messier if there were no mapping date libraries to raw
>> >> ints that we can do arithmetic on. Writing this with the (seconds,
>> >> nanos) representation is painful. But I suppose we'd only have to do
>> >> it once per SDK.
>> >
>> >
>> > Yea I think that arithmetic is not so bad. But this raises the issue of
>> writing a *generic* WindowFn where its idea of timestamp granularity (the
>> WindowFn owns the window type and encoding) may not match the user data
>> coming in. So you need to apply the approximation function to provide
>> type-correct input to the WindowFn. That's kind of exciting and weird and
>> perhaps unsolvable, except by choosing a concrete granularity.
>> >
>> > Kenn
>> >
>> >>
>> >>
>> >> >> >> It would also be really nice to clean up the infinite-future
>> being the
>> >> >> >> somewhat arbitrary max micros rounded to millis, and
>> >> >> >> end-of-global-window being infinite-future minus 1 hour (IIRC),
>> etc.
>> >> >> >> as well as the ugly logic in Python to cope with millis-micros
>> >> >> >> conversion.
>> >> >> >
>> >> >> > I actually don't have a problem with this. If you are trying to
>> keep the representation compact, not add bytes on top of instants, then you
>> just have to choose magic numbers, right?
>> >> >>
>> >> >> It's not about compactness, it's the (historically-derived?)
>> >> >> arbitrariness of the numbers.
>> >> >
>> >> > What I mean is that the only reason to fit them into an integer at
>> all is compactness. Otherwise, you could use a proper disjoint union
>> representing your intent directly, and all fiddling goes away, like
>> `Timestamp ::= PosInf | NegInf | EndOfGlobalWindow | ActualTime(Instant)`.
>> It costs a couple of bits.
>> >>
>> >> The other cost is not being able to use standard libraries to
>> >> represent all of your timestamps.
>> >>
>> >> >> For example, the bounds are chosen to
>> >> >> fit within 64-bit mircos despite milliseconds being the "chosen"
>> >> >> granularity, and care was taken that
>> >> >>
>> >> >>     WindowInto(Global) | GBK | WindowInto(Minute) | GBK
>> >> >>
>> >> >> works, but
>> >> >>
>> >> >>     WindowInto(Global) | GBK | WindowInto(Day) | GBK
>> >> >>
>> >> >> may produce elements with timestamps greater than MaxTimestamp.
>> >> >>
>> >> >> >
>> >> >> > Kenn
>> >> >> >
>> >> >> > [1]
>> https://docs.oracle.com/javase/8/docs/api/java/time/Instant.html
>> >> >> >
>> >> >> >>
>> >> >> >> > On Wed, Apr 17, 2019 at 3:13 PM Robert Burke <
>> [email protected]> wrote:
>> >> >> >> >>
>> >> >> >> >> +1 for plan B. Nano second precision on windowing seems... a
>> little much for a system that's aggregating data over time. Even for
>> processing say particle super collider data, they'd get away with
>> artificially increasing the granularity in batch settings.
>> >> >> >> >>
>> >> >> >> >> Now if they were streaming... they'd probably want
>> femtoseconds anyway.
>> >> >> >> >> The point is, we should see if users demand it before adding
>> in the necessary work.
>> >> >> >> >>
>> >> >> >> >> On Wed, 17 Apr 2019 at 14:26, Chamikara Jayalath <
>> [email protected]> wrote:
>> >> >> >> >>>
>> >> >> >> >>> +1 for plan B as well. I think it's important to make
>> timestamp precision consistent now without introducing surprising behaviors
>> for existing users. But we should move towards a higher granularity
>> timestamp precision in the long run to support use-cases that Beam users
>> otherwise might miss out (on a runner that supports such precision).
>> >> >> >> >>>
>> >> >> >> >>> - Cham
>> >> >> >> >>>
>> >> >> >> >>> On Wed, Apr 17, 2019 at 1:35 PM Lukasz Cwik <
>> [email protected]> wrote:
>> >> >> >> >>>>
>> >> >> >> >>>> I also like Plan B because in the cross language case, the
>> pipeline would not work since every party (Runners & SDKs) would have to be
>> aware of the new beam:coder:windowed_value:v2 coder. Plan A has the
>> property where if the SDK/Runner wasn't updated then it may start
>> truncating the timestamps unexpectedly.
>> >> >> >> >>>>
>> >> >> >> >>>> On Wed, Apr 17, 2019 at 1:24 PM Lukasz Cwik <
>> [email protected]> wrote:
>> >> >> >> >>>>>
>> >> >> >> >>>>> Kenn, this discussion is about the precision of the
>> timestamp in the user data. As you had mentioned, Runners need not have the
>> same granularity of user data as long as they correctly round the timestamp
>> to guarantee that triggers are executed correctly but the user data should
>> have the same precision across SDKs otherwise user data timestamps will be
>> truncated in cross language scenarios.
>> >> >> >> >>>>>
>> >> >> >> >>>>> Based on the systems that were listed, either microsecond
>> or nanosecond would make sense. The issue with changing the precision is
>> that all Beam runners except for possibly Beam Python on Dataflow are using
>> millisecond precision since they are all using the same Java Runner
>> windowing/trigger logic.
>> >> >> >> >>>>>
>> >> >> >> >>>>> Plan A: Swap precision to nanosecond
>> >> >> >> >>>>> 1) Change the Python SDK to only expose millisecond
>> precision timestamps (do now)
>> >> >> >> >>>>> 2) Change the user data encoding to support nanosecond
>> precision (do now)
>> >> >> >> >>>>> 3) Swap runner libraries to be nanosecond precision aware
>> updating all window/triggering logic (do later)
>> >> >> >> >>>>> 4) Swap SDKs to expose nanosecond precision (do later)
>> >> >> >> >>>>>
>> >> >> >> >>>>> Plan B:
>> >> >> >> >>>>> 1) Change the Python SDK to only expose millisecond
>> precision timestamps and keep the data encoding as is (do now)
>> >> >> >> >>>>> (We could add greater precision later to plan B by
>> creating a new version beam:coder:windowed_value:v2 which would be
>> nanosecond and would require runners to correctly perform an internal
>> conversions for windowing/triggering.)
>> >> >> >> >>>>>
>> >> >> >> >>>>> I think we should go with Plan B and when users request
>> greater precision we can make that an explicit effort. What do people think?
>> >> >> >> >>>>>
>> >> >> >> >>>>>
>> >> >> >> >>>>>
>> >> >> >> >>>>> On Wed, Apr 17, 2019 at 5:43 AM Maximilian Michels <
>> [email protected]> wrote:
>> >> >> >> >>>>>>
>> >> >> >> >>>>>> Hi,
>> >> >> >> >>>>>>
>> >> >> >> >>>>>> Thanks for taking care of this issue in the Python SDK,
>> Thomas!
>> >> >> >> >>>>>>
>> >> >> >> >>>>>> It would be nice to have a uniform precision for
>> timestamps but, as Kenn
>> >> >> >> >>>>>> pointed out, timestamps are extracted from systems that
>> have different
>> >> >> >> >>>>>> precision.
>> >> >> >> >>>>>>
>> >> >> >> >>>>>> To add to the list: Flink - milliseconds
>> >> >> >> >>>>>>
>> >> >> >> >>>>>> After all, it doesn't matter as long as there is
>> sufficient precision
>> >> >> >> >>>>>> and conversions are done correctly.
>> >> >> >> >>>>>>
>> >> >> >> >>>>>> I think we could improve the situation by at least adding
>> a
>> >> >> >> >>>>>> "milliseconds" constructor to the Python SDK's Timestamp.
>> >> >> >> >>>>>>
>> >> >> >> >>>>>> Cheers,
>> >> >> >> >>>>>> Max
>> >> >> >> >>>>>>
>> >> >> >> >>>>>> On 17.04.19 04:13, Kenneth Knowles wrote:
>> >> >> >> >>>>>> > I am not so sure this is a good idea. Here are some
>> systems and their
>> >> >> >> >>>>>> > precision:
>> >> >> >> >>>>>> >
>> >> >> >> >>>>>> > Arrow - microseconds
>> >> >> >> >>>>>> > BigQuery - microseconds
>> >> >> >> >>>>>> > New Java instant - nanoseconds
>> >> >> >> >>>>>> > Firestore - microseconds
>> >> >> >> >>>>>> > Protobuf - nanoseconds
>> >> >> >> >>>>>> > Dataflow backend - microseconds
>> >> >> >> >>>>>> > Postgresql - microseconds
>> >> >> >> >>>>>> > Pubsub publish time - nanoseconds
>> >> >> >> >>>>>> > MSSQL datetime2 - 100 nanoseconds (original datetime
>> about 3 millis)
>> >> >> >> >>>>>> > Cassandra - milliseconds
>> >> >> >> >>>>>> >
>> >> >> >> >>>>>> > IMO it is important to be able to treat any of these as
>> a Beam
>> >> >> >> >>>>>> > timestamp, even though they aren't all streaming. Who
>> knows when we
>> >> >> >> >>>>>> > might be ingesting a streamed changelog, or using them
>> for reprocessing
>> >> >> >> >>>>>> > an archived stream. I think for this purpose we either
>> should
>> >> >> >> >>>>>> > standardize on nanoseconds or make the runner's
>> resolution independent
>> >> >> >> >>>>>> > of the data representation.
>> >> >> >> >>>>>> >
>> >> >> >> >>>>>> > I've had some offline conversations about this. I think
>> we can have
>> >> >> >> >>>>>> > higher-than-runner precision in the user data, and
>> allow WindowFns and
>> >> >> >> >>>>>> > DoFns to operate on this higher-than-runner precision
>> data, and still
>> >> >> >> >>>>>> > have consistent watermark treatment. Watermarks are
>> just bounds, after all.
>> >> >> >> >>>>>> >
>> >> >> >> >>>>>> > Kenn
>> >> >> >> >>>>>> >
>> >> >> >> >>>>>> > On Tue, Apr 16, 2019 at 6:48 PM Thomas Weise <
>> [email protected]
>> >> >> >> >>>>>> > <mailto:[email protected]>> wrote:
>> >> >> >> >>>>>> >
>> >> >> >> >>>>>> >     The Python SDK currently uses timestamps in
>> microsecond resolution
>> >> >> >> >>>>>> >     while Java SDK, as most would probably expect, uses
>> milliseconds.
>> >> >> >> >>>>>> >
>> >> >> >> >>>>>> >     This causes a few difficulties with portability
>> (Python coders need
>> >> >> >> >>>>>> >     to convert to millis for WindowedValue and Timers,
>> which is related
>> >> >> >> >>>>>> >     to a bug I'm looking into:
>> >> >> >> >>>>>> >
>> >> >> >> >>>>>> >     https://issues.apache.org/jira/browse/BEAM-7035
>> >> >> >> >>>>>> >
>> >> >> >> >>>>>> >     As Luke pointed out, the issue was previously
>> discussed:
>> >> >> >> >>>>>> >
>> >> >> >> >>>>>> >     https://issues.apache.org/jira/browse/BEAM-1524
>> >> >> >> >>>>>> >
>> >> >> >> >>>>>> >     I'm not privy to the reasons why we decided to go
>> with micros in
>> >> >> >> >>>>>> >     first place, but would it be too big of a change or
>> impractical for
>> >> >> >> >>>>>> >     other reasons to switch Python SDK to millis before
>> it gets more users?
>> >> >> >> >>>>>> >
>> >> >> >> >>>>>> >     Thanks,
>> >> >> >> >>>>>> >     Thomas
>> >> >> >> >>>>>> >
>>
>

Reply via email to