If I do not specify shards for an unbounded collection, I get:

Caused by: java.lang.IllegalArgumentException: When applying WriteFiles to an unbounded PCollection, must specify number of output shards explicitly
        at org.apache.beam.repackaged.beam_sdks_java_core.com.google.common.base.Preconditions.checkArgument(Preconditions.java:191)
        at org.apache.beam.sdk.io.WriteFiles.expand(WriteFiles.java:289)
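For completeness, a minimal sketch (not from the thread) of what satisfies that precondition on the write side; `lines`, `outputPath` and `tempLocation` are hypothetical placeholders for an already-windowed, unbounded PCollection[String] and its output locations:

    import org.apache.beam.sdk.io.{FileIO, TextIO}

    // Sketch only: an explicit shard count is what the WriteFiles precondition above asks for.
    lines.apply(
      FileIO.write[String]()
        .via(TextIO.sink())
        .to(outputPath)
        .withTempDirectory(tempLocation)
        .withNumShards(5))  // omit this on an unbounded PCollection and the exception above is thrown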
Around the same lines in WriteFiles there is also a check for windowed writes. I believe FileIO enables it explicitly when windowing is present. On the filesystem, written files are per window and shard.

On Thu, Oct 25, 2018 at 12:01 PM Maximilian Michels <m...@apache.org> wrote:

> I agree it would be nice to keep the current distribution of elements
> instead of doing a shuffle based on an artificial shard key.
>
> Have you tried `withWindowedWrites()`? Also, why do you say you need to
> specify the number of shards in streaming mode?
>
> -Max
>
> On 25.10.18 10:12, Jozef Vilcek wrote:
> > Hm, yes, this makes sense now, but what can be done for my case? I do
> > not want to end up with too many files on disk.
> >
> > I think what I am looking for is a way to instruct the IO not to do
> > another random shard assignment and reshuffle, but to assume a number
> > of shards equal to the number of workers, with the shard ID being the
> > worker ID. Is this doable in the Beam model?
> >
> > On Wed, Oct 24, 2018 at 4:07 PM Maximilian Michels <m...@apache.org> wrote:
> >
> > > The FlinkRunner uses a hash function (MurmurHash) on each key which
> > > places keys somewhere in the hash space. The hash space (2^32) is split
> > > among the partitions (5 in your case). Given enough keys, the chance
> > > increases that they are equally spread.
> > >
> > > This should be similar to what the other Runners do.
> > >
> > > On 24.10.18 10:58, Jozef Vilcek wrote:
> > > > So if I run 5 workers with 50 shards, I end up with:
> > > >
> > > > Duration    Bytes received    Records received
> > > > 2m 39s      900 MB            465,525
> > > > 2m 39s      1.76 GB           930,720
> > > > 2m 39s      789 MB            407,315
> > > > 2m 39s      1.32 GB           698,262
> > > > 2m 39s      788 MB            407,310
> > > >
> > > > Still not good, but better than with 5 shards, where some workers
> > > > did not participate at all.
> > > > So, the problem is in some layer which distributes keys / shards
> > > > among workers?
> > > >
> > > > On Wed, Oct 24, 2018 at 9:37 AM Reuven Lax <re...@google.com> wrote:
> > > >
> > > > > withNumShards(5) generates 5 random shards. It turns out that
> > > > > statistically when you generate 5 random shards and you have 5
> > > > > workers, the probability is reasonably high that some workers
> > > > > will get more than one shard (and as a result not all workers
> > > > > will participate). Are you able to set the number of shards
> > > > > larger than 5?
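A quick back-of-the-envelope check of that point, assuming each of the 5 shard keys is hashed to one of the 5 workers independently and uniformly (an idealisation of the MurmurHash key-space split described above):

    val workers = 5
    val shards = 5
    // Probability that all shards land on distinct workers: 5!/5^5
    val allDistinct = (1 to shards).map(i => (workers - i + 1).toDouble / workers).product
    println(f"P(every worker gets exactly one shard) = $allDistinct%.3f")       // ~ 0.038
    println(f"P(at least one worker writes no files) = ${1 - allDistinct}%.3f") // ~ 0.962

Under that assumption the odds that all five workers take part are only about 4%, which is consistent with the observation that some workers did not participate at all.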
> > > > > On Wed, Oct 24, 2018 at 12:28 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
> > > > >
> > > > > > cc (dev)
> > > > > >
> > > > > > I tried to run the example with FlinkRunner in batch mode and
> > > > > > again received a bad data spread among the workers.
> > > > > >
> > > > > > When I tried to remove the number of shards for batch mode in the
> > > > > > above example, the pipeline crashed before launch:
> > > > > >
> > > > > > Caused by: java.lang.IllegalStateException: Inputs to Flatten had incompatible triggers:
> > > > > > AfterWatermark.pastEndOfWindow()
> > > > > >   .withEarlyFirings(AfterPane.elementCountAtLeast(40000))
> > > > > >   .withLateFirings(AfterFirst.of(
> > > > > >     Repeatedly.forever(AfterPane.elementCountAtLeast(10000)),
> > > > > >     Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1 hour)))),
> > > > > > AfterWatermark.pastEndOfWindow()
> > > > > >   .withEarlyFirings(AfterPane.elementCountAtLeast(1))
> > > > > >   .withLateFirings(AfterFirst.of(
> > > > > >     Repeatedly.forever(AfterPane.elementCountAtLeast(1)),
> > > > > >     Repeatedly.forever(AfterSynchronizedProcessingTime.pastFirstElementInPane())))
> > > > > >
> > > > > > On Tue, Oct 23, 2018 at 12:01 PM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
> > > > > >
> > > > > > > Hi Max,
> > > > > > >
> > > > > > > I forgot to mention that the example is run in streaming mode,
> > > > > > > therefore I can not do writes without specifying shards. FileIO
> > > > > > > explicitly asks for them.
> > > > > > >
> > > > > > > I am not sure where the problem is. FlinkRunner is the only one
> > > > > > > I used.
> > > > > > >
> > > > > > > On Tue, Oct 23, 2018 at 11:27 AM Maximilian Michels <m...@apache.org> wrote:
> > > > > > >
> > > > > > > > Hi Jozef,
> > > > > > > >
> > > > > > > > This does not look like a FlinkRunner related problem, but is
> > > > > > > > caused by the `WriteFiles` sharding logic. It assigns keys and
> > > > > > > > does a Reshuffle which apparently does not lead to a good data
> > > > > > > > spread in your case.
> > > > > > > >
> > > > > > > > Do you see the same behavior without `withNumShards(5)`?
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Max
> > > > > > > >
> > > > > > > > On 22.10.18 11:57, Jozef Vilcek wrote:
> > > > > > > > > Hello,
> > > > > > > > >
> > > > > > > > > I am having some trouble getting a balanced write via FileIO.
> > > > > > > > > Workers on the shuffle side, where the data for each window
> > > > > > > > > fire and are written to the filesystem, receive an unbalanced
> > > > > > > > > number of events.
> > > > > > > > > Here is a naive code example:
> > > > > > > > >
> > > > > > > > > val read = KafkaIO.read()
> > > > > > > > >     .withTopic("topic")
> > > > > > > > >     .withBootstrapServers("kafka1:9092")
> > > > > > > > >     .withKeyDeserializer(classOf[ByteArrayDeserializer])
> > > > > > > > >     .withValueDeserializer(classOf[ByteArrayDeserializer])
> > > > > > > > >     .withProcessingTime()
> > > > > > > > >
> > > > > > > > > pipeline
> > > > > > > > >     .apply(read)
> > > > > > > > >     .apply(MapElements.via(new SimpleFunction[KafkaRecord[Array[Byte], Array[Byte]], String]() {
> > > > > > > > >       override def apply(input: KafkaRecord[Array[Byte], Array[Byte]]): String = {
> > > > > > > > >         new String(input.getKV.getValue, "UTF-8")
> > > > > > > > >       }
> > > > > > > > >     }))
> > > > > > > > >     .apply(Window.into[String](FixedWindows.of(Duration.standardHours(1)))
> > > > > > > > >       .triggering(AfterWatermark.pastEndOfWindow()
> > > > > > > > >         .withEarlyFirings(AfterPane.elementCountAtLeast(40000))
> > > > > > > > >         .withLateFirings(AfterFirst.of(Lists.newArrayList[Trigger](
> > > > > > > > >           Repeatedly.forever(AfterPane.elementCountAtLeast(10000)),
> > > > > > > > >           Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardHours(1)))))))
> > > > > > > > >       .discardingFiredPanes()
> > > > > > > > >       .withAllowedLateness(Duration.standardDays(7)))
> > > > > > > > >     .apply(FileIO.write()
> > > > > > > > >       .via(TextIO.sink())
> > > > > > > > >       .withNaming(new SafeFileNaming(outputPath, ".txt"))
> > > > > > > > >       .withTempDirectory(tempLocation)
> > > > > > > > >       .withNumShards(5))
> > > > > > > > >
> > > > > > > > > If I run this on Beam 2.6.0 with Flink 1.5.0 on 5 workers (equal
> > > > > > > > > to the number of shards), I would expect that each worker would
> > > > > > > > > participate in persisting the shards, and equally so, since the
> > > > > > > > > code uses a fixed number of shards (and random shard assignment?).
> > > > > > > > > But the reality is different (see the 2 attachments: statistics
> > > > > > > > > from the Flink task reading from Kafka and from the task writing
> > > > > > > > > to files).
> > > > > > > > >
> > > > > > > > > What am I missing? How to achieve balanced writes?
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Jozef
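For context on the sharding step Max mentions above ("It assigns keys and does a Reshuffle"), it works conceptually like the sketch below. This is not the actual WriteFiles code, just a simplified illustration under the assumptions stated in the comments; `lines` is a hypothetical PCollection[String]. Each element gets an artificial random shard id, and the subsequent grouping moves each shard key to whichever worker its hash lands on, so the final spread depends on how those few keys hash across workers rather than on how evenly the input itself was distributed.

    import java.util.concurrent.ThreadLocalRandom

    import org.apache.beam.sdk.transforms.{DoFn, GroupByKey, ParDo}
    import org.apache.beam.sdk.transforms.DoFn.ProcessElement
    import org.apache.beam.sdk.values.KV

    // Assigns every element a random shard id in [0, numShards) -- the "artificial shard key".
    class AssignRandomShard(numShards: Int) extends DoFn[String, KV[java.lang.Integer, String]] {
      @ProcessElement
      def processElement(c: ProcessContext): Unit = {
        val shard = ThreadLocalRandom.current().nextInt(numShards)
        c.output(KV.of(Int.box(shard), c.element()))
      }
    }

    // Hypothetical usage: after the GroupByKey, all data for a given shard id sits on the
    // single worker its key hashes to, which is why 5 shards on 5 workers can leave some
    // workers idle.
    // lines
    //   .apply(ParDo.of(new AssignRandomShard(5)))
    //   .apply(GroupByKey.create[java.lang.Integer, String]())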