On Fri, May 5, 2017 at 12:14 PM, Kenneth Knowles <k...@google.com.invalid> wrote:
> Before going into the details, I want to set the context (my own
> perspective) that the most important thing for the first stable release is
> removing possibly dangerous or incomplete pieces, which we can always
> restore backwards compatibly after deliberation.
>
> The original spec is this:
>
> "If invoked from startBundle or finishBundle, this will attempt to use the
> WindowFn of the input PCollection to determine what windows the element
> should be in, throwing an exception if the WindowFn attempts to access any
> information about the input element. The output element will have a
> timestamp of negative infinity."
>
> First, I want to call out a mistake of mine: I mistakenly read the spec and
> code to be that the timestamp of negative infinity was run through the
> WindowFn, which would constitute data corruption for most WindowFns.
>
> So, with my corrected understanding, your design where we basically say "it
> always outputs to the global window or something like it" is a totally
> reasonable alternative to the current status. I'm not the only one with
> opinions, here, of course...
>
> In Java, we _can_ now enforce this statically, thanks to Thomas G's
> improvements. We could either force it to be globally windowed or - since
> it can't inspect the element or timestamp - just run a couple test calls
> through "assignWindows", optionally confirming equality of the results.
Could you clarify more? I'd like to enforce this statically, but I don't
see how to detect that a particular DoFn emits a globally windowed value.
But that would make things much better.

> We
> can also enforce that the correct subclass of window is output, though this
> is not implemented as such right now.

Are you thinking of a type check based on the reflectively obtained
parameter of the WindowFn?

> But this behavior is all to support people who want to write transforms
> that work with essentially only one windowing strategy, basically
> pre-windowing stuff. Are there really any meaningful alternatives to the
> global window? If so, I don't think that is realistically what this is
> about. Such transforms should be permitted, but not encouraged or supported
> automatically, as they are antithetical to the unified model. So I would
> also suggest that they should validate that their input is globally
> windowed and explicitly output to the global window. Trivial and they
> already should be doing that. They also have the capability of storing the
> input's WindowFn and producing behavior identical to what you describe.

What it boils down to is that this is a common case that only works
easily in the global window. Another example like this in the SDK is
global combine. If you're in the global window, you get a singleton
PCollection, and we don't require users to do anything extra for this
behavior. If someone tries to use this transform in a
non-globally-windowed setting, only then do we give an error (this time
at pipeline construction) and force the user to do extra work. It's a
balance: something that works correctly in the unified model, but that
requires extra (potentially tricky and error-prone) effort only when one
actually needs it.
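To make the "couple of test calls through assignWindows" idea concrete, here is a minimal sketch in plain Python. It does not use the Beam API: ProbeContext, GlobalWindowFn, FixedWindowFn, and check_bundle_output_allowed are all hypothetical names invented for illustration. The assumption is that a construction-time check can hand the WindowFn a context that refuses to reveal the element or timestamp, and reject any WindowFn that asks for them or returns inconsistent results.

```python
class ElementAccessError(Exception):
    """Raised when a WindowFn inspects the element or timestamp of a probe."""


class ProbeContext:
    """Hypothetical assign context that hides both element and timestamp."""

    @property
    def element(self):
        raise ElementAccessError("WindowFn inspected the element")

    @property
    def timestamp(self):
        raise ElementAccessError("WindowFn inspected the timestamp")


class GlobalWindowFn:
    """Stand-in for a WindowFn that ignores its input entirely."""

    def assign_windows(self, context):
        return ["GlobalWindow"]


class FixedWindowFn:
    """Stand-in for a WindowFn that needs the timestamp to pick a window."""

    def __init__(self, size):
        self.size = size

    def assign_windows(self, context):
        start = context.timestamp - context.timestamp % self.size
        return [(start, start + self.size)]


def check_bundle_output_allowed(window_fn):
    """Probe the WindowFn twice; raise ValueError if it needs the input."""
    try:
        first = window_fn.assign_windows(ProbeContext())
        second = window_fn.assign_windows(ProbeContext())
    except ElementAccessError as err:
        name = type(window_fn).__name__
        raise ValueError(f"{name} cannot window bundle-level output: {err}") from err
    if first != second:
        raise ValueError("WindowFn returned different windows for identical probes")
    return first
```

In this sketch the check would run once at pipeline construction, so a DoFn that outputs from finishBundle under a timestamp-inspecting WindowFn fails early rather than at runtime.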
> For a transform to output in finishBundle and be window-agnostic, the local
> state of the DoFn needs to partition the windows manually, consider what
> timestamp it wants to use, and output the correct thing to the correct
> timestamp and window. I believe that having only the ability to
> outputWindowed(value, timestamp, window) makes it quite obvious that this
> is necessary. It is not boilerplate to do so, but core functionality.

Yes, and we should be providing a BundlingDoFn to do this for users.

> On Fri, May 5, 2017 at 11:41 AM, Robert Bradshaw <rober...@google.com.invalid> wrote:
>
>> The JIRA issue https://issues.apache.org/jira/browse/BEAM-1283
>> suggests requiring an explicit Window when emitting from finishBundle.
>> I'm starting a thread because JIRA/GitHub probably isn't the best (or
>> most efficient) place to have this discussion.
>>
>> The original Spec requires the ambient WindowFn to be a
>> non-element-inspecting, non-timestamp-inspecting Fn (currently, only
>> the GlobalWindowsFn) and at the time this was chosen (after much
>> deliberation) over requiring a WindowedValue or other options because
>> batching in batch mode was a very commonly used pattern (and it should
>> be noted that patterns of using state and/or a window expiry callback
>> don't address this case well due to the lack of a key with which to
>> store state and the distant (if ever) expiration of the window).
>>
>> The downside, of course, is that trying to use such a DoFn in a
>> windowed context will not be caught until runtime. The proposal to
>> emit WindowedValues has exactly the same downside, but the runtime
>> error obtained when explicitly emitting a GlobalWindow when the
>> ambient WindowFn is different will likely be much less clear (an
>> encoding exception in the best case, silent data corruption in the
>> worst).
>> This also requires more boilerplate on the part of the author,
>> and doesn't solve the enumerated issues of limiting which WindowFns
>> can be used, choosing a timestamp, or bogus proto windows.
>>
>> Ideally we could catch such an error at pipeline construction time (or
>> earlier) but there have been no proposals along this line yet.
>> However, this is a stable-release-blocker, so we should come up with a
>> (temporary at least) course of action soon. At the very least it seems
>> we should accept emitting a Timestamped value as well, to which most
>> WindowFns can be applied.
>>
>> - Robert
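For reference, the window-agnostic batching discussed in this thread (partition buffered elements by window, pick a timestamp, emit to the right window at the end of the bundle) could look roughly like the sketch below. It is plain Python, not Beam code: Window and PerWindowBatcher are hypothetical names, and using the window's maximum timestamp for the output is only one possible choice, assumed here for illustration.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass(frozen=True)
class Window:
    """Hypothetical bounded window covering [start, end)."""
    start: int
    end: int

    def max_timestamp(self):
        # Last timestamp that still belongs to this window.
        return self.end - 1


class PerWindowBatcher:
    """Buffers elements per window and emits one batch per window on flush."""

    def __init__(self):
        self._buffers = defaultdict(list)

    def process(self, value, window):
        # Partition manually: each window gets its own buffer.
        self._buffers[window].append(value)

    def finish_bundle(self):
        # Return (batch, timestamp, window) triples ordered by window start.
        # The timestamp choice (end of window) is an assumption of this sketch.
        outputs = [
            (list(values), window.max_timestamp(), window)
            for window, values in sorted(
                self._buffers.items(), key=lambda item: item[0].start
            )
        ]
        self._buffers.clear()
        return outputs
```

Each returned triple corresponds to one outputWindowed(value, timestamp, window) call in the sense quoted above. A shared helper along these lines is roughly what a BundlingDoFn would have to encapsulate.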