Hm, yes, this makes sense now, but what can be done for my case? I do not
want to end up with too many files on disk.

I think what I am looking for is a way to instruct the IO not to do the
random shard assignment and reshuffle again, but instead to assume that the
number of shards equals the number of workers and that the shard ID is the
worker ID.
Is this doable in the Beam model?
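
Roughly what I have in mind, as a hypothetical sketch only (withShardAssignment,
numWorkers and currentWorkerIndex below are made up, I do not think FileIO has
such a hook today):

        .apply(FileIO.write()
            .via(TextIO.sink())
            .withNaming(new SafeFileNaming(outputPath, ".txt"))
            .withTempDirectory(tempLocation)
            .withNumShards(numWorkers)
            // hypothetical hook, NOT a real FileIO method: derive the shard
            // deterministically from the subtask instead of assigning a
            // random shard key and reshuffling
            .withShardAssignment((element: String, shardCount: Int) =>
              currentWorkerIndex % shardCount))

That way every worker would write exactly one file per window pane.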

On Wed, Oct 24, 2018 at 4:07 PM Maximilian Michels <m...@apache.org> wrote:

> The FlinkRunner uses a hash function (MurmurHash) on each key which
> places keys somewhere in the hash space. The hash space (2^32) is split
> among the partitions (5 in your case). Given enough keys, the chance
> increases that they are spread equally.
>
> This should be similar to what the other Runners do.
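>
> For example, a quick way to see how a few keys land across 5 partitions
> (plain Scala using the standard library MurmurHash3, just an illustration,
> not the exact Flink hashing code):
>
>     import scala.util.hashing.MurmurHash3
>
>     val numPartitions = 5
>     // with only a handful of distinct keys, collisions are likely;
>     // with many keys the spread evens out
>     (0 until 5).foreach { shardKey =>
>       val partition = Math.floorMod(MurmurHash3.stringHash(shardKey.toString), numPartitions)
>       println(s"key $shardKey -> partition $partition")
>     }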
>
> On 24.10.18 10:58, Jozef Vilcek wrote:
> >
> > So if I run 5 workers with 50 shards, I end up with:
> >
> >   Duration      Bytes received    Records received
> >   2m 39s        900 MB            465,525
> >   2m 39s       1.76 GB            930,720
> >   2m 39s        789 MB            407,315
> >   2m 39s       1.32 GB            698,262
> >   2m 39s        788 MB            407,310
> >
> > Still not good, but better than with 5 shards, where some workers did not
> > participate at all.
> > So the problem is in some layer which distributes keys / shards among
> > workers?
> >
> > On Wed, Oct 24, 2018 at 9:37 AM Reuven Lax <re...@google.com> wrote:
> >
> >     withNumShards(5) generates 5 random shards. It turns out that
> >     statistically when you generate 5 random shards and you have 5
> >     workers, the probability is reasonably high that some workers will get
> >     more than one shard (and as a result not all workers will
> >     participate). Are you able to set the number of shards larger than 5?
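> >
> >     For intuition: with 5 shards assigned uniformly at random and 5
> >     workers, the chance that every worker gets exactly one shard is
> >     5!/5^5, i.e. only about 4%. A standalone Scala sketch (not Beam code,
> >     assumes shard keys land on workers uniformly at random) to check this:
> >
> >         import scala.util.Random
> >
> >         val workers = 5
> >         val shards = 5
> >         val trials = 100000
> >         // count trials in which at least one worker gets no shard
> >         val skewed = (1 to trials).count { _ =>
> >           Seq.fill(shards)(Random.nextInt(workers)).toSet.size < workers
> >         }
> >         println(f"some worker idle in ${100.0 * skewed / trials}%.1f%% of trials")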
> >
> >     On Wed, Oct 24, 2018 at 12:28 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
> >
> >         cc (dev)
> >
> >         I tried to run the example with FlinkRunner in batch mode and
> >         again got a bad data spread among the workers.
> >
> >         When I tried to remove the number of shards for batch mode in the
> >         above example, the pipeline crashed before launch:
> >
> >         Caused by: java.lang.IllegalStateException: Inputs to Flatten
> >         had incompatible triggers:
> >
> >         AfterWatermark.pastEndOfWindow()
> >             .withEarlyFirings(AfterPane.elementCountAtLeast(40000))
> >             .withLateFirings(AfterFirst.of(
> >                 Repeatedly.forever(AfterPane.elementCountAtLeast(10000)),
> >                 Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
> >                     .plusDelayOf(1 hour)))),
> >
> >         AfterWatermark.pastEndOfWindow()
> >             .withEarlyFirings(AfterPane.elementCountAtLeast(1))
> >             .withLateFirings(AfterFirst.of(
> >                 Repeatedly.forever(AfterPane.elementCountAtLeast(1)),
> >                 Repeatedly.forever(AfterSynchronizedProcessingTime.pastFirstElementInPane())))
> >
> >
> >
> >
> >
> >         On Tue, Oct 23, 2018 at 12:01 PM Jozef Vilcek
> >         <jozo.vil...@gmail.com> wrote:
> >
> >             Hi Max,
> >
> >             I forgot to mention that the example is run in streaming mode,
> >             therefore I cannot do writes without specifying shards.
> >             FileIO explicitly asks for them.
> >
> >             I am not sure where the problem is. FlinkRunner is the only one
> >             I used.
> >
> >             On Tue, Oct 23, 2018 at 11:27 AM Maximilian Michels
> >             <m...@apache.org> wrote:
> >
> >                 Hi Jozef,
> >
> >                 This does not look like a FlinkRunner related problem,
> >                 but is caused by
> >                 the `WriteFiles` sharding logic. It assigns keys and
> >                 does a Reshuffle
> >                 which apparently does not lead to good data spread in
> >                 your case.
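> >
> >                 Conceptually the sharding step looks roughly like this
> >                 (simplified sketch in the style of your pipeline, not the
> >                 actual `WriteFiles` code; numShards here stands for the
> >                 configured shard count):
> >
> >                     // each element gets a random shard number as its key,
> >                     // then a reshuffle redistributes elements by that key
> >                     .apply(ParDo.of(new DoFn[String, KV[Integer, String]]() {
> >                       @ProcessElement
> >                       def process(c: ProcessContext): Unit =
> >                         c.output(KV.of(Random.nextInt(numShards), c.element()))
> >                     }))
> >                     .apply(Reshuffle.of[Integer, String]())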
> >
> >                 Do you see the same behavior without `withNumShards(5)`?
> >
> >                 Thanks,
> >                 Max
> >
> >                 On 22.10.18 11:57, Jozef Vilcek wrote:
> >                  > Hello,
> >                  >
> >                  > I am having some trouble getting a balanced write via
> >                  > FileIO. Workers on the shuffle side, where data fired
> >                  > per window are written to the filesystem, receive an
> >                  > unbalanced number of events.
> >                  >
> >                  > Here is a naive code example:
> >                  >
> >                  >      val read = KafkaIO.read()
> >                  >          .withTopic("topic")
> >                  >          .withBootstrapServers("kafka1:9092")
> >                  >          .withKeyDeserializer(classOf[ByteArrayDeserializer])
> >                  >          .withValueDeserializer(classOf[ByteArrayDeserializer])
> >                  >          .withProcessingTime()
> >                  >
> >                  >      pipeline
> >                  >          .apply(read)
> >                  >          .apply(MapElements.via(
> >                  >            new SimpleFunction[KafkaRecord[Array[Byte], Array[Byte]], String]() {
> >                  >              override def apply(input: KafkaRecord[Array[Byte], Array[Byte]]): String = {
> >                  >                new String(input.getKV.getValue, "UTF-8")
> >                  >              }
> >                  >            }))
> >                  >          .apply(Window.into[String](FixedWindows.of(Duration.standardHours(1)))
> >                  >              .triggering(AfterWatermark.pastEndOfWindow()
> >                  >                  .withEarlyFirings(AfterPane.elementCountAtLeast(40000))
> >                  >                  .withLateFirings(AfterFirst.of(Lists.newArrayList[Trigger](
> >                  >                      Repeatedly.forever(AfterPane.elementCountAtLeast(10000)),
> >                  >                      Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
> >                  >                          .plusDelayOf(Duration.standardHours(1)))))))
> >                  >              .discardingFiredPanes()
> >                  >              .withAllowedLateness(Duration.standardDays(7)))
> >                  >          .apply(FileIO.write()
> >                  >              .via(TextIO.sink())
> >                  >              .withNaming(new SafeFileNaming(outputPath, ".txt"))
> >                  >              .withTempDirectory(tempLocation)
> >                  >              .withNumShards(5))
> >                  >
> >                  >
> >                  > If I run this on Beam 2.6.0 with Flink 1.5.0 on 5
> >                  > workers (equal to the number of shards), I would expect
> >                  > each worker to participate in persisting shards, and
> >                  > equally so, since the code uses a fixed number of shards
> >                  > (and random shard assignment?). But reality is different
> >                  > (see 2 attachments - statistics from the flink task
> >                  > reading from kafka and the task writing to files).
> >                  >
> >                  > What am I missing? How to achieve balanced writes?
> >                  >
> >                  > Thanks,
> >                  > Jozef
> >                  >
> >                  >
> >
>
