Keep in mind that I'm not sure any of the bounded runners support triggering like that in any reasonable way. They mostly rely on the fact that triggers are documented to be non-deterministic, and then ignore them.
However, this is still a bug, because a graph that works in unbounded mode should at least still work in bounded mode.

On Fri, Jan 11, 2019 at 9:47 AM Jeff Klukas <[email protected]> wrote:

> It is indeed well documented that numShards is required for unbounded input. And I do believe that a helpful error is thrown in the case of unbounded input and runner-determined sharding.
>
> I do believe there's still a bug here; it's just wandered quite a bit from the original title of the thread. The title should now be "Exception when using custom triggering and runner-determined file sharding".
>
> I was seeing the IllegalStateException in a unit test when I tried to compile my pipeline with the custom triggering. That unit test exercised *bounded* file input and numShards=0.
>
> In bounded mode, it would still be useful to be able to limit file sizes via GlobalWindows with triggering on AfterPane.elementCountAtLeast. But elementCountAtLeast will emit a continuation trigger that trips the Flatten problem for runner-determined sharding.
>
> On Fri, Jan 11, 2019 at 12:32 PM Reuven Lax <[email protected]> wrote:
>
>> Ah,
>>
>> numShards = 0 is explicitly not supported in unbounded mode today, for the reason mentioned above. If FileIO doesn't reject the pipeline in that case, we should fix that.
>>
>> Reuven
>>
>> On Fri, Jan 11, 2019 at 9:23 AM Jeff Klukas <[email protected]> wrote:
>>
>>> Indeed, I was wrong about the ValueProvider distinction. I updated that in the JIRA.
>>>
>>> It's when numShards is 0 (so runner-provided sharding) vs. an explicit number. Things work fine for explicit sharding. It's the runner-provided sharding mode that encounters the Flatten of PCollections with conflicting triggers.
>>>
>>> On Fri, Jan 11, 2019 at 12:18 PM Reuven Lax <[email protected]> wrote:
>>>
>>>> FileIO requires an explicit numShards in unbounded mode for a number of reasons, one being that a trigger has to happen on a GroupByKey, and we need something to group on.
>>>>
>>>> It is extremely surprising that behavior would change between using a ValueProvider or not. The exact same code path should be triggered regardless of whether a ValueProvider is used.
>>>>
>>>> Reuven
>>>>
>>>> On Wed, Jan 9, 2019 at 11:00 PM Kenneth Knowles <[email protected]> wrote:
>>>>
>>>>> Definitely sounds like a bug, but I also want to caution you (or anyone reading this in the archive) that there are known problems with continuation triggers. A spec on continuation triggers that we missed was that they really must be "compatible" (this is an arbitrary concept, having only to do with Flattening two PCollections together) with their original trigger. Without this, we also know that you can have three PCollections with identical triggering and you can CoGroupByKey them together, but you cannot do this three-way join as a sequence of binary joins.
>>>>>
>>>>> Kenn
>>>>>
>>>>> On Wed, Jan 9, 2019 at 10:44 AM Jeff Klukas <[email protected]> wrote:
>>>>>
>>>>>> Thanks for the response, Chamikara. I filed https://jira.apache.org/jira/browse/BEAM-6399 and I expect I can work around the problem in my case by not using a ValueProvider for numShards.
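[For readers of the archive: a minimal side-by-side sketch of the two sharding modes discussed in this thread, using the corrected diagnosis from Jeff's Jan 11 message above (runner-determined sharding trips the Flatten of incompatible continuation triggers; an explicit count does not). The class name, output path, and shard counts are hypothetical illustrations, not code from the thread.]

    import org.apache.beam.sdk.io.FileIO;
    import org.apache.beam.sdk.io.TextIO;

    class ShardingSketch {
      // Runner-determined sharding (numShards = 0): WriteFiles internally
      // builds both a sharded and an unsharded collection and flattens them;
      // the continuation trigger on the twice-grouped branch is incompatible
      // with the user's trigger, producing the IllegalStateException below.
      static FileIO.Write<Void, String> runnerDetermined() {
        return FileIO.<String>write()
            .via(TextIO.sink())
            .to("gs://my-bucket/output/")  // hypothetical output location
            .withNumShards(0);             // 0 = runner-determined sharding
      }

      // Explicit sharding: a fixed positive shard count takes a WriteFiles
      // code path with no Flatten, so a custom trigger passes through intact.
      static FileIO.Write<Void, String> explicitShards() {
        return FileIO.<String>write()
            .via(TextIO.sink())
            .to("gs://my-bucket/output/")  // hypothetical output location
            .withNumShards(10);            // any explicit positive count
      }
    }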
>>>>>> On Wed, Jan 9, 2019 at 1:22 PM Chamikara Jayalath <[email protected]> wrote:
>>>>>>
>>>>>>> I'm not too familiar with the exact underlying issue here, but writing unbounded input to files when using GlobalWindows for unsharded output is a valid use case, so this sounds like a bug. Feel free to create a JIRA.
>>>>>>>
>>>>>>> - Cham
>>>>>>>
>>>>>>> On Wed, Jan 9, 2019 at 10:00 AM Jeff Klukas <[email protected]> wrote:
>>>>>>>
>>>>>>>> I've read more deeply into the WriteFiles code, and I understand now that the exception is due to WriteFiles' attempt to handle unsharded input. In that case, it creates a sharded and an unsharded collection; the first goes through one GroupByKey while the other goes through two. These two collections are then flattened together, and they have incompatible triggers because the double-grouped collection uses a continuation trigger.
>>>>>>>>
>>>>>>>> I was calling FileIO.withNumShards(ValueProvider<Integer>), but if I switch to hard-coding an integer rather than passing a ValueProvider, WriteFiles uses a different code path that doesn't flatten collections, and no exception is thrown.
>>>>>>>>
>>>>>>>> So, this might really be considered a bug in WriteFiles (and thus FileIO). But I'd love to hear other interpretations.
>>>>>>>>
>>>>>>>> On Wed, Jan 9, 2019 at 11:25 AM Jeff Klukas <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> I'm building a pipeline that streams from Pubsub and writes to files. I'm using FileIO's dynamic destinations to place elements into different directories according to date, and I really don't care about ordering of elements beyond the date buckets.
>>>>>>>>>
>>>>>>>>> So, I think GlobalWindows is appropriate in this case, even though the input is unbounded. Is it possible to use GlobalWindows but set a trigger based on number of elements and/or processing time so that Beam actually writes out files periodically?
>>>>>>>>>
>>>>>>>>> I tried the following:
>>>>>>>>>
>>>>>>>>> Window.into(new GlobalWindows())
>>>>>>>>>     .triggering(Repeatedly.forever(AfterFirst.of(
>>>>>>>>>         AfterPane.elementCountAtLeast(10000),
>>>>>>>>>         AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardMinutes(10)))))
>>>>>>>>>     .discardingFiredPanes()
>>>>>>>>>
>>>>>>>>> But it raises an exception about incompatible triggers:
>>>>>>>>>
>>>>>>>>> Inputs to Flatten had incompatible triggers: Repeatedly.forever(AfterFirst.of(AfterPane.elementCountAtLeast(10000), AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1 minute))), Repeatedly.forever(AfterFirst.of(AfterPane.elementCountAtLeast(1), AfterSynchronizedProcessingTime.pastFirstElementInPane()))
>>>>>>>>>
>>>>>>>>> I believe what's happening is that FileIO with explicit numShards (required in the case of unbounded input) is forcing a GroupByKey, which activates continuation triggers that are incompatible with my stated triggers. It's the internals of WriteFiles that's trying to flatten the incompatible PCollections together.
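[Pulling the thread's findings together, a minimal end-to-end sketch of the original goal: streaming Pubsub input, GlobalWindows with a count/processing-time trigger, and periodic file writes, avoiding the incompatible-trigger Flatten by giving FileIO an explicit shard count. The class name, subscription, bucket path, and counts are illustrative assumptions; withAllowedLateness(Duration.ZERO) is added because Window requires an allowed lateness once a trigger is set.]

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.FileIO;
    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.windowing.AfterFirst;
    import org.apache.beam.sdk.transforms.windowing.AfterPane;
    import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
    import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
    import org.apache.beam.sdk.transforms.windowing.Repeatedly;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.joda.time.Duration;

    public class TriggeredGlobalWindowWrites {
      public static void main(String[] args) {
        Pipeline pipeline =
            Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        pipeline
            // Hypothetical subscription; any unbounded source hits the same
            // explicit-numShards requirement.
            .apply(PubsubIO.readStrings()
                .fromSubscription("projects/my-project/subscriptions/my-sub"))
            // GlobalWindows with a repeated trigger: emit a pane every 10,000
            // elements, or 10 minutes after the first element in the pane,
            // whichever fires first.
            .apply(Window.<String>into(new GlobalWindows())
                .triggering(Repeatedly.forever(AfterFirst.of(
                    AfterPane.elementCountAtLeast(10000),
                    AfterProcessingTime.pastFirstElementInPane()
                        .plusDelayOf(Duration.standardMinutes(10)))))
                .withAllowedLateness(Duration.ZERO)
                .discardingFiredPanes())
            // An explicit, nonzero numShards keeps WriteFiles on the code path
            // that never flattens collections with mismatched continuation
            // triggers (see BEAM-6399).
            .apply(FileIO.<String>write()
                .via(TextIO.sink())
                .to("gs://my-bucket/output/")  // hypothetical output prefix
                .withNumShards(10));

        pipeline.run();
      }
    }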
