Hm, yes, this makes sense now, but what can be done for my case? I do
not want to end up with too many files on disk.

I think what I am looking for is a way to instruct the IO not to do the
random shard assignment and reshuffle again, but to just assume that the
number of shards equals the number of workers and that the shard ID is
the worker ID. Is this doable in the Beam model?
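The closest I could come up with in the current API is to key the
elements deterministically myself and write one file per shard key with
FileIO.writeDynamic. Untested sketch (numShards and the naming are
placeholders; Beam still decides which worker owns which key, so shard
ID == worker ID is not guaranteed):

import org.apache.beam.sdk.coders.VarIntCoder
import org.apache.beam.sdk.io.{FileIO, TextIO}
import org.apache.beam.sdk.transforms.SerializableFunction

val numShards = 5  // placeholder: the intended worker count

lines.apply(FileIO.writeDynamic[Integer, String]()
  .by(new SerializableFunction[String, Integer]() {
    override def apply(element: String): Integer =
      Math.floorMod(element.hashCode, numShards)  // deterministic shard key
  })
  .withDestinationCoder(VarIntCoder.of())
  .via(TextIO.sink())
  .to(outputPath)
  .withNaming(new SerializableFunction[Integer, FileIO.Write.FileNaming]() {
    override def apply(shard: Integer): FileIO.Write.FileNaming =
      FileIO.Write.defaultNaming("shard-" + shard, ".txt")
  })
  .withTempDirectory(tempLocation)
  .withNumShards(1))  // one file per destination => numShards files total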
On Wed, Oct 24, 2018 at 4:07 PM Maximilian Michels <m...@apache.org> wrote:
The FlinkRunner uses a hash function (MurmurHash) on each key, which
places keys somewhere in the hash space. The hash space (2^32) is split
among the partitions (5 in your case). Given enough keys, the chance
increases that they are spread evenly.

This should be similar to what the other Runners do.
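
In pseudo-code the assignment is roughly this (simplified sketch of
Flink's KeyGroupRangeAssignment; murmurHash here is a stand-in for
Flink's MathUtils.murmurHash and is not bit-identical):

import scala.util.hashing.MurmurHash3

// Stand-in for Flink's MathUtils.murmurHash (illustrative only).
def murmurHash(code: Int): Int =
  math.abs(MurmurHash3.finalizeHash(
    MurmurHash3.mixLast(MurmurHash3.arraySeed, code), 1))

// Key -> key group (Flink: murmurHash(key.hashCode) % maxParallelism).
def assignToKeyGroup(key: Any, maxParallelism: Int): Int =
  murmurHash(key.hashCode) % maxParallelism

// Key group -> index of the parallel subtask that owns it.
def operatorIndex(keyGroup: Int, maxParallelism: Int, parallelism: Int): Int =
  keyGroup * parallelism / maxParallelism

With the default maxParallelism of 128 and parallelism 5, each subtask
owns ~26 key groups; 5 shard keys hit 5 of the 128 groups essentially
at random, so two shards landing on the same subtask is quite likely.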
On 24.10.18 10:58, Jozef Vilcek wrote:
>
> So if I run 5 workers with 50 shards, I end up with:
>
>   Duration | Bytes received | Records received
>   2m 39s   | 900 MB         | 465,525
>   2m 39s   | 1.76 GB        | 930,720
>   2m 39s   | 789 MB         | 407,315
>   2m 39s   | 1.32 GB        | 698,262
>   2m 39s   | 788 MB         | 407,310
>
> Still not good, but better than with 5 shards, where some workers did
> not participate at all.
> So, the problem is in some layer which distributes keys / shards
> among workers?
>
> On Wed, Oct 24, 2018 at 9:37 AM Reuven Lax <re...@google.com> wrote:
>
> withNumShards(5) generates 5 random shards. It turns out that,
> statistically, when you generate 5 random shards and you have 5
> workers, the probability is reasonably high that some workers will
> get more than one shard (and as a result not all workers will
> participate). Are you able to set the number of shards larger than 5?
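>
> Back-of-envelope: if each of the 5 shard keys lands on a uniformly
> random worker, the chance that every worker gets exactly one shard is
> 5!/5^5 = 120/3125 ~ 3.8%, so roughly 96% of the time at least one
> worker sits idle. A quick simulation (illustrative Scala, not Beam
> code):
>
> import scala.util.Random
>
> val trials = 100000
> val balanced = (1 to trials).count { _ =>
>   // throw 5 shard keys onto 5 workers uniformly at random
>   val workers = Seq.fill(5)(Random.nextInt(5))
>   workers.distinct.size == 5  // every worker got exactly one shard
> }
> println(balanced.toDouble / trials)  // prints ~0.038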
>
> On Wed, Oct 24, 2018 at 12:28 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
>
> cc (dev)
>
> I tried to run the example with the FlinkRunner in batch mode and
> again saw a bad data spread among the workers.
>
> When I tried to remove the number of shards for batch mode in the
> above example, the pipeline crashed before launch:
>
> Caused by: java.lang.IllegalStateException: Inputs to Flatten had
> incompatible triggers:
>
> AfterWatermark.pastEndOfWindow()
>   .withEarlyFirings(AfterPane.elementCountAtLeast(40000))
>   .withLateFirings(AfterFirst.of(
>     Repeatedly.forever(AfterPane.elementCountAtLeast(10000)),
>     Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
>       .plusDelayOf(1 hour)))),
>
> AfterWatermark.pastEndOfWindow()
>   .withEarlyFirings(AfterPane.elementCountAtLeast(1))
>   .withLateFirings(AfterFirst.of(
>     Repeatedly.forever(AfterPane.elementCountAtLeast(1)),
>     Repeatedly.forever(AfterSynchronizedProcessingTime.pastFirstElementInPane())))
>
> On Tue, Oct 23, 2018 at 12:01 PM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
>
> Hi Max,
>
> I forgot to mention that the example runs in streaming mode, so I
> cannot do writes without specifying shards; FileIO explicitly asks
> for them.
>
> I am not sure where the problem is. The FlinkRunner is the only one
> I have used.
>
> On Tue, Oct 23, 2018 at 11:27 AM Maximilian Michels <m...@apache.org> wrote:
>
> Hi Jozef,
>
> This does not look like a FlinkRunner-related problem, but is caused
> by the `WriteFiles` sharding logic. It assigns keys and does a
> Reshuffle, which apparently does not lead to a good data spread in
> your case.
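>
> Conceptually the sharding does something like this (simplified
> sketch, not the actual WriteFiles code; the real implementation picks
> a random starting shard per bundle and round-robins from there):
>
> import java.util.concurrent.ThreadLocalRandom
> import org.apache.beam.sdk.transforms.{Reshuffle, SerializableFunction, WithKeys}
>
> val numShards = 5  // from withNumShards(5)
> val sharded = input  // input: PCollection[String], as in your example
>   // 1) assign each element a shard key in [0, numShards)
>   .apply(WithKeys.of(new SerializableFunction[String, Integer]() {
>     override def apply(e: String): Integer =
>       ThreadLocalRandom.current().nextInt(numShards)
>   }))
>   // 2) Reshuffle materializes the assignment; the runner then hashes
>   // each shard key to a worker, which is where skew can creep in
>   .apply(Reshuffle.of[Integer, String]())
>   // 3) each (shard key -> elements) group is written to one file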
>
> Do you see the same behavior without `withNumShards(5)`?
>
> Thanks,
> Max
>
> On 22.10.18 11:57, Jozef Vilcek wrote:
> > Hello,
> >
> > I am having some trouble getting a balanced write via FileIO.
> > Workers at the shuffle side, where the fired panes of each window
> > are written to the filesystem, receive an unbalanced number of
> > events.
> >
> > Here is a naive code example:
> >
> > val read = KafkaIO.read()
> >   .withTopic("topic")
> >   .withBootstrapServers("kafka1:9092")
> >   .withKeyDeserializer(classOf[ByteArrayDeserializer])
> >   .withValueDeserializer(classOf[ByteArrayDeserializer])
> >   .withProcessingTime()
> >
> > pipeline
> >   .apply(read)
> >   .apply(MapElements.via(
> >     new SimpleFunction[KafkaRecord[Array[Byte], Array[Byte]], String]() {
> >       override def apply(input: KafkaRecord[Array[Byte], Array[Byte]]): String =
> >         new String(input.getKV.getValue, "UTF-8")
> >     }))
> >   .apply(Window.into[String](FixedWindows.of(Duration.standardHours(1)))
> >     .triggering(AfterWatermark.pastEndOfWindow()
> >       .withEarlyFirings(AfterPane.elementCountAtLeast(40000))
> >       .withLateFirings(AfterFirst.of(Lists.newArrayList[Trigger](
> >         Repeatedly.forever(AfterPane.elementCountAtLeast(10000)),
> >         Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
> >           .plusDelayOf(Duration.standardHours(1)))))))
> >     .discardingFiredPanes()
> >     .withAllowedLateness(Duration.standardDays(7)))
> >   .apply(FileIO.write()
> >     .via(TextIO.sink())
> >     .withNaming(new SafeFileNaming(outputPath, ".txt"))
> >     .withTempDirectory(tempLocation)
> >     .withNumShards(5))
> >
> >
> > If I run this on Beam 2.6.0 with Flink 1.5.0 on 5 workers (equal to
> > the number of shards), I would expect each worker to participate in
> > persisting the shards, and equally so, since the code uses a fixed
> > number of shards (and random shard assignment?). But the reality is
> > different (see the 2 attachments: statistics from the Flink task
> > reading from Kafka and from the task writing to files).
> >
> > What am I missing? How can I achieve balanced writes?
> >
> > Thanks,
> > Jozef
> >