The FlinkRunner applies a hash function (MurmurHash) to each key, which places
the key somewhere in the hash space. The hash space (2^32) is split
among the partitions (5 in your case). Given enough keys, the keys become
increasingly likely to be spread evenly.
This should be similar to what the other runners do.
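A minimal Scala sketch of this idea, assuming a simplified model in which the 32-bit MurmurHash of each key is mapped onto `numPartitions` equal ranges of the hash space. It uses `scala.util.hashing.MurmurHash3` from the Scala standard library; Flink's real assignment goes through key groups and differs in detail, and `HashRangePartitioning`, `partitionOf` and `spread` are hypothetical names used only for illustration.

```scala
import scala.util.hashing.MurmurHash3

// Simplified model of hash-range partitioning; NOT Flink's actual
// key-group assignment, just an illustration of the principle.
object HashRangePartitioning {
  // Map a key to one of `numPartitions` equal ranges of the 2^32 hash space.
  def partitionOf(key: String, numPartitions: Int): Int = {
    // Reinterpret the signed 32-bit hash as an unsigned value in [0, 2^32).
    val unsignedHash = MurmurHash3.stringHash(key).toLong & 0xFFFFFFFFL
    // Scale into [0, numPartitions): each partition owns 2^32 / numPartitions hashes.
    ((unsignedHash * numPartitions) >>> 32).toInt
  }

  // Count how many of the given keys land in each partition.
  def spread(keys: Seq[String], numPartitions: Int): Seq[Int] = {
    val counts = Array.fill(numPartitions)(0)
    keys.foreach(k => counts(partitionOf(k, numPartitions)) += 1)
    counts.toSeq
  }

  def main(args: Array[String]): Unit = {
    val few  = (0 until 5).map(i => s"shard-$i")
    val many = (0 until 100000).map(i => s"key-$i")
    println(s"5 keys:      ${spread(few, 5).mkString(", ")}")
    println(s"100000 keys: ${spread(many, 5).mkString(", ")}")
  }
}
```

With only 5 keys, the per-partition counts are likely to be uneven (some partitions may receive nothing), while with 100,000 keys every partition ends up close to 20,000.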
On 24.10.18 10:58, Jozef Vilcek wrote:
So if I run 5 workers with 50 shards, I end up with:
Duration   Bytes received   Records received
2m 39s     900 MB           465,525
2m 39s     1.76 GB          930,720
2m 39s     789 MB           407,315
2m 39s     1.32 GB          698,262
2m 39s     788 MB           407,310
Still not good, but better than with 5 shards, where some workers did not
participate at all.
So the problem is in some layer that distributes keys/shards among workers?
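Dividing the record counts in the table above by the average number of records per shard gives a rough reading of what each worker received, assuming every shard carries about the same number of records (which random shard assignment should give):

```latex
\begin{align*}
N &= 465{,}525 + 930{,}720 + 407{,}315 + 698{,}262 + 407{,}310 = 2{,}909{,}132 \\
\bar{n} &= N / 50 \approx 58{,}183 \ \text{records per shard} \\
\frac{465{,}525}{\bar{n}} &\approx 8.0, \quad
\frac{930{,}720}{\bar{n}} \approx 16.0, \quad
\frac{407{,}315}{\bar{n}} \approx 7.0, \quad
\frac{698{,}262}{\bar{n}} \approx 12.0, \quad
\frac{407{,}310}{\bar{n}} \approx 7.0
\end{align*}
```

The ratios are almost exactly whole numbers that sum to 50, which suggests that the shards themselves are about equally full and that the imbalance comes from how many of the 50 shard keys each worker was assigned (8, 16, 7, 12 and 7).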
On Wed, Oct 24, 2018 at 9:37 AM Reuven Lax <re...@google.com> wrote:
withNumShards(5) generates 5 random shards. It turns out that,
statistically, when you generate 5 random shards and have 5
workers, the probability is reasonably high that some workers will get
more than one shard (and as a result, not all workers will
participate). Are you able to set the number of shards larger than 5?
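A back-of-the-envelope check of this, assuming each of the k shard keys lands on one of the w workers uniformly at random and independently (an idealization of hash partitioning, not something the runner guarantees):

```latex
\begin{align*}
% k = w = 5: all 5 shard keys must land on 5 distinct workers
P(\text{every worker gets a shard}) &= \frac{5!}{5^{5}} = \frac{120}{3125} \approx 0.038 \\
\mathbb{E}[\text{idle workers}] &= w\left(1 - \tfrac{1}{w}\right)^{k}
  = 5\left(\tfrac{4}{5}\right)^{5} = \tfrac{1024}{625} \approx 1.64 \\
% k = 50, w = 5
\mathbb{E}[\text{idle workers}] &= 5\left(\tfrac{4}{5}\right)^{50} \approx 7.1 \times 10^{-5},
\qquad \text{shards per worker} \sim \mathrm{Bin}\!\left(50, \tfrac{1}{5}\right),
\ \text{mean } 10,\ \text{s.d. } \sqrt{8} \approx 2.83
\end{align*}
```

Under this model, with 5 shards on 5 workers about 96% of runs leave at least one worker idle, and on average about 1.6 workers receive nothing. With 50 shards an idle worker becomes very unlikely, but a spread of a few shards around the mean of 10 per worker is still expected.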
On Wed, Oct 24, 2018 at 12:28 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
cc (dev)
I tried to run the example with the FlinkRunner in batch mode and
again got a bad data spread among the workers.
When I removed the number of shards for batch mode in the example
above, the pipeline crashed before launch:
Caused by: java.lang.IllegalStateException: Inputs to Flatten had incompatible triggers:
AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(40000)).withLateFirings(AfterFirst.of(Repeatedly.forever(AfterPane.elementCountAtLeast(10000)), Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1 hour)))),
AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(1)).withLateFirings(AfterFirst.of(Repeatedly.forever(AfterPane.elementCountAtLeast(1)), Repeatedly.forever(AfterSynchronizedProcessingTime.pastFirstElementInPane())))
On Tue, Oct 23, 2018 at 12:01 PM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
Hi Max,
I forgot to mention that the example runs in streaming mode,
so I cannot do writes without specifying shards;
FileIO explicitly asks for them.
I am not sure where the problem is. The FlinkRunner is the only
runner I have used.
On Tue, Oct 23, 2018 at 11:27 AM Maximilian Michels <m...@apache.org> wrote:
Hi Jozef,
This does not look like a FlinkRunner-related problem; it is caused by
the `WriteFiles` sharding logic. It assigns keys and does a Reshuffle,
which apparently does not lead to a good data spread in your case.
Do you see the same behavior without `withNumShards(5)`?
Thanks,
Max
On 22.10.18 11:57, Jozef Vilcek wrote:
Hello,

I am having trouble getting balanced writes via FileIO. The workers on
the shuffle side, where the data of each window firing is written to the
filesystem, receive an unbalanced number of events.

Here is a naive code example:

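import com.google.common.collect.Lists
import org.apache.beam.sdk.io.{FileIO, TextIO}
import org.apache.beam.sdk.io.kafka.{KafkaIO, KafkaRecord}
import org.apache.beam.sdk.transforms.{MapElements, SimpleFunction}
import org.apache.beam.sdk.transforms.windowing._
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.joda.time.Duration

// `pipeline`, `outputPath`, `tempLocation` and `SafeFileNaming` (a custom
// file naming class) are defined elsewhere in the application (not shown).

val read = KafkaIO.read[Array[Byte], Array[Byte]]()
  .withTopic("topic")
  .withBootstrapServers("kafka1:9092")
  .withKeyDeserializer(classOf[ByteArrayDeserializer])
  .withValueDeserializer(classOf[ByteArrayDeserializer])
  .withProcessingTime()

pipeline
  .apply(read)
  // Decode each Kafka record value as a UTF-8 string.
  .apply(MapElements.via(new SimpleFunction[KafkaRecord[Array[Byte], Array[Byte]], String]() {
    override def apply(input: KafkaRecord[Array[Byte], Array[Byte]]): String =
      new String(input.getKV.getValue, "UTF-8")
  }))
  // Hourly windows: early panes every 40,000 elements; late panes after
  // 10,000 elements or one hour of processing time, whichever comes first.
  .apply(Window.into[String](FixedWindows.of(Duration.standardHours(1)))
    .triggering(AfterWatermark.pastEndOfWindow()
      .withEarlyFirings(AfterPane.elementCountAtLeast(40000))
      .withLateFirings(AfterFirst.of(Lists.newArrayList[Trigger](
        Repeatedly.forever(AfterPane.elementCountAtLeast(10000)),
        Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
          .plusDelayOf(Duration.standardHours(1)))))))
    .discardingFiredPanes()
    .withAllowedLateness(Duration.standardDays(7)))
  // Write each fired pane as text files with a fixed number of shards.
  .apply(FileIO.write[String]()
    .via(TextIO.sink())
    .withNaming(new SafeFileNaming(outputPath, ".txt"))
    .withTempDirectory(tempLocation)
    .withNumShards(5))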

If I run this on Beam 2.6.0 with Flink 1.5.0 on 5 workers (equal to the
number of shards), I would expect every worker to take an equal part in
persisting the shards, since the code uses a fixed number of shards
(and random shard assignment?). But reality is different (see the 2
attachments: statistics from the Flink task reading from Kafka and from
the task writing to files).

What am I missing? How can I achieve balanced writes?

Thanks,
Jozef