TL;DR: We should just settle on nanosecond precision ubiquitously for
timestamp/windowing in Beam.
Re-visiting this discussion in light of cross-language transforms and runners,
and trying to tighten up testing. I've spent some more time thinking about how
we could make these operations granularity-agnostic, but just can't find a good
solution. In particular, the sticking points seem to be:
(1) Windows are half-open intervals, and the timestamp associated with a window coming
out of a GBK is (by default) as large as possible but must live in that window.
(Otherwise WindowInto + GBK + WindowInto would have the unfortunate effect of moving
aggregate values into subsequent windows, which is clearly not the intent.) In other
words, the timestamp of a grouped value is basically End(Window) - epsilon. Unless we
choose a representation able to encode "minus epsilon" we must agree on a
granularity.
(2) Unless we want to have multiple variants of all our WindowFns (e.g.
FixedWindowMillis, FixedWindowMicros, FixedWindowNanos) we must agree on a
granularity with which to parameterize these well-known operations. There are
cases (e.g. side input window mapping, merging) where these Fns may be used
downstream in contexts other than where they are applied/defined.
(3) Reification of the timestamp into user-visible data, and the other way
around, require a choice of precision to expose to the user. This means that
the timestamp is actual data, and truncating/rounding cannot be done
implicitly. Also round trip of reification and application of timestamps should
hopefully be idempotent no matter the SDK.
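To make point (1) above concrete: once a granularity is fixed, "End(Window) - epsilon" simply means "end minus one unit". A minimal sketch (hypothetical class, integer timestamps in an arbitrary fixed unit, not Beam's actual API):

```python
# Half-open interval window over integer timestamps at a fixed granularity.
# max_timestamp is well-defined only because the unit is fixed: it is
# end - 1 unit, i.e. "End(Window) - epsilon" made concrete.

class IntervalWindow:
    def __init__(self, start, end):
        # Represents [start, end), integers in the chosen unit.
        self.start = start
        self.end = end

    def contains(self, ts):
        return self.start <= ts < self.end

    def max_timestamp(self):
        # Only expressible once the granularity is agreed upon.
        return self.end - 1

w = IntervalWindow(0, 1000)
assert w.contains(w.max_timestamp())  # stays inside the window
assert not w.contains(w.end)          # the end itself is excluded
```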
The closest I've come is possibly parameterizing the timestamp type, where encoding,
decoding (including pulling the end out of a window?), comparison (against each other and
a watermark), "minus epsilon", etc could be UDFs. Possibly we'd need the full
set of arithmetic operations to implement FixedWindows on an unknown timestamp type.
Reification would simply be disallowed (or return an opaque, rather than SDK-native, type)
if the SDK did not know that window type. The fact that one might need comparison between
timestamps of different types, or (lossless) coercion from one type to another, means
that timestamp types need to know about each other, or another entity needs to know about
the full cross-product, unless there is a common base-type (at which point we might as
well always choose that).
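For illustration, the surface area such a parameterized timestamp type would need might look like the following. All names are hypothetical; this sketches the design being argued against, only to make the required machinery visible:

```python
from abc import ABC, abstractmethod

class TimestampType(ABC):
    """Hypothetical bundle of UDFs a runner would need per timestamp type."""

    @abstractmethod
    def encode(self, ts) -> bytes: ...

    @abstractmethod
    def decode(self, data: bytes): ...

    @abstractmethod
    def compare(self, a, b) -> int: ...   # also used against watermarks

    @abstractmethod
    def minus_epsilon(self, ts): ...      # End(Window) - epsilon

    # FixedWindows would additionally need full arithmetic (add, sub, mod).

class MillisTimestampType(TimestampType):
    """One concrete instantiation, at millisecond granularity."""

    def encode(self, ts):
        return ts.to_bytes(8, "big", signed=True)

    def decode(self, data):
        return int.from_bytes(data, "big", signed=True)

    def compare(self, a, b):
        return (a > b) - (a < b)

    def minus_epsilon(self, ts):
        return ts - 1

mt = MillisTimestampType()
assert mt.decode(mt.encode(12345)) == 12345
assert mt.minus_epsilon(1000) == 999
```

Note that comparison across two different TimestampType implementations is exactly the cross-product problem described above; nothing in this interface solves it.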
An intermediate solution is to settle on a floating (decimal) point representation, plus a
"minus-epsilon" bit. It wouldn't quite solve the mapping through SDK-native
types (which could require rounding or errors or a new opaque type, and few date libraries
could faithfully expose the minus-epsilon part). It might also be more expensive (compute
and storage), and would not allow us to use the protobuf timestamp/duration fields (or
any standard date/time libraries).
Unless we can come up with a clean solution to the issues above shortly, I
think we should fix a precision and move forward. If this makes sense to
everyone, then we can start talking about the specific choice of precision and
a migration path (possibly only for portability).
For reference, the manipulations we do on timestamps are:
WindowInto: Timestamp -> Window
TimestampCombine: Window, [Timestamp] -> Timestamp
End(Window)
Min(Timestamps)
Max(Timestamps)
PastEndOfWindow: Watermark, Window -> {True, False}
[SideInput]WindowMappingFn: Window -> Window
WindowInto(End(Window))
GetTimestamp: Timestamp -> SDK Native Object
EmitAtTimestamp: SDK Native Object -> Timestamp
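Once a single granularity is fixed, all of the manipulations above collapse into plain integer arithmetic. A minimal sketch with integer millisecond timestamps and half-open (start, end) tuples as windows (hypothetical helpers, not Beam's API):

```python
# FixedWindows of `size` millis, all operations as plain integer arithmetic.

def window_into(ts, size):
    # WindowInto: Timestamp -> Window
    start = ts - (ts % size)
    return (start, start + size)  # half-open [start, end)

def end_of_window(window):
    # End(Window)
    return window[1]

def timestamp_combine_end(window):
    # Default TimestampCombine: End(Window) - epsilon, i.e. end - 1 unit.
    return end_of_window(window) - 1

def past_end_of_window(watermark, window):
    # PastEndOfWindow: Watermark, Window -> {True, False}
    return watermark >= end_of_window(window)

w = window_into(1_234, 1_000)
assert w == (1_000, 2_000)
assert timestamp_combine_end(w) == 1_999
assert past_end_of_window(2_000, w) and not past_end_of_window(1_999, w)
```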
On Fri, May 10, 2019 at 1:33 PM Robert Bradshaw <rober...@google.com> wrote:
On Thu, May 9, 2019 at 9:32 AM Kenneth Knowles <k...@apache.org> wrote:
From: Robert Bradshaw <rober...@google.com>
Date: Wed, May 8, 2019 at 3:00 PM
To: dev
From: Kenneth Knowles <k...@apache.org>
Date: Wed, May 8, 2019 at 6:50 PM
To: dev
The end-of-window, for firing, can be approximate, but it seems it
should be exact for timestamp assignment of the result (and similarly
with the other timestamp combiners).
I was thinking that the window itself should be stored as exact data, while
just the firing itself is approximated, since it already is, because of
watermarks and timers.
I think this works where we can compare encoded windows, but some
portable interpretation of windows is required for runner-side
implementation of merging windows (for example).
But in this case, you've recognized the URN of the WindowFn anyhow, so you understand its
windows. Remembering that IntervalWindow is just one choice, and that windows themselves
are totally user-defined and that merging logic is completely arbitrary per WindowFn (we
probably should have some restrictions, but see
https://issues.apache.org/jira/browse/BEAM-654). So I file this use case in the
"runner knows everything about the WindowFn and Window type and window encoding
anyhow".
Being able to merge common windows in the runner is just an
optimization, but an important one (especially for bootstrapping
SDKs). However, this is not just about runner to SDK, but SDK to SDK
as well (where a user from one SDK may want to inspect the windows
produced by another). Having MillisIntervalWindow,
MicrosIntervalWindow, NanosIntervalWindow, etc. isn't a path that I
think is worth going down.
Yes, we need to solve the "extract the endpoint of an unknown encoded
window" problem as well, possibly similar to what we do with length
prefix coders, possibly a restriction on window encodings themselves.
There may also be issues if windows (or timestamps) are assigned to a
high precision in one SDK, then inspected/acted on in another SDK, and
then passed back to the original SDK where the truncation would be
visible.
This is pretty interesting and complex. But again, a window is just data. An
SDK has to know how to deserialize it to operate on it. Unless we do actually
standardize some aspects of it. I don't believe BoundedWindow encoding has a
defined way to get the timestamp without decoding the window, does it? I
thought we had basically defaulted to IntervalWindows. But I am not following
that closely.
You raise a good point that min/max timestamp combiners require actually understanding
the higher-precision timestamp. I can think of a couple things to do. One is the old
"standardize all 3 or 4 precisions we need" and the other is that combiners
other than EOW exist primarily to hold the watermark, and that hold does not require the
original precision. Still, neither of these is that satisfying.
In the current model, the output timestamp is user-visible.
But as long as the watermark hold is no later than the true minimum, it is
safe. It requires knowing the
situations where you also want the coarse upper bound. But you do know that
these are at most one millisecond apart (assuming the runner is in millis) so
perhaps no storage overhead. But a lot of complexity and chances for off by
ones. And this is pretty hand-wavy.
Yeah. A different SDK may (implicitly or explicitly) ask for the
timestamp of the (transitive) output of a GBK, for which an
approximation (either way) is undesirable.
A correction: Java *now* uses nanoseconds [1]. It uses the same breakdown as
proto (int64 seconds since epoch + int32 nanos within second). It has legacy
classes that use milliseconds, and Joda itself now encourages moving back to
Java's new Instant type. Nanoseconds should complicate the arithmetic only for
the one person authoring the date library, which they have already done.
The encoding and decoding need to be done in a language-consistent way
as well.
I honestly am not sure what you mean by "language-consistent" here.
If we want to make reading and writing of timestamps, windows
cross-language, we can't rely on language-specific libraries to do the
encoding.
Also, most date libraries don't have division, etc. operators, so
we have to do that as well. Not that it should be *that* hard.
If the libraries dedicated to time handling haven't found it needful, is there
a specific reason you raise this? We do some simple math to find the window
things fall into; is that it?
Yes. E.g.
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/FixedWindows.java#L77
would be a lot messier if there were no mapping date libraries to raw
ints that we can do arithmetic on. Writing this with the (seconds,
nanos) representation is painful. But I suppose we'd only have to do
it once per SDK.
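For illustration, the FixedWindows arithmetic over a (seconds, nanos) representation is workable if you normalize through a single total-nanoseconds integer, mirroring the standard start = ts - ((ts - offset) mod size) computation. Hypothetical helper names, not actual Beam code:

```python
def to_total_nanos(seconds, nanos):
    return seconds * 1_000_000_000 + nanos

def from_total_nanos(total):
    # Floor division keeps nanos in [0, 1e9) even for negative times.
    return total // 1_000_000_000, total % 1_000_000_000

def fixed_window_start(ts_seconds, ts_nanos, size_nanos, offset_nanos=0):
    # The same arithmetic as FixedWindows, done on the flattened value.
    ts = to_total_nanos(ts_seconds, ts_nanos)
    start = ts - ((ts - offset_nanos) % size_nanos)
    return from_total_nanos(start)

# 1.5s into a 1s fixed window -> that window starts at 1.0s.
assert fixed_window_start(1, 500_000_000, 1_000_000_000) == (1, 0)
```

Doing the same computation directly on (seconds, nanos) pairs, with manual carries and borrows, is the "painful" version; flattening first keeps it to a few lines per SDK.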
Yea I think that arithmetic is not so bad. But this raises the issue of writing
a *generic* WindowFn where its idea of timestamp granularity (the WindowFn owns
the window type and encoding) may not match the user data coming in. So you
need to apply the approximation function to provide type-correct input to the
WindowFn. That's kind of exciting and weird and perhaps unsolvable, except by
choosing a concrete granularity.
Kenn
It would also be really nice to clean up the infinite-future being the
somewhat arbitrary max micros rounded to millis, and
end-of-global-window being infinite-future minus 1 hour (IIRC), etc.
as well as the ugly logic in Python to cope with millis-micros
conversion.
I actually don't have a problem with this. If you are trying to keep the
representation compact, not add bytes on top of instants, then you just have to
choose magic numbers, right?
It's not about compactness, it's the (historically-derived?)
arbitrariness of the numbers.
What I mean is that the only reason to fit them into an integer at all is
compactness. Otherwise, you could use a proper disjoint union representing your
intent directly, and all fiddling goes away, like `Timestamp ::= PosInf |
NegInf | EndOfGlobalWindow | ActualTime(Instant)`. It costs a couple of bits.
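Such a disjoint union could be sketched as follows (hypothetical types; ordering every ActualTime before EndOfGlobalWindow is one possible design choice, not a settled one):

```python
from dataclasses import dataclass
from typing import Union

@dataclass(frozen=True)
class NegInf: pass

@dataclass(frozen=True)
class PosInf: pass

@dataclass(frozen=True)
class EndOfGlobalWindow: pass

@dataclass(frozen=True)
class ActualTime:
    micros: int

Timestamp = Union[NegInf, PosInf, EndOfGlobalWindow, ActualTime]

# Rank of each variant; no magic numbers needed to order sentinels.
_ORDER = {NegInf: 0, ActualTime: 1, EndOfGlobalWindow: 2, PosInf: 3}

def before(a, b):
    ra, rb = _ORDER[type(a)], _ORDER[type(b)]
    if ra != rb:
        return ra < rb
    if isinstance(a, ActualTime):
        return a.micros < b.micros
    return False  # identical sentinels are equal

assert before(NegInf(), ActualTime(0))
assert before(ActualTime(10), EndOfGlobalWindow())
assert before(EndOfGlobalWindow(), PosInf())
```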
The other cost is not being able to use standard libraries to
represent all of your timestamps.
For example, the bounds are chosen to
fit within 64-bit micros despite milliseconds being the "chosen"
granularity, and care was taken that
WindowInto(Global) | GBK | WindowInto(Minute) | GBK
works, but
WindowInto(Global) | GBK | WindowInto(Day) | GBK
may produce elements with timestamps greater than MaxTimestamp.
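The overshoot can be shown with toy numbers (deliberately not Beam's actual constants): if end-of-global-window sits only a small gap below the representable maximum, rounding up to a window size larger than that gap produces a window end past MaxTimestamp.

```python
# Hypothetical constants, for illustration only (not Beam's real values):
MAX_TIMESTAMP = 10_000               # the arbitrary representable maximum
END_OF_GLOBAL = MAX_TIMESTAMP - 100  # end-of-global-window sits just below it

def fixed_window_end(ts, size):
    # End of the fixed window of `size` units containing `ts`.
    start = ts - (ts % size)
    return start + size

# Re-windowing with a small window size stays representable...
assert fixed_window_end(END_OF_GLOBAL, 50) <= MAX_TIMESTAMP
# ...but a window size larger than the gap overshoots MAX_TIMESTAMP,
# and the grouped output would be stamped past the maximum.
assert fixed_window_end(END_OF_GLOBAL, 300) > MAX_TIMESTAMP
```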
Kenn
[1] https://docs.oracle.com/javase/8/docs/api/java/time/Instant.html
On Wed, Apr 17, 2019 at 3:13 PM Robert Burke <rob...@frantil.com> wrote:
+1 for plan B. Nanosecond precision on windowing seems... a little much for a
system that's aggregating data over time. Even for processing say particle
super collider data, they'd get away with artificially increasing the
granularity in batch settings.
Now if they were streaming... they'd probably want femtoseconds anyway.
The point is, we should see if users demand it before adding in the necessary
work.
On Wed, 17 Apr 2019 at 14:26, Chamikara Jayalath <chamik...@google.com> wrote:
+1 for plan B as well. I think it's important to make timestamp precision
consistent now without introducing surprising behaviors for existing users. But
we should move towards a higher granularity timestamp precision in the long run
to support use-cases that Beam users otherwise might miss out (on a runner that
supports such precision).
- Cham
On Wed, Apr 17, 2019 at 1:35 PM Lukasz Cwik <lc...@google.com> wrote:
I also like Plan B because, in the cross-language case, an incompatible pipeline would
simply fail to run, since every party (runners & SDKs) would have to be aware of the new
beam:coder:windowed_value:v2 coder. Plan A has the property that if the SDK/runner
wasn't updated, it may start truncating the timestamps unexpectedly.
On Wed, Apr 17, 2019 at 1:24 PM Lukasz Cwik <lc...@google.com> wrote:
Kenn, this discussion is about the precision of the timestamp in the user data.
As you had mentioned, Runners need not have the same granularity of user data
as long as they correctly round the timestamp to guarantee that triggers are
executed correctly but the user data should have the same precision across SDKs
otherwise user data timestamps will be truncated in cross language scenarios.
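One consistent rounding convention that keeps a coarse-precision runner safe (an illustration of the idea, not a statement of what any runner actually does): round element timestamps and watermark holds down, round window ends up, so the runner never fires a trigger early and never advances a hold past the true full-precision minimum.

```python
# Safe rounding directions when the runner (millis) is coarser than
# user data (nanos). Floor for lower bounds, ceiling for upper bounds.

def floor_to_millis(nanos):
    return nanos // 1_000_000

def ceil_to_millis(nanos):
    return -((-nanos) // 1_000_000)

# An element at 1.999999ms is held at 1ms (never overstated)...
assert floor_to_millis(1_999_999) == 1
# ...while a window end at 1.000001ms fires no earlier than 2ms.
assert ceil_to_millis(1_000_001) == 2
```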
Based on the systems that were listed, either microsecond or nanosecond would
make sense. The issue with changing the precision is that all Beam runners
except for possibly Beam Python on Dataflow are using millisecond precision
since they are all using the same Java Runner windowing/trigger logic.
Plan A: Swap precision to nanosecond
1) Change the Python SDK to only expose millisecond precision timestamps (do
now)
2) Change the user data encoding to support nanosecond precision (do now)
3) Swap runner libraries to be nanosecond precision aware updating all
window/triggering logic (do later)
4) Swap SDKs to expose nanosecond precision (do later)
Plan B:
1) Change the Python SDK to only expose millisecond precision timestamps and
keep the data encoding as is (do now)
(We could add greater precision later to plan B by creating a new version
beam:coder:windowed_value:v2 which would be nanosecond and would require
runners to correctly perform internal conversions for windowing/triggering.)
I think we should go with Plan B and when users request greater precision we
can make that an explicit effort. What do people think?
On Wed, Apr 17, 2019 at 5:43 AM Maximilian Michels <m...@apache.org> wrote:
Hi,
Thanks for taking care of this issue in the Python SDK, Thomas!
It would be nice to have a uniform precision for timestamps but, as Kenn
pointed out, timestamps are extracted from systems that have different
precision.
To add to the list: Flink - milliseconds
After all, it doesn't matter as long as there is sufficient precision
and conversions are done correctly.
I think we could improve the situation by at least adding a
"milliseconds" constructor to the Python SDK's Timestamp.
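Such a constructor might look like the following (a hypothetical Timestamp class for illustration, not the actual apache_beam.utils.timestamp.Timestamp API):

```python
class Timestamp:
    """Hypothetical timestamp storing integer microseconds internally."""

    def __init__(self, micros):
        self.micros = micros

    @classmethod
    def of_seconds(cls, seconds):
        return cls(int(seconds * 1_000_000))

    @classmethod
    def of_millis(cls, millis):
        # The suggested convenience constructor: the caller never does a
        # unit conversion by hand, so there is no millis/micros mixup.
        return cls(millis * 1_000)

assert Timestamp.of_millis(1_500).micros == Timestamp.of_seconds(1.5).micros
```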
Cheers,
Max
On 17.04.19 04:13, Kenneth Knowles wrote:
I am not so sure this is a good idea. Here are some systems and their
precision:
Arrow - microseconds
BigQuery - microseconds
New Java instant - nanoseconds
Firestore - microseconds
Protobuf - nanoseconds
Dataflow backend - microseconds
Postgresql - microseconds
Pubsub publish time - nanoseconds
MSSQL datetime2 - 100 nanoseconds (original datetime about 3 millis)
Cassandra - milliseconds
IMO it is important to be able to treat any of these as a Beam
timestamp, even though they aren't all streaming. Who knows when we
might be ingesting a streamed changelog, or using them for reprocessing
an archived stream. I think for this purpose we either should
standardize on nanoseconds or make the runner's resolution independent
of the data representation.
I've had some offline conversations about this. I think we can have
higher-than-runner precision in the user data, and allow WindowFns and
DoFns to operate on this higher-than-runner precision data, and still
have consistent watermark treatment. Watermarks are just bounds, after all.
Kenn
On Tue, Apr 16, 2019 at 6:48 PM Thomas Weise <t...@apache.org> wrote:
The Python SDK currently uses timestamps in microsecond resolution
while Java SDK, as most would probably expect, uses milliseconds.
This causes a few difficulties with portability (Python coders need
to convert to millis for WindowedValue and Timers), which is related
to a bug I'm looking into:
https://issues.apache.org/jira/browse/BEAM-7035
As Luke pointed out, the issue was previously discussed:
https://issues.apache.org/jira/browse/BEAM-1524
I'm not privy to the reasons why we decided to go with micros in the
first place, but would it be too big of a change, or impractical for
other reasons, to switch the Python SDK to millis before it gets more users?
Thanks,
Thomas