[ 
https://issues.apache.org/jira/browse/BEAM-6399?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Klukas updated BEAM-6399:
------------------------------
    Description: 
In a pipeline with unbounded input, if a user defines a custom trigger and 
calls FileIO.withNumShards(ValueProvider<Integer>), they may see an 
IllegalArgumentException at runtime due to incompatible triggers.

For example, consider this compound trigger:

{{Window.into(new GlobalWindows())}}
{{  .triggering(Repeatedly.forever(AfterFirst.of(}}
{{    AfterPane.elementCountAtLeast(10000),}}
{{    AfterProcessingTime.pastFirstElementInPane()}}
{{              .plusDelayOf(Duration.standardMinutes(10)))))}}
{{  .discardingFiredPanes()}}
 
Using that windowing with a numShards ValueProvider yields:

{{Inputs to Flatten had incompatible triggers:}}
{{Repeatedly.forever(AfterFirst.of(AfterPane.elementCountAtLeast(10000),}}
{{  AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1 minute))),}}
{{Repeatedly.forever(AfterFirst.of(AfterPane.elementCountAtLeast(1),}}
{{  AfterSynchronizedProcessingTime.pastFirstElementInPane()))}}
  
When numShards is a ValueProvider, WriteFiles creates both a sharded and an 
unsharded collection; the first goes through one GroupByKey while the second 
goes through two. The two collections are then flattened together, and their 
triggers are incompatible because the double-grouped collection uses a 
continuation trigger.
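
As a rough illustration of why the Flatten step rejects these inputs, here is a 
toy model in plain Java. It is not Beam code: ToyTrigger, continuation() and 
flatten() are hypothetical names, and the continuation rule (element count 
drops to 1, processing time becomes synchronized processing time) is only an 
assumption read off the two triggers printed in the error above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

final class TriggerMismatchSketch {

    /** Toy stand-in for a trigger; NOT Beam's Trigger class. */
    static final class ToyTrigger {
        final int elementCount;   // fire once this many elements are in the pane
        final String timeDomain;  // clock used by the time-based half of the trigger

        ToyTrigger(int elementCount, String timeDomain) {
            this.elementCount = elementCount;
            this.timeDomain = timeDomain;
        }

        /** Assumed continuation form, mirroring the second trigger in the error text. */
        ToyTrigger continuation() {
            return new ToyTrigger(1, "synchronized-processing-time");
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof ToyTrigger)) {
                return false;
            }
            ToyTrigger t = (ToyTrigger) o;
            return elementCount == t.elementCount && timeDomain.equals(t.timeDomain);
        }

        @Override
        public int hashCode() {
            return Objects.hash(elementCount, timeDomain);
        }

        @Override
        public String toString() {
            return "ToyTrigger(" + elementCount + ", " + timeDomain + ")";
        }
    }

    /** Toy Flatten check: every input must carry an equal trigger. */
    static ToyTrigger flatten(List<ToyTrigger> inputs) {
        ToyTrigger first = inputs.get(0);
        for (ToyTrigger t : inputs) {
            if (!t.equals(first)) {
                throw new IllegalArgumentException(
                    "Inputs to Flatten had incompatible triggers: " + first + ", " + t);
            }
        }
        return first;
    }

    public static void main(String[] args) {
        // One branch still carries the user-defined trigger ...
        ToyTrigger userTrigger = new ToyTrigger(10000, "processing-time");
        // ... while the other carries the continuation form.
        ToyTrigger continued = userTrigger.continuation();

        // Branches with equal triggers flatten without complaint.
        System.out.println(flatten(Arrays.asList(continued, continued)));

        // Branches with different triggers are rejected.
        try {
            flatten(Arrays.asList(userTrigger, continued));
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this sketch the two branches only flatten cleanly when both carry the same 
trigger, which is why a code path that never mixes such branches would avoid 
the error.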
  
If the user instead passes a positive int for numShards rather than a 
ValueProvider, WriteFiles follows a different code path that avoids this 
incompatibility.
  
 It looks like WriteFiles may need to be implemented differently to avoid 
combining collections with potentially incompatible triggers.

  was:
In a pipeline with unbounded input, if a user defines a custom trigger and 
calls FileIO.withNumShards(ValueProvider<Integer>), they may see an 
IllegalArgumentException at runtime due to incompatible windows.
 
For example, consider this compound trigger:
 
{{Window.into(new GlobalWindows())}}
{{  .triggering(Repeatedly.forever(AfterFirst.of(}}
{{    AfterPane.elementCountAtLeast(10000),}}
{{    AfterProcessingTime.pastFirstElementInPane()}}
{{              .plusDelayOf(Duration.standardMinutes(10)))))}}
{{  .discardingFiredPanes()}}

 
Using that windowing with a numShards ValueProvider yields:
 
{{Inputs to Flatten had incompatible triggers:}}
{{Repeatedly.forever(AfterFirst.of(AfterPane.elementCountAtLeast(10000),}}
{{  AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1 minute))),}}
{{Repeatedly.forever(AfterFirst.of(AfterPane.elementCountAtLeast(1),}}
{{  AfterSynchronizedProcessingTime.pastFirstElementInPane()))}}
 
In the case of ValueProvider for numShards, WriteFiles creates both a sharded 
and unsharded collection; the first goes through one GroupByKey while the other 
goes through 2. These two collections are then flattened together and they have 
incompatible triggers due to the double-grouped collection using a continuation 
trigger.
 
If the user instead specifies a positive non-ValueProvider numShards, then a 
different code path is followed that avoids this incompatibility.
 
It looks like WriteFiles may need to be implemented differently to avoid 
combining collections with potentially incompatible triggers.


> FileIO errors on unbounded input with nondefault trigger
> --------------------------------------------------------
>
>                 Key: BEAM-6399
>                 URL: https://issues.apache.org/jira/browse/BEAM-6399
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-files
>            Reporter: Jeff Klukas
>            Priority: Major
>
> In a pipeline with unbounded input, if a user defines a custom trigger and 
> calls FileIO.withNumShards(ValueProvider<Integer>), they may see an 
> IllegalArgumentException at runtime due to incompatible triggers.
>
> For example, consider this compound trigger:
>
> {{Window.into(new GlobalWindows())}}
> {{  .triggering(Repeatedly.forever(AfterFirst.of(}}
> {{    AfterPane.elementCountAtLeast(10000),}}
> {{    AfterProcessingTime.pastFirstElementInPane()}}
> {{              .plusDelayOf(Duration.standardMinutes(10)))))}}
> {{  .discardingFiredPanes()}}
>  
> Using that windowing with a numShards ValueProvider yields:
>
> {{Inputs to Flatten had incompatible triggers:}}
> {{Repeatedly.forever(AfterFirst.of(AfterPane.elementCountAtLeast(10000),}}
> {{  AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1 minute))),}}
> {{Repeatedly.forever(AfterFirst.of(AfterPane.elementCountAtLeast(1),}}
> {{  AfterSynchronizedProcessingTime.pastFirstElementInPane()))}}
>   
> When numShards is a ValueProvider, WriteFiles creates both a sharded and an 
> unsharded collection; the first goes through one GroupByKey while the second 
> goes through two. The two collections are then flattened together, and their 
> triggers are incompatible because the double-grouped collection uses a 
> continuation trigger.
>   
> If the user instead passes a positive int for numShards rather than a 
> ValueProvider, WriteFiles follows a different code path that avoids this 
> incompatibility.
>   
>  It looks like WriteFiles may need to be implemented differently to avoid 
> combining collections with potentially incompatible triggers.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
