The FlinkRunner applies a hash function (MurmurHash) to each key, which places the key somewhere in the hash space. The hash space (2^32) is split among the partitions (5 in your case). The more keys there are, the more likely they are to be spread evenly across the partitions.

This should be similar to what the other Runners do.
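
To make the idea concrete, here is a rough sketch of hash partitioning. It is not the FlinkRunner's actual code (as far as I know, Flink's real key-to-subtask assignment goes through key groups and is more involved). The object name `HashSpread`, the `shard-N` key names, and the use of Scala's standard-library `MurmurHash3` are assumptions made only for illustration.

```scala
import scala.util.hashing.MurmurHash3

object HashSpread {
  // Map a key to one of `partitions` equal slices of the 32-bit hash space.
  def partitionFor(key: String, partitions: Int): Int = {
    val hash = MurmurHash3.stringHash(key)      // signed 32-bit hash
    val unsigned = hash.toLong & 0xFFFFFFFFL    // shift into [0, 2^32)
    ((unsigned * partitions) >>> 32).toInt      // index of the slice
  }

  // Count how many keys land on each partition.
  def spread(keys: Seq[String], partitions: Int): Vector[Int] = {
    val counts = Array.fill(partitions)(0)
    keys.foreach(k => counts(partitionFor(k, partitions)) += 1)
    counts.toVector
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical key names; more keys should give a flatter spread.
    for (n <- Seq(5, 50, 5000)) {
      val keys = (0 until n).map(i => s"shard-$i")
      println(s"$n keys over 5 partitions: ${spread(keys, 5)}")
    }
  }
}
```

With only a handful of keys the per-partition counts can easily be lopsided or zero; they should even out as the number of keys grows.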

On 24.10.18 10:58, Jozef Vilcek wrote:

So if I run 5 workers with 50 shards, I end up with:

Duration    Bytes received    Records received
2m 39s      900 MB            465,525
2m 39s      1.76 GB           930,720
2m 39s      789 MB            407,315
2m 39s      1.32 GB           698,262
2m 39s      788 MB            407,310

Still not good, but better than with 5 shards, where some workers did not participate at all.
So the problem is in some layer that distributes keys / shards among workers?
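
As a quick sanity check on these numbers, the sketch below estimates how many of the 50 shards each worker received. It assumes every shard holds roughly the same number of records (plausible if elements are assigned to shards at random, but an assumption nonetheless). The object and method names are made up for illustration.

```scala
object ShardEstimate {
  // Records received per worker, copied from the table above.
  val recordsPerWorker: Vector[Long] =
    Vector(465525L, 930720L, 407315L, 698262L, 407310L)

  // Assumption: all shards hold about the same number of records,
  // so records / (total / numShards) approximates the shard count.
  def estimatedShards(records: Vector[Long], numShards: Int): Vector[Long] = {
    val perShard = records.sum.toDouble / numShards
    records.map(r => math.round(r / perShard))
  }

  def main(args: Array[String]): Unit = {
    println(estimatedShards(recordsPerWorker, 50))
  }
}
```

Under that assumption the estimate comes out as 8, 16, 7, 12 and 7 shards per worker, which adds up to 50. That would suggest the shards themselves are evenly sized and the imbalance comes from how shards are mapped to workers.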

On Wed, Oct 24, 2018 at 9:37 AM Reuven Lax <re...@google.com <mailto:re...@google.com>> wrote:

    withNumShards(5) generates 5 random shards. It turns out that
    statistically when you generate 5 random shards and you have 5
    workers, the probability is reasonably high that some workers will get
    more than one shard (and as a result not all workers will
    participate). Are you able to set the number of shards larger than 5?
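
A small sketch of the statistics behind this, assuming each shard lands on a worker independently and uniformly at random (a simplification of what the runner really does). The object and method names are hypothetical. It computes the probability that every worker gets at least one shard, using inclusion-exclusion.

```scala
object ShardCoverage {
  // Binomial coefficient C(n, k) as a Double.
  def binomial(n: Int, k: Int): Double =
    (1 to k).foldLeft(1.0)((acc, i) => acc * (n - k + i) / i)

  // P(every worker receives at least one shard), assuming each shard is
  // placed on a worker independently and uniformly at random.
  def allWorkersUsed(shards: Int, workers: Int): Double =
    (0 to workers).map { k =>
      val sign = if (k % 2 == 0) 1.0 else -1.0
      sign * binomial(workers, k) *
        math.pow((workers - k).toDouble / workers, shards)
    }.sum

  def main(args: Array[String]): Unit = {
    println(f"5 shards,  5 workers: ${allWorkersUsed(5, 5)}%.4f")
    println(f"50 shards, 5 workers: ${allWorkersUsed(50, 5)}%.4f")
  }
}
```

For 5 shards on 5 workers this gives 5!/5^5 = 0.0384, so under this model at least one worker sits idle about 96% of the time. With 50 shards nearly every run uses all workers, although the load per worker can still differ.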

    On Wed, Oct 24, 2018 at 12:28 AM Jozef Vilcek <jozo.vil...@gmail.com
    <mailto:jozo.vil...@gmail.com>> wrote:

        cc (dev)

        I tried to run the example with FlinkRunner in batch mode and
        again got a poor data spread among the workers.

        When I removed the number of shards for batch mode in the above
        example, the pipeline crashed before launch:

        Caused by: java.lang.IllegalStateException: Inputs to Flatten
        had incompatible triggers:
        AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(40000)).withLateFirings(AfterFirst.of(Repeatedly.forever(AfterPane.elementCountAtLeast(10000)), Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1 hour)))),
        AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(1)).withLateFirings(AfterFirst.of(Repeatedly.forever(AfterPane.elementCountAtLeast(1)), Repeatedly.forever(AfterSynchronizedProcessingTime.pastFirstElementInPane())))

        On Tue, Oct 23, 2018 at 12:01 PM Jozef Vilcek
        <jozo.vil...@gmail.com <mailto:jozo.vil...@gmail.com>> wrote:

            Hi Max,

            I forgot to mention that the example runs in streaming mode,
            so I cannot write without specifying shards.
            FileIO explicitly asks for them.

            I am not sure where the problem is. FlinkRunner is the only
            runner I have used.

            On Tue, Oct 23, 2018 at 11:27 AM Maximilian Michels
            <m...@apache.org <mailto:m...@apache.org>> wrote:

                Hi Jozef,

                This does not look like a FlinkRunner related problem,
                but is caused by
                the `WriteFiles` sharding logic. It assigns keys and
                does a Reshuffle
                which apparently does not lead to good data spread in
                your case.

                Do you see the same behavior without `withNumShards(5)`?

                Thanks,
                Max

                On 22.10.18 11:57, Jozef Vilcek wrote:
                 > Hello,
                 >
                 > I am having trouble getting balanced writes via FileIO.
                 > The workers on the shuffle side, where the data for each
                 > window firing is written to the filesystem, receive an
                 > unbalanced number of events.
                 >
                 > Here is a naive code example:
                 >
                 >      val read = KafkaIO.read()
                 >          .withTopic("topic")
                 >          .withBootstrapServers("kafka1:9092")
                 >          .withKeyDeserializer(classOf[ByteArrayDeserializer])
                 >          .withValueDeserializer(classOf[ByteArrayDeserializer])
                 >          .withProcessingTime()
                 >
                 >      pipeline
                 >          .apply(read)
                 >          .apply(MapElements.via(new SimpleFunction[KafkaRecord[Array[Byte], Array[Byte]], String]() {
                 >            override def apply(input: KafkaRecord[Array[Byte], Array[Byte]]): String = {
                 >              new String(input.getKV.getValue, "UTF-8")
                 >            }
                 >          }))
                 >
                 >          .apply(Window.into[String](FixedWindows.of(Duration.standardHours(1)))
                 >              .triggering(AfterWatermark.pastEndOfWindow()
                 >                  .withEarlyFirings(AfterPane.elementCountAtLeast(40000))
                 >                  .withLateFirings(AfterFirst.of(Lists.newArrayList[Trigger](
                 >                      Repeatedly.forever(AfterPane.elementCountAtLeast(10000)),
                 >                      Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
                 >                          .plusDelayOf(Duration.standardHours(1)))))))
                 >              .discardingFiredPanes()
                 >              .withAllowedLateness(Duration.standardDays(7)))
                 >
                 >          .apply(FileIO.write()
                 >              .via(TextIO.sink())
                 >              .withNaming(new SafeFileNaming(outputPath, ".txt"))
                 >              .withTempDirectory(tempLocation)
                 >              .withNumShards(5))
                 >
                 >
                 > If I run this on Beam 2.6.0 with Flink 1.5.0 on 5 workers
                 > (equal to the number of shards), I would expect each worker
                 > to participate equally in persisting shards, since the code
                 > uses a fixed number of shards (and random shard assignment?).
                 > But reality is different (see the 2 attachments: statistics
                 > from the Flink task reading from Kafka and the task writing
                 > to files).
                 >
                 > What am I missing? How to achieve balanced writes?
                 >
                 > Thanks,
                 > Jozef
                 >
                 >
