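This occurs due to a limitation of sinks. Namely, you're using a TextIO.Write transform to output your data, and it does not support writing an unbounded PCollection. As the stack trace shows, the GroupByKey that fails validation is applied inside the TextIO.Write expansion (TextIO$Write$Bound.apply, then WriteWithShardingFactory, then Combine$GloballyAsSingletonView), not by your windowed sum.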
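In case it helps to see the shape of the check behind that message, here is a toy model in plain Java. It is not Beam code: the class, the `Input` type, and its three boolean fields are hypothetical stand-ins, and the condition is my reading of the exception text (unbounded input, global window, default trigger), so treat it as a sketch rather than the SDK's actual implementation.

```java
public class GroupByKeyCheckSketch {

    /** Hypothetical stand-in for the three properties the exception message mentions. */
    public static final class Input {
        public final boolean bounded;        // false models an unbounded PCollection
        public final boolean globalWindow;   // true models "still in the GlobalWindow"
        public final boolean defaultTrigger; // true models "no trigger was set"

        public Input(boolean bounded, boolean globalWindow, boolean defaultTrigger) {
            this.bounded = bounded;
            this.globalWindow = globalWindow;
            this.defaultTrigger = defaultTrigger;
        }
    }

    /** Rejects only the combination named in the exception text (assumed condition). */
    public static void validate(Input in) {
        if (!in.bounded && in.globalWindow && in.defaultTrigger) {
            throw new IllegalStateException(
                "GroupByKey cannot be applied to non-bounded PCollection in the "
                    + "GlobalWindow without a trigger.");
        }
    }

    public static void main(String[] args) {
        validate(new Input(true, true, true));   // bounded input: accepted
        validate(new Input(false, false, true)); // unbounded but windowed: accepted
        validate(new Input(false, true, false)); // unbounded with a trigger: accepted
        try {
            validate(new Input(false, true, true)); // the failing combination
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The point of the sketch is that the check looks at whichever collection the GroupByKey is applied to. In your trace that is a collection inside the write's expansion, which may be why you still see DefaultTrigger at your breakpoint even though you set a trigger upstream.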
I've published PR 802 to improve the messaging.

On Mon, Aug 8, 2016 at 6:07 PM, Jesse Anderson <[email protected]> wrote:

> I created a Windowed Sum. You can view the full code and specific line here
> <https://github.com/eljefe6a/beamexample/blob/master/DataflowTutorial/src/main/java/com/google/cloud/dataflow/examples/complete/game/solution/Exercise4.java#L103>.
>
> I'm getting this exception:
> Exception in thread "main" java.lang.IllegalStateException: GroupByKey cannot be applied to non-bounded PCollection in the GlobalWindow without a trigger. Use a Window.into or Window.triggering transform prior to GroupByKey.
> at org.apache.beam.sdk.transforms.GroupByKey.applicableTo(GroupByKey.java:173)
> at org.apache.beam.sdk.transforms.GroupByKey.validate(GroupByKey.java:189)
> at org.apache.beam.sdk.transforms.GroupByKey.validate(GroupByKey.java:120)
> at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:400)
> at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:308)
> at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:154)
> at org.apache.beam.sdk.transforms.Combine$PerKey.apply(Combine.java:1859)
> at org.apache.beam.sdk.transforms.Combine$PerKey.apply(Combine.java:1755)
> at org.apache.beam.sdk.runners.PipelineRunner.apply(PipelineRunner.java:76)
> at org.apache.beam.runners.direct.DirectRunner.apply(DirectRunner.java:205)
> at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:401)
> at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:308)
> at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:154)
> at org.apache.beam.sdk.transforms.Combine$Globally.apply(Combine.java:1444)
> at org.apache.beam.sdk.transforms.Combine$Globally.apply(Combine.java:1339)
> at org.apache.beam.sdk.runners.PipelineRunner.apply(PipelineRunner.java:76)
> at org.apache.beam.runners.direct.DirectRunner.apply(DirectRunner.java:205)
> at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:401)
> at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:308)
> at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:154)
> at org.apache.beam.sdk.transforms.Combine$GloballyAsSingletonView.apply(Combine.java:1572)
> at org.apache.beam.sdk.transforms.Combine$GloballyAsSingletonView.apply(Combine.java:1549)
> at org.apache.beam.sdk.runners.PipelineRunner.apply(PipelineRunner.java:76)
> at org.apache.beam.runners.direct.DirectRunner.apply(DirectRunner.java:205)
> at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:401)
> at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:324)
> at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:167)
> at org.apache.beam.runners.direct.WriteWithShardingFactory$DynamicallyReshardedWrite.apply(WriteWithShardingFactory.java:82)
> at org.apache.beam.runners.direct.WriteWithShardingFactory$DynamicallyReshardedWrite.apply(WriteWithShardingFactory.java:68)
> at org.apache.beam.sdk.runners.PipelineRunner.apply(PipelineRunner.java:76)
> at org.apache.beam.runners.direct.DirectRunner.apply(DirectRunner.java:202)
> at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:401)
> at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:324)
> at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:167)
> at org.apache.beam.sdk.io.TextIO$Write$Bound.apply(TextIO.java:617)
> at org.apache.beam.sdk.io.TextIO$Write$Bound.apply(TextIO.java:463)
> at org.apache.beam.sdk.runners.PipelineRunner.apply(PipelineRunner.java:76)
> at org.apache.beam.runners.direct.DirectRunner.apply(DirectRunner.java:205)
> at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:401)
> at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:308)
> at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:154)
> at com.google.cloud.dataflow.examples.complete.game.utils.Output$Base.apply(Output.java:54)
> at com.google.cloud.dataflow.examples.complete.game.utils.Output$WriteUserScoreSums.apply(Output.java:1)
> at com.google.cloud.dataflow.examples.complete.game.utils.Output$WriteTriggeredUserScoreSums.apply(Output.java:1)
> at com.google.cloud.dataflow.examples.complete.game.utils.Output$Base.apply(Output.java:1)
> at org.apache.beam.sdk.runners.PipelineRunner.apply(PipelineRunner.java:76)
> at org.apache.beam.runners.direct.DirectRunner.apply(DirectRunner.java:205)
> at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:401)
> at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:308)
> at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:154)
> at com.google.cloud.dataflow.examples.complete.game.solution.Exercise4.main(Exercise4.java:213)
>
> I put some breakpoints around the creation of the Window. The exception is
> happening because the trigger is still DefaultTrigger even though I'm
> setting a new trigger. is there something else going on?
>
> Thanks,
>
> Jesse
