TL;DR - can we solve this by representing aggregations not as point-wise events in time, but as time ranges? Explanation below.

Hi,

this is pretty interesting from a theoretical point of view. The question generally seems to be: given two events, can I reliably order them? One event might be the end of one window and the other the start of another window. There is a strictly required causality between these two (even though the events in fact share the same limiting timestamp). On the other hand, when I have two (random) point-wise events, one with timestamp T1 and the other with timestamp T2, then whether they are causally related depends on their distance in space. If these two events differ by a single nanosecond and do not originate very "close" to each other, there is a high probability that different observers will see them in a different order (hence no causality is possible and the order doesn't matter).

That is to say, if I'm dealing with a single external event (someone tells me something happened at time T), then around the boundaries of windows it doesn't matter which window the event is placed into. There is no causal connection between the start of a window and the event.

The situation is different when we actually perform an aggregation over multiple elements (GBK): although we produce output at a certain event-time timestamp, this event is not "point-wise external" but a running aggregation over multiple (and possibly long-term) data. That, of course, causally precedes the opening of a new (tumbling) window.

The question now is: could we extend WindowedValue so that it carries not only a single point-wise event time, but a time range in the case of aggregations? Then the example of Window.into -> GBK -> Window.into could actually behave correctly (respecting causality), and another benefit would be that the resulting timestamp of the aggregation would no longer have to be window.maxTimestamp - 1 (which is kind of strange).
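
A rough sketch of what I mean (the type and field names are purely illustrative, not existing Beam API):

    // Hypothetical sketch: a WindowedValue whose event-time dimension is a range
    // rather than a single point.
    class RangedWindowedValue<T> {
      final T value;
      // For a plain external event the range collapses to a point (min == max).
      // For the output of a GBK-like aggregation it spans the aggregated inputs,
      // e.g. [window.start, window.end), so no maxTimestamp - 1 is needed.
      final java.time.Instant minTimestamp;
      final java.time.Instant maxTimestamp;
      // windows, pane info, etc. elided

      RangedWindowedValue(T value, java.time.Instant minTimestamp, java.time.Instant maxTimestamp) {
        this.value = value;
        this.minTimestamp = minTimestamp;
        this.maxTimestamp = maxTimestamp;
      }
    }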

This might actually be equivalent to the "minus epsilon" approach, but perhaps easier to understand. Moreover, this should probably be available to users, because one can easily run into the same problems when using a stateful ParDo to perform a GBK-like aggregation.

Thoughts?

Jan



On 10/30/19 1:05 AM, Robert Bradshaw wrote:
On Tue, Oct 29, 2019 at 4:20 PM Kenneth Knowles <k...@apache.org> wrote:
Point (1) is compelling. Solutions to the "minus epsilon" problem seem a bit complex. 
On the other hand, an opaque and abstract Timestamp type (in each SDK) going forward 
seems like a Pretty Good Idea (tm). Would you really have to go floating point? Could you 
just have a distinguished representation for non-inclusive upper/lower bounds? These 
could be at the same reduced resolution as timestamps in element metadata, since that is 
all they are compared against.
If I were coming up with an abstract, opaque representation of
Timestamp (and Duration) for Beam, I would explicitly include the
"minus epsilon" concept. One could still do arithmetic with these.
This would make any conversion to standard datetime libraries lossy
though.
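
Something like the following, purely as an illustration of the idea (none of this exists; the names are made up):

    // A timestamp that can represent "t minus epsilon" without picking a granularity.
    record EpsilonTimestamp(java.time.Instant base, boolean minusEpsilon)
        implements Comparable<EpsilonTimestamp> {

      // End(Window) - epsilon, with no resolution baked in.
      static EpsilonTimestamp justBefore(java.time.Instant windowEnd) {
        return new EpsilonTimestamp(windowEnd, true);
      }

      // Arithmetic still works; shifting preserves the epsilon flag.
      EpsilonTimestamp plus(java.time.Duration d) {
        return new EpsilonTimestamp(base.plus(d), minusEpsilon);
      }

      @Override
      public int compareTo(EpsilonTimestamp other) {
        int c = base.compareTo(other.base);
        if (c != 0) {
          return c;
        }
        // t - epsilon sorts strictly before t.
        return Boolean.compare(other.minusEpsilon, minusEpsilon);
      }
    }

Converting one of these to a plain Instant (or Joda DateTime) is exactly where the lossiness shows up: the epsilon bit has nowhere to go.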

Point (2) is also good, though it seems like something that could be cleverly 
engineered and/or we just provide one implementation and it is easy to make 
your own for finer granularity, since a WindowFn separately receives the 
Timestamp (here I'm pretending it is abstract and opaque and likely 
approximate) and the original element with whatever precision the original data 
included.
Yes, but I don't see how a generic WindowFn would reach into the
(arbitrary) element and pull out this original data. One of the
benefits of the Beam model is that the WindowFn does not have to
depend on the element type.

Point (3) the model/runner owns the timestamp metadata so I feel fine about it 
being approximated as long as any original user data is still present. I don't 
recall seeing a compelling case where the timestamp metadata that the runner 
tracks and understands is required to be exactly the same as a user value 
(assuming users understand this distinction, which is another issue that I 
would separate from whether it is technically feasible).
As we provide the ability to designate user data as the runner
timestamp against which to window, and promote the runner timestamp
back to user data (people are going to want to get DateTime or Instant
objects out of it), it seems tricky to explain to users that one or
both of these operations may be lossy (and, in addition, I don't think
there's a consistently safe direction to round).

The more I think about the very real problems you point out, the more I think 
that our backwards-incompatible move should be to our own abstract Timestamp 
type, putting the design decision behind a minimal interface. If we see a 
concrete design for that data type, we might be inspired how to support more 
possibilities.

As for the rest of the speculation... moving to nanos immediately helps users 
so I am now +1 on just doing it, or moving ahead with an abstract data type 
under the assumption that it will basically be nanos under the hood.
If the fact that it's stored as nanos under the hood leaks out (and I
have trouble seeing how it won't) I'd lean towards just using them
directly (e.g. Java Instant) rather than wrapping it.

Having a cleverly resolution-independent system is interesting and maybe 
extremely future proof but maybe preparing for a very distant future that may 
never come.

Kenn

On Fri, Oct 18, 2019 at 11:35 AM Robert Bradshaw <rober...@google.com> wrote:
TL;DR: We should just settle on nanosecond precision ubiquitously for 
timestamp/windowing in Beam.


Re-visiting this discussion in light of cross-language transforms and runners, 
and trying to tighten up testing. I've spent some more time thinking about how 
we could make these operations granularity-agnostic, but just can't find a good 
solution. In particular, the sticking points seem to be:

(1) Windows are half-open intervals, and the timestamp associated with a window coming 
out of a GBK is (by default) as large as possible but must live in that window. 
(Otherwise WindowInto + GBK + WindowInto would have the unfortunate effect of moving 
aggregate values into subsequent windows, which is clearly not the intent; see the 
pipeline sketch after point (3).) In other words, the timestamp of a grouped value is 
basically End(Window) - epsilon. Unless we choose a representation able to encode 
"minus epsilon" we must agree on a granularity.

(2) Unless we want to have multiple variants of all our WindowFns (e.g. 
FixedWindowMillis, FixedWindowMicros, FixedWindowNanos) we must agree on a 
granularity with which to parameterize these well-known operations. There are 
cases (e.g. side input window mapping, merging) where these Fns may be used 
downstream in contexts other than where they are applied/defined.

(3) Reification of the timestamp into user-visible data, and the other way 
around, require a choice of precision to expose to the user. This means that 
the timestamp is actual data, and truncating/rounding cannot be done 
implicitly. Also round trip of reification and application of timestamps should 
hopefully be idempotent no matter the SDK.
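
To make point (1) concrete, this is roughly the pipeline shape it describes (the window size and element types here are just an example):

    import org.apache.beam.sdk.transforms.GroupByKey;
    import org.apache.beam.sdk.transforms.windowing.FixedWindows;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.joda.time.Duration;

    PCollection<KV<String, Long>> input = ...;  // timestamped elements

    PCollection<KV<String, Iterable<Long>>> grouped =
        input
            .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
            .apply(GroupByKey.create());

    // Re-windowing into the same FixedWindows must keep each grouped value in the
    // minute it came from. That only holds because the grouped value's timestamp is
    // End(Window) - epsilon (window.maxTimestamp()); if it were exactly End(Window),
    // every aggregate would land in the *next* minute window.
    PCollection<KV<String, Iterable<Long>>> regrouped =
        grouped.apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))));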

The closest I've come is possibly parameterizing the timestamp type, where encoding, 
decoding (including pulling the end out of a window?), comparison (against each other and 
a watermark), "minus epsilon", etc could be UDFs. Possibly we'd need the full 
set of arithmetic operations to implement FixedWindows on an unknown timestamp type. 
Reification would simply be disallowed (or return an opaque rather than SDK-native type) 
if the SDK did not know that window type. The fact that one might need comparison between 
timestamps of different types, or (lossless) coercion from one type to another, means 
that timestamp types need to know about each other, or another entity needs to know about 
the full cross-product, unless there is a common base-type (at which point we might as 
well always choose that).

An intermediate solution is to settle on a floating (decimal) point representation, plus a 
"minus-epsilon" bit. It wouldn't quite solve the mapping through SDK-native 
types (which could require rounding or errors or a new opaque type, and few date libraries 
could faithfully expose the minus epsilon part). It might also be more expensive (compute 
and storage), and would not allow us to use the protobuf timestamp/duration fields (or 
any standard date/time libraries).

Unless we can come up with a clean solution to the issues above shortly, I 
think we should fix a precision and move forward. If this makes sense to 
everyone, then we can start talking about the specific choice of precision and 
a migration path (possibly only for portability).


For reference, the manipulations we do on timestamps are:

WindowInto: Timestamp -> Window
TimestampCombine: Window, [Timestamp] -> Timestamp
     End(Window)
     Min(Timestamps)
     Max(Timestamps)
PastEndOfWindow: Watermark, Window -> {True, False}

[SideInput]WindowMappingFn: Window -> Window
     WindowInto(End(Window))

GetTimestamp: Timestamp -> SDK Native Object
EmitAtTimestamp: SDK Native Object -> Timestamp
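
If we did parameterize the timestamp type (as speculated above), those manipulations are roughly the interface a timestamp "family" would have to implement. A purely hypothetical sketch, with made-up names:

    // Hypothetical: what the model needs from an opaque timestamp type T and its
    // window type W, mirroring most of the list above (WindowInto itself stays
    // with the WindowFn).
    interface TimestampFamily<T, W> {
      int compare(T a, T b);                    // against each other and watermarks
      T endOfWindow(W window);                  // End(Window), also used by
                                                // [SideInput]WindowMappingFn
      T min(Iterable<T> timestamps);            // Min(Timestamps)
      T max(Iterable<T> timestamps);            // Max(Timestamps)
      T minusEpsilon(T timestamp);              // for End(Window) - epsilon
      boolean isPastEndOfWindow(T watermark, W window);  // PastEndOfWindow
      byte[] encode(T timestamp);               // cross-language exchange
      T decode(byte[] encoded);
      Object toSdkNative(T timestamp);          // GetTimestamp
      T fromSdkNative(Object sdkNative);        // EmitAtTimestamp
    }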






On Fri, May 10, 2019 at 1:33 PM Robert Bradshaw <rober...@google.com> wrote:
On Thu, May 9, 2019 at 9:32 AM Kenneth Knowles <k...@apache.org> wrote:

From: Robert Bradshaw <rober...@google.com>
Date: Wed, May 8, 2019 at 3:00 PM
To: dev

From: Kenneth Knowles <k...@apache.org>
Date: Wed, May 8, 2019 at 6:50 PM
To: dev

The end-of-window, for firing, can be approximate, but it seems it
should be exact for timestamp assignment of the result (and similarly
with the other timestamp combiners).
I was thinking that the window itself should be stored as exact data, while 
just the firing itself is approximated, since it already is, because of 
watermarks and timers.
I think this works where we can compare encoded windows, but some
portable interpretation of windows is required for runner-side
implementation of merging windows (for example).
But in this case, you've recognized the URN of the WindowFn anyhow, so you understand its 
windows. Remembering that IntervalWindow is just one choice, and that windows themselves 
are totally user-defined and that merging logic is completely arbitrary per WindowFn (we 
probably should have some restrictions, but see 
https://issues.apache.org/jira/browse/BEAM-654). So I file this use case in the 
"runner knows everything about the WindowFn and Window type and window encoding 
anyhow".
Being able to merge common windows in the runner is just an
optimization, but an important one (especially for bootstrapping
SDKs). However, this is not just about runner to SDK, but SDK to SDK
as well (where a user from one SDK may want to inspect the windows
produced by another). Having MillisIntervalWindow,
MicrosIntervalWindow, NanosIntervalWindow, etc. isn't a path that I
think is worth going down.

Yes, we need to solve the "extract the endpoint of an unknown encoded
window" problem as well, possibly similar to what we do with length
prefix coders, possibly a restriction on window encodings themselves.

There may also be issues if windows (or timestamps) are assigned to a
high precision in one SDK, then inspected/acted on in another SDK, and
then passed back to the original SDK where the truncation would be
visible.
This is pretty interesting and complex. But again, a window is just data. An 
SDK has to know how to deserialize it to operate on it. Unless we do actually 
standardize some aspects of it. I don't believe BoundedWindow encoding has a 
defined way to get the timestamp without decoding the window, does it? I 
thought we had basically defaulted to all IntervalWindows. But I am not following 
that closely.

You raise a good point that min/max timestamp combiners require actually understanding 
the higher-precision timestamp. I can think of a couple things to do. One is the old 
"standardize all 3 or 4 precisions we need" and the other is that combiners 
other than EOW exist primarily to hold the watermark, and that hold does not require the 
original precision. Still, neither of these is that satisfying.
In the current model, the output timestamp is user-visible.
But as long as the watermark hold is less, it is safe. It requires knowing the 
coarse-precision lower bound of the timestamps of the input. And there may be 
situations where you also want the coarse upper bound. But you do know that 
these are at most one millisecond apart (assuming the runner is in millis) so 
perhaps no storage overhead. But a lot of complexity and chances for off by 
ones. And this is pretty hand-wavy.
Yeah. A different SDK may (implicitly or explicitly) ask for the
timestamp of the (transitive) output of a GBK, for which an
approximation (either way) is undesirable.

A correction: Java *now* uses nanoseconds [1]. It uses the same breakdown as 
proto (int64 seconds since epoch + int32 nanos within second). It has legacy 
classes that use milliseconds, and Joda itself now encourages moving back to 
Java's new Instant type. Nanoseconds should complicate the arithmetic only for 
the one person authoring the date library, which they have already done.
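
For reference, that breakdown is directly visible in java.time (the value here is an arbitrary example):

    import java.time.Instant;

    // Same (seconds since epoch, nanos within second) shape as proto's Timestamp.
    Instant t = Instant.ofEpochSecond(1_556_668_800L, 123_456_789L);
    long seconds = t.getEpochSecond();  // 1556668800
    int nanos = t.getNano();            // 123456789
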
The encoding and decoding need to be done in a language-consistent way
as well.
I honestly am not sure what you mean by "language-consistent" here.
If we want to make reading and writing of timestamps, windows
cross-language, we can't rely on language-specific libraries to do the
encoding.

Also, most date libraries don't have division, etc. operators, so
we have to do that as well. Not that it should be *that* hard.
If the libraries dedicated to time handling haven't found it needful, is there 
a specific reason you raise this? We do some simple math to find the window 
things fall into; is that it?
Yes. E.g.

https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/FixedWindows.java#L77

would be a lot messier if there were no mapping from date libraries to raw
ints that we can do arithmetic on. Writing this with the (seconds,
nanos) representation is painful. But I suppose we'd only have to do
it once per SDK.
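
To illustrate the difference, a rough sketch (this is not the actual FixedWindows code; offsets and negative timestamps are glossed over):

    import java.math.BigInteger;

    class FixedWindowMath {
      static final BigInteger NANOS_PER_SECOND = BigInteger.valueOf(1_000_000_000L);

      // With a flat long of millis, computing the window start is one modulo.
      static long windowStartMillis(long timestampMillis, long sizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
      }

      // With a (seconds, nanos) pair, a faithful computation either overflows a
      // signed 64-bit count of nanoseconds (~year 2262) or has to widen the value
      // and split it back out again.
      static long[] windowStartSecondsNanos(
          long seconds, int nanos, long sizeSeconds, int sizeNanos) {
        BigInteger ts = BigInteger.valueOf(seconds).multiply(NANOS_PER_SECOND)
            .add(BigInteger.valueOf(nanos));
        BigInteger size = BigInteger.valueOf(sizeSeconds).multiply(NANOS_PER_SECOND)
            .add(BigInteger.valueOf(sizeNanos));
        BigInteger start = ts.subtract(ts.mod(size));  // mod is non-negative, like floorMod
        BigInteger[] q = start.divideAndRemainder(NANOS_PER_SECOND);
        return new long[] {q[0].longValueExact(), q[1].longValueExact()};
      }
    }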

Yea I think that arithmetic is not so bad. But this raises the issue of writing 
a *generic* WindowFn where its idea of timestamp granularity (the WindowFn owns 
the window type and encoding) may not match the user data coming in. So you 
need to apply the approximation function to provide type-correct input to the 
WindowFn. That's kind of exciting and weird and perhaps unsolvable, except by 
choosing a concrete granularity.

Kenn


It would also be really nice to clean up the infinite-future being the
somewhat arbitrary max micros rounded to millis, and
end-of-global-window being infinite-future minus 1 hour (IIRC), etc.
as well as the ugly logic in Python to cope with millis-micros
conversion.
I actually don't have a problem with this. If you are trying to keep the 
representation compact, not add bytes on top of instants, then you just have to 
choose magic numbers, right?
It's not about compactness, it's the (historically-derived?)
arbitrariness of the numbers.
What I mean is that the only reason to fit them into an integer at all is 
compactness. Otherwise, you could use a proper disjoint union representing your 
intent directly, and all fiddling goes away, like `Timestamp ::= PosInf | 
NegInf | EndOfGlobalWindow | ActualTime(Instant)`. It costs a couple of bits.
The other cost is not being able to use standard libraries to
represent all of your timestamps.
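
As a strawman, Kenn's disjoint union could look something like this (a Java 17-style sketch; the ordering of EndOfGlobalWindow relative to real event times is just one possible choice):

    // Sketch of Timestamp ::= PosInf | NegInf | EndOfGlobalWindow | ActualTime(Instant).
    sealed interface BeamTimestamp {
      record NegInf() implements BeamTimestamp {}
      record ActualTime(java.time.Instant instant) implements BeamTimestamp {}
      record EndOfGlobalWindow() implements BeamTimestamp {}
      record PosInf() implements BeamTimestamp {}

      // NegInf < any ActualTime < EndOfGlobalWindow < PosInf.
      static int compare(BeamTimestamp a, BeamTimestamp b) {
        int byKind = Integer.compare(rank(a), rank(b));
        if (byKind != 0) {
          return byKind;
        }
        if (a instanceof ActualTime x && b instanceof ActualTime y) {
          return x.instant().compareTo(y.instant());
        }
        return 0;  // same non-ActualTime kind
      }

      private static int rank(BeamTimestamp t) {
        if (t instanceof NegInf) return 0;
        if (t instanceof ActualTime) return 1;
        if (t instanceof EndOfGlobalWindow) return 2;
        return 3;
      }
    }

No magic numbers are needed, but, as noted, the non-ActualTime cases lose direct interop with standard date/time libraries.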

For example, the bounds are chosen to
fit within 64-bit micros despite milliseconds being the "chosen"
granularity, and care was taken that

     WindowInto(Global) | GBK | WindowInto(Minute) | GBK

works, but

     WindowInto(Global) | GBK | WindowInto(Day) | GBK

may produce elements with timestamps greater than MaxTimestamp.

Kenn

[1] https://docs.oracle.com/javase/8/docs/api/java/time/Instant.html

On Wed, Apr 17, 2019 at 3:13 PM Robert Burke <rob...@frantil.com> wrote:
+1 for plan B. Nanosecond precision on windowing seems... a little much for a 
system that's aggregating data over time. Even for processing, say, particle 
super collider data, they'd get away with artificially increasing the 
granularity in batch settings.

Now if they were streaming... they'd probably want femtoseconds anyway.
The point is, we should see if users demand it before adding in the necessary 
work.

On Wed, 17 Apr 2019 at 14:26, Chamikara Jayalath <chamik...@google.com> wrote:
+1 for plan B as well. I think it's important to make timestamp precision 
consistent now without introducing surprising behaviors for existing users. But 
we should move towards a higher granularity timestamp precision in the long run 
to support use cases that Beam users might otherwise miss out on (on a runner that 
supports such precision).

- Cham

On Wed, Apr 17, 2019 at 1:35 PM Lukasz Cwik <lc...@google.com> wrote:
I also like Plan B because in the cross-language case the pipeline would not work unless 
every party (Runners & SDKs) is aware of the new beam:coder:windowed_value:v2 coder. 
Plan A has the property that if the SDK/Runner wasn't updated, it may start truncating 
the timestamps unexpectedly.

On Wed, Apr 17, 2019 at 1:24 PM Lukasz Cwik <lc...@google.com> wrote:
Kenn, this discussion is about the precision of the timestamp in the user data. 
As you had mentioned, Runners need not have the same granularity of user data 
as long as they correctly round the timestamp to guarantee that triggers are 
executed correctly, but the user data should have the same precision across SDKs; 
otherwise user data timestamps will be truncated in cross-language scenarios.

Based on the systems that were listed, either microsecond or nanosecond would 
make sense. The issue with changing the precision is that all Beam runners 
except for possibly Beam Python on Dataflow are using millisecond precision 
since they are all using the same Java Runner windowing/trigger logic.

Plan A: Swap precision to nanosecond
1) Change the Python SDK to only expose millisecond precision timestamps (do 
now)
2) Change the user data encoding to support nanosecond precision (do now)
3) Swap runner libraries to be nanosecond precision aware updating all 
window/triggering logic (do later)
4) Swap SDKs to expose nanosecond precision (do later)

Plan B:
1) Change the Python SDK to only expose millisecond precision timestamps and 
keep the data encoding as is (do now)
(We could add greater precision later to plan B by creating a new version 
beam:coder:windowed_value:v2 which would be nanosecond and would require 
runners to correctly perform internal conversions for windowing/triggering.)

I think we should go with Plan B and when users request greater precision we 
can make that an explicit effort. What do people think?



On Wed, Apr 17, 2019 at 5:43 AM Maximilian Michels <m...@apache.org> wrote:
Hi,

Thanks for taking care of this issue in the Python SDK, Thomas!

It would be nice to have a uniform precision for timestamps but, as Kenn
pointed out, timestamps are extracted from systems that have different
precision.

To add to the list: Flink - milliseconds

After all, it doesn't matter as long as there is sufficient precision
and conversions are done correctly.

I think we could improve the situation by at least adding a
"milliseconds" constructor to the Python SDK's Timestamp.

Cheers,
Max

On 17.04.19 04:13, Kenneth Knowles wrote:
I am not so sure this is a good idea. Here are some systems and their
precision:

Arrow - microseconds
BigQuery - microseconds
New Java Instant - nanoseconds
Firestore - microseconds
Protobuf - nanoseconds
Dataflow backend - microseconds
Postgresql - microseconds
Pubsub publish time - nanoseconds
MSSQL datetime2 - 100 nanoseconds (original datetime about 3 millis)
Cassandra - milliseconds

IMO it is important to be able to treat any of these as a Beam
timestamp, even though they aren't all streaming. Who knows when we
might be ingesting a streamed changelog, or using them for reprocessing
an archived stream. I think for this purpose we either should
standardize on nanoseconds or make the runner's resolution independent
of the data representation.

I've had some offline conversations about this. I think we can have
higher-than-runner precision in the user data, and allow WindowFns and
DoFns to operate on this higher-than-runner precision data, and still
have consistent watermark treatment. Watermarks are just bounds, after all.

Kenn

On Tue, Apr 16, 2019 at 6:48 PM Thomas Weise <t...@apache.org> wrote:

     The Python SDK currently uses timestamps in microsecond resolution
     while Java SDK, as most would probably expect, uses milliseconds.

     This causes a few difficulties with portability (Python coders need
     to convert to millis for WindowedValue and Timers), which is related
     to a bug I'm looking into:

     https://issues.apache.org/jira/browse/BEAM-7035

     As Luke pointed out, the issue was previously discussed:

     https://issues.apache.org/jira/browse/BEAM-1524

     I'm not privy to the reasons why we decided to go with micros in
     the first place, but would it be too big of a change or impractical for
     other reasons to switch Python SDK to millis before it gets more users?

     Thanks,
     Thomas
