Hi Max,

I forgot to mention that the example runs in streaming mode, so I cannot
do writes without specifying shards. FileIO explicitly asks for them.

I am not sure where the problem is. The FlinkRunner is the only one I have used.
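As a side note, here is a minimal standalone sketch of why I suspect the skew. This is my own simulation, not Beam internals; the class name and numbers are made up. Assuming each shard key is hash-partitioned uniformly at random onto workers, with 5 shard keys and 5 workers the chance that every worker gets exactly one shard is only 5!/5^5, roughly 3.8%; most of the time some workers receive several shards while others sit idle.

```java
import java.util.HashSet;
import java.util.Random;
import java.util.Set;

// Hypothetical illustration, not Beam code: estimate how often 5 randomly
// partitioned shard keys cover all 5 workers.
public class ShardSkewSketch {
    public static double allWorkersBusyFraction(int shards, int workers,
                                                int trials, long seed) {
        Random rng = new Random(seed);
        int allBusy = 0;
        for (int t = 0; t < trials; t++) {
            Set<Integer> hit = new HashSet<>();
            for (int s = 0; s < shards; s++) {
                // Worker chosen for this shard key by a uniform hash partition.
                hit.add(rng.nextInt(workers));
            }
            if (hit.size() == workers) {
                allBusy++;
            }
        }
        return (double) allBusy / trials;
    }

    public static void main(String[] args) {
        double fraction = allWorkersBusyFraction(5, 5, 100_000, 42L);
        // Theoretical value is 5!/5^5 = 0.0384, so the printed fraction
        // should be close to that.
        System.out.printf("all 5 workers busy in %.3f of trials%n", fraction);
    }
}
```

If that model matches what WriteFiles does, an even spread across exactly 5 workers would be the exception rather than the rule.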

On Tue, Oct 23, 2018 at 11:27 AM Maximilian Michels <m...@apache.org> wrote:

> Hi Jozef,
>
> This does not look like a FlinkRunner related problem, but is caused by
> the `WriteFiles` sharding logic. It assigns keys and does a Reshuffle
> which apparently does not lead to good data spread in your case.
>
> Do you see the same behavior without `withNumShards(5)`?
>
> Thanks,
> Max
>
> On 22.10.18 11:57, Jozef Vilcek wrote:
> > Hello,
> >
> > I am having trouble getting balanced writes via FileIO. On the shuffle
> > side, where each window's fired panes are written to the filesystem,
> > the workers receive an unbalanced number of events.
> >
> > Here is a naive code example:
> >
> >      val read = KafkaIO.read()
> >          .withTopic("topic")
> >          .withBootstrapServers("kafka1:9092")
> >          .withKeyDeserializer(classOf[ByteArrayDeserializer])
> >          .withValueDeserializer(classOf[ByteArrayDeserializer])
> >          .withProcessingTime()
> >
> >      pipeline
> >          .apply(read)
> >          .apply(MapElements.via(
> >              new SimpleFunction[KafkaRecord[Array[Byte], Array[Byte]], String]() {
> >                override def apply(input: KafkaRecord[Array[Byte], Array[Byte]]): String =
> >                  new String(input.getKV.getValue, "UTF-8")
> >              }))
> >
> >          .apply(Window.into[String](FixedWindows.of(Duration.standardHours(1)))
> >              .triggering(AfterWatermark.pastEndOfWindow()
> >                  .withEarlyFirings(AfterPane.elementCountAtLeast(40000))
> >                  .withLateFirings(AfterFirst.of(Lists.newArrayList[Trigger](
> >                      Repeatedly.forever(AfterPane.elementCountAtLeast(10000)),
> >                      Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
> >                          .plusDelayOf(Duration.standardHours(1)))))))
> >              .discardingFiredPanes()
> >              .withAllowedLateness(Duration.standardDays(7)))
> >
> >          .apply(FileIO.write()
> >              .via(TextIO.sink())
> >              .withNaming(new SafeFileNaming(outputPath, ".txt"))
> >              .withTempDirectory(tempLocation)
> >              .withNumShards(5))
> >
> >
> > If I run this on Beam 2.6.0 with Flink 1.5.0 on 5 workers (equal to the
> > number of shards), I would expect each worker to participate equally in
> > persisting the shards, since the code uses a fixed number of shards (and
> > random shard assignment?). But the reality is different (see the two
> > attachments: statistics from the Flink task reading from Kafka and the
> > task writing to files).
> >
> > What am I missing? How to achieve balanced writes?
> >
> > Thanks,
> > Jozef
> >
> >
>
