[ https://issues.apache.org/jira/browse/BEAM-6399?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jeff Klukas updated BEAM-6399:
------------------------------
    Description:
In a pipeline with unbounded input, if a user defines a custom trigger and calls FileIO.withNumShards(ValueProvider<Integer>), they may see an IllegalArgumentException at runtime due to incompatible windows.

For example, consider this compound trigger:

    Window.into(new GlobalWindows())
        .triggering(Repeatedly.forever(AfterFirst.of(
            AfterPane.elementCountAtLeast(10000),
            AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(Duration.standardMinutes(10)))))
        .discardingFiredPanes()

Using that windowing with a numShards ValueProvider yields:

    Inputs to Flatten had incompatible triggers:
    Repeatedly.forever(AfterFirst.of(AfterPane.elementCountAtLeast(10000), AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1 minute))),
    Repeatedly.forever(AfterFirst.of(AfterPane.elementCountAtLeast(1), AfterSynchronizedProcessingTime.pastFirstElementInPane()))

In the case of ValueProvider for numShards, WriteFiles creates both a sharded and unsharded collection; the first goes through one GroupByKey while the other goes through 2. These two collections are then flattened together and they have incompatible triggers due to the double-grouped collection using a continuation trigger.

If the user instead specifies a positive non-ValueProvider numShards, then a different code path is followed that avoids this incompatibility.

It looks like WriteFiles may need to be implemented differently to avoid combining collections with potentially incompatible triggers.
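To make the mismatch concrete, here is a toy model of how a trigger changes once it has been replaced by its continuation. It does not use Beam classes: Trigger, continuation, and sameTrigger are hypothetical names invented for this sketch, the two rewrite rules are read off the pair of triggers printed in the error message, and the plain structural comparison is an assumed stand-in for whatever check Flatten actually performs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model only: these types mimic the shape of Beam triggers but are NOT Beam classes.
final class TriggerContinuationSketch {

    /** A trigger expression: a name, an optional argument, and optional sub-triggers. */
    static final class Trigger {
        final String name;        // e.g. "AfterPane.elementCountAtLeast"
        final String arg;         // e.g. "10000"; empty when there is no argument
        final List<Trigger> subs; // sub-triggers of a composite; empty for a leaf

        Trigger(String name, String arg, Trigger... subs) {
            this.name = name;
            this.arg = arg;
            this.subs = Arrays.asList(subs);
        }

        @Override
        public String toString() {
            if (subs.isEmpty()) {
                return name + "(" + arg + ")";
            }
            List<String> parts = new ArrayList<>();
            for (Trigger t : subs) {
                parts.add(t.toString());
            }
            return name + "(" + String.join(", ", parts) + ")";
        }
    }

    /** Rewrites a trigger the way the second trigger in the error message was rewritten. */
    static Trigger continuation(Trigger t) {
        switch (t.name) {
            case "AfterPane.elementCountAtLeast":
                // The element count collapses to 1.
                return new Trigger(t.name, "1");
            case "AfterProcessingTime.pastFirstElementInPane":
                // The delay is dropped and the synchronized variant takes over.
                return new Trigger("AfterSynchronizedProcessingTime.pastFirstElementInPane", "");
            default:
                // Composites (and unknown leaves) keep their shape; sub-triggers are rewritten.
                Trigger[] rewritten = new Trigger[t.subs.size()];
                for (int i = 0; i < rewritten.length; i++) {
                    rewritten[i] = continuation(t.subs.get(i));
                }
                return new Trigger(t.name, t.arg, rewritten);
        }
    }

    /** Assumed compatibility check for this sketch: plain structural equality. */
    static boolean sameTrigger(Trigger a, Trigger b) {
        return a.toString().equals(b.toString());
    }

    public static void main(String[] args) {
        Trigger userTrigger = new Trigger("Repeatedly.forever", "",
            new Trigger("AfterFirst.of", "",
                new Trigger("AfterPane.elementCountAtLeast", "10000"),
                new Trigger("AfterProcessingTime.pastFirstElementInPane", "delay=10 minutes")));
        Trigger afterGrouping = continuation(userTrigger);

        System.out.println(userTrigger);
        System.out.println(afterGrouping);
        System.out.println("same trigger: " + sameTrigger(userTrigger, afterGrouping));
    }
}
```

With the default trigger the two branches would presumably not disagree in this way, which would fit the report that the error only shows up with a nondefault trigger.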
> FileIO errors on unbounded input with nondefault trigger
> --------------------------------------------------------
>
>                 Key: BEAM-6399
>                 URL: https://issues.apache.org/jira/browse/BEAM-6399
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-files
>            Reporter: Jeff Klukas
>            Priority: Major
>

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)