cc (dev) I tried to run the example with FlinkRunner in batch mode and again saw poor data spread among the workers.
When I tried to remove the number of shards for batch mode in the above example, the pipeline crashed before launch:

Caused by: java.lang.IllegalStateException: Inputs to Flatten had incompatible triggers: AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(40000)).withLateFirings(AfterFirst.of(Repeatedly.forever(AfterPane.elementCountAtLeast(10000)), Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1 hour)))), AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(1)).withLateFirings(AfterFirst.of(Repeatedly.forever(AfterPane.elementCountAtLeast(1)), Repeatedly.forever(AfterSynchronizedProcessingTime.pastFirstElementInPane())))

On Tue, Oct 23, 2018 at 12:01 PM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
> Hi Max,
>
> I forgot to mention that the example is run in streaming mode, therefore I
> cannot do writes without specifying shards. FileIO explicitly asks for them.
>
> I am not sure where the problem is. FlinkRunner is the only one I used.
>
> On Tue, Oct 23, 2018 at 11:27 AM Maximilian Michels <m...@apache.org> wrote:
>
>> Hi Jozef,
>>
>> This does not look like a FlinkRunner-related problem, but is caused by
>> the `WriteFiles` sharding logic. It assigns keys and does a Reshuffle,
>> which apparently does not lead to a good data spread in your case.
>>
>> Do you see the same behavior without `withNumShards(5)`?
>>
>> Thanks,
>> Max
>>
>> On 22.10.18 11:57, Jozef Vilcek wrote:
>> > Hello,
>> >
>> > I am having some trouble getting a balanced write via FileIO. Workers
>> > at the shuffle side, where data per window fire and are written to the
>> > filesystem, receive an unbalanced number of events.
>> >
>> > Here is a naive code example:
>> >
>> > val read = KafkaIO.read()
>> >   .withTopic("topic")
>> >   .withBootstrapServers("kafka1:9092")
>> >   .withKeyDeserializer(classOf[ByteArrayDeserializer])
>> >   .withValueDeserializer(classOf[ByteArrayDeserializer])
>> >   .withProcessingTime()
>> >
>> > pipeline
>> >   .apply(read)
>> >   .apply(MapElements.via(
>> >     new SimpleFunction[KafkaRecord[Array[Byte], Array[Byte]], String]() {
>> >       override def apply(input: KafkaRecord[Array[Byte], Array[Byte]]): String = {
>> >         new String(input.getKV.getValue, "UTF-8")
>> >       }
>> >     }))
>> >   .apply(Window.into[String](FixedWindows.of(Duration.standardHours(1)))
>> >     .triggering(AfterWatermark.pastEndOfWindow()
>> >       .withEarlyFirings(AfterPane.elementCountAtLeast(40000))
>> >       .withLateFirings(AfterFirst.of(Lists.newArrayList[Trigger](
>> >         Repeatedly.forever(AfterPane.elementCountAtLeast(10000)),
>> >         Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
>> >           .plusDelayOf(Duration.standardHours(1)))))))
>> >     .discardingFiredPanes()
>> >     .withAllowedLateness(Duration.standardDays(7)))
>> >   .apply(FileIO.write()
>> >     .via(TextIO.sink())
>> >     .withNaming(new SafeFileNaming(outputPath, ".txt"))
>> >     .withTempDirectory(tempLocation)
>> >     .withNumShards(5))
>> >
>> > If I run this on Beam 2.6.0 with Flink 1.5.0 on 5 workers (equal to the
>> > number of shards), I would expect each worker to participate in
>> > persisting the shards equally, since the code uses a fixed number of
>> > shards (and random shard assignment?). But reality is different (see the
>> > 2 attachments: statistics from the Flink task reading from Kafka and the
>> > task writing to files).
>> >
>> > What am I missing? How to achieve balanced writes?
>> >
>> > Thanks,
>> > Jozef
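A possible intuition for the skew discussed above, sketched outside Beam: `WriteFiles` assigns each element one of `numShards` keys, and the runner then routes each key to a worker by hashing. With only 5 distinct keys and 5 workers, hash collisions can put two shard keys on one worker and none on another. The mixing function below is a hypothetical stand-in, not Flink's actual key-group hashing, so the exact spread it prints is only illustrative of the effect:

```java
import java.util.Arrays;

public class ShardSkewDemo {
    static final int NUM_SHARDS = 5;
    static final int NUM_WORKERS = 5;

    // Hypothetical integer mixer standing in for the runner's key hash
    // (real runners use their own scheme, e.g. murmur over key groups).
    static int workerFor(int shard) {
        return Math.floorMod(shard * 0x9E3779B9, NUM_WORKERS);
    }

    public static void main(String[] args) {
        int[] shardsPerWorker = new int[NUM_WORKERS];
        for (int shard = 0; shard < NUM_SHARDS; shard++) {
            shardsPerWorker[workerFor(shard)]++;
        }
        // Even with shards == workers, nothing guarantees 1 shard per worker:
        // with this mixer, one worker gets 2 shards and another gets 0.
        System.out.println(Arrays.toString(shardsPerWorker));
    }
}
```

Under this toy hash, shards 0 and 3 collide onto the same worker while one worker receives nothing, which matches the unbalanced per-worker write counts reported in the thread.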