Hi Max, I forgot to mention that the example runs in streaming mode, therefore I cannot do writes without specifying shards; FileIO explicitly asks for them.
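For context, a minimal sketch of that constraint (Beam 2.6.0; `lines` is a placeholder for the unbounded PCollection[String] built in my example below): with unbounded input, dropping `withNumShards` makes pipeline construction fail, complaining that the number of output shards must be set explicitly.

    // `lines` stands for the unbounded, windowed PCollection[String]
    // from the Kafka read in the example below (hypothetical name).
    lines.apply(FileIO.write()
      .via(TextIO.sink())
      .withNaming(new SafeFileNaming(outputPath, ".txt"))
      .withTempDirectory(tempLocation)
      .withNumShards(5))  // required for streaming/unbounded writes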
I am not sure where the problem is. FlinkRunner is the only one I used.

On Tue, Oct 23, 2018 at 11:27 AM Maximilian Michels <m...@apache.org> wrote:

> Hi Jozef,
>
> This does not look like a FlinkRunner related problem, but is caused by
> the `WriteFiles` sharding logic. It assigns keys and does a Reshuffle
> which apparently does not lead to good data spread in your case.
>
> Do you see the same behavior without `withNumShards(5)`?
>
> Thanks,
> Max
>
> On 22.10.18 11:57, Jozef Vilcek wrote:
> > Hello,
> >
> > I am having some trouble getting a balanced write via FileIO. Workers at
> > the shuffle side, where data per window fire are written to the
> > filesystem, receive an unbalanced number of events.
> >
> > Here is a naive code example:
> >
> >     val read = KafkaIO.read()
> >       .withTopic("topic")
> >       .withBootstrapServers("kafka1:9092")
> >       .withKeyDeserializer(classOf[ByteArrayDeserializer])
> >       .withValueDeserializer(classOf[ByteArrayDeserializer])
> >       .withProcessingTime()
> >
> >     pipeline
> >       .apply(read)
> >       .apply(MapElements.via(
> >         new SimpleFunction[KafkaRecord[Array[Byte], Array[Byte]], String]() {
> >           override def apply(input: KafkaRecord[Array[Byte], Array[Byte]]): String = {
> >             new String(input.getKV.getValue, "UTF-8")
> >           }
> >         }))
> >       .apply(Window.into[String](FixedWindows.of(Duration.standardHours(1)))
> >         .triggering(AfterWatermark.pastEndOfWindow()
> >           .withEarlyFirings(AfterPane.elementCountAtLeast(40000))
> >           .withLateFirings(AfterFirst.of(Lists.newArrayList[Trigger](
> >             Repeatedly.forever(AfterPane.elementCountAtLeast(10000)),
> >             Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
> >               .plusDelayOf(Duration.standardHours(1)))))))
> >         .discardingFiredPanes()
> >         .withAllowedLateness(Duration.standardDays(7)))
> >       .apply(FileIO.write()
> >         .via(TextIO.sink())
> >         .withNaming(new SafeFileNaming(outputPath, ".txt"))
> >         .withTempDirectory(tempLocation)
> >         .withNumShards(5))
> >
> > If I run this on Beam 2.6.0 with Flink 1.5.0 on 5 workers (equal to the
> > number of shards), I would expect each worker to participate equally in
> > persisting shards, since the code uses a fixed number of shards (and
> > random shard assignment?). But reality is different (see the 2
> > attachments: statistics from the Flink task reading from Kafka and the
> > task writing to files).
> >
> > What am I missing? How can I achieve balanced writes?
> >
> > Thanks,
> > Jozef
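P.S. To check my understanding of the sharding you describe, here is a rough, hypothetical simplification of what `WriteFiles` does with a fixed shard count (not the actual Beam code; `lines` again stands for the windowed PCollection[String] from my example):

    import java.util.concurrent.ThreadLocalRandom
    import org.apache.beam.sdk.transforms.{GroupByKey, SerializableFunction, WithKeys}
    import org.apache.beam.sdk.values.TypeDescriptors

    val numShards = 5
    lines
      // assign each element a pseudo-random shard id in [0, numShards)
      .apply(WithKeys.of(new SerializableFunction[String, java.lang.Integer]() {
        override def apply(element: String): java.lang.Integer =
          ThreadLocalRandom.current().nextInt(numShards)
      }).withKeyType(TypeDescriptors.integers()))
      // the shuffle then routes every element of one shard id to
      // whichever worker owns that key
      .apply(GroupByKey.create[java.lang.Integer, String]())

If that is roughly right, then with only 5 distinct keys the spread over workers depends entirely on how the runner hashes those keys, and collisions could leave some workers writing several shards while others write none. Is that the imbalance I am seeing?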