cc (dev)

I tried to run the example with FlinkRunner in batch mode and again saw
poor data spread among the workers.

When I removed the number of shards for batch mode in the above example,
the pipeline crashed before launch:

Caused by: java.lang.IllegalStateException: Inputs to Flatten had incompatible triggers:
AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(40000)).withLateFirings(AfterFirst.of(Repeatedly.forever(AfterPane.elementCountAtLeast(10000)), Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(1 hour)))),
AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(1)).withLateFirings(AfterFirst.of(Repeatedly.forever(AfterPane.elementCountAtLeast(1)), Repeatedly.forever(AfterSynchronizedProcessingTime.pastFirstElementInPane())))
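
For what it's worth, the second trigger in that message looks like the
"continuation" form of the first, i.e. the variant Beam derives for a
collection that has already been through a GroupByKey (element counts
collapse to 1 and the processing-time delay becomes synchronized processing
time). If that reading is right, the Flatten inside the write is mixing a
pre-shuffle and a post-shuffle collection. A small sketch of the
correspondence (my interpretation only, not verified against the WriteFiles
internals):

    import java.util.Arrays
    import org.apache.beam.sdk.transforms.windowing.{AfterFirst, AfterPane,
      AfterProcessingTime, AfterWatermark, Repeatedly, Trigger}
    import org.joda.time.Duration

    // The trigger used in the pipeline quoted below.
    val myTrigger: Trigger = AfterWatermark.pastEndOfWindow()
        .withEarlyFirings(AfterPane.elementCountAtLeast(40000))
        .withLateFirings(AfterFirst.of(Arrays.asList[Trigger](
            Repeatedly.forever(AfterPane.elementCountAtLeast(10000)),
            Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(Duration.standardHours(1))))))

    // Should print something close to the second trigger in the exception,
    // if the "continuation trigger" reading above is correct.
    println(myTrigger.getContinuationTrigger)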

On Tue, Oct 23, 2018 at 12:01 PM Jozef Vilcek <jozo.vil...@gmail.com> wrote:

> Hi Max,
>
> I forgot to mention that the example is run in streaming mode, therefore I
> cannot do writes without specifying shards; FileIO explicitly asks for them.
>
> I am not sure where the problem is. FlinkRunner is the only one I have used.
>
> On Tue, Oct 23, 2018 at 11:27 AM Maximilian Michels <m...@apache.org>
> wrote:
>
>> Hi Jozef,
>>
>> This does not look like a FlinkRunner related problem, but is caused by
>> the `WriteFiles` sharding logic. It assigns keys and does a Reshuffle
>> which apparently does not lead to good data spread in your case.
>>
>> Do you see the same behavior without `withNumShards(5)`?
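
[Just to check my understanding of that sharding logic: below is a rough
sketch of the shape I think WriteFiles takes with a fixed shard count - a
random shard key per element, a shuffle by that key, then one writer per
shard. This is only my mental model, not the actual WriteFiles code; the
names `shardForWrite` and `lines` are made up, with `lines` standing in for
the windowed PCollection[String] that feeds FileIO.write() in my example
further down.]

    import java.util.concurrent.ThreadLocalRandom
    import org.apache.beam.sdk.transforms.{DoFn, GroupByKey, ParDo}
    import org.apache.beam.sdk.transforms.DoFn.ProcessElement
    import org.apache.beam.sdk.values.{KV, PCollection}

    val numShards = 5  // mirrors withNumShards(5) in the example below

    def shardForWrite(lines: PCollection[String]) =
      lines
        .apply(ParDo.of(new DoFn[String, KV[Integer, String]]() {
          @ProcessElement
          def process(ctx: ProcessContext): Unit = {
            // Random shard per element; if these keys end up unevenly spread
            // over the workers after the shuffle, the writers are unbalanced.
            val shard: Integer = ThreadLocalRandom.current().nextInt(numShards)
            ctx.output(KV.of(shard, ctx.element()))
          }
        }))
        .apply(GroupByKey.create[Integer, String]())
        // ...each grouped shard would then be written out by a single worker.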
>>
>> Thanks,
>> Max
>>
>> On 22.10.18 11:57, Jozef Vilcek wrote:
>> > Hello,
>> >
>> > I am having some trouble getting a balanced write via FileIO. Workers
>> > on the shuffle side, where the data fired per window are written to the
>> > filesystem, receive an unbalanced number of events.
>> >
>> > Here is a naive code example:
>> >
>> >      val read = KafkaIO.read()
>> >          .withTopic("topic")
>> >          .withBootstrapServers("kafka1:9092")
>> >          .withKeyDeserializer(classOf[ByteArrayDeserializer])
>> >          .withValueDeserializer(classOf[ByteArrayDeserializer])
>> >          .withProcessingTime()
>> >
>> >      pipeline
>> >          .apply(read)
>> >          .apply(MapElements.via(
>> >            new SimpleFunction[KafkaRecord[Array[Byte], Array[Byte]], String]() {
>> >              override def apply(input: KafkaRecord[Array[Byte], Array[Byte]]): String = {
>> >                new String(input.getKV.getValue, "UTF-8")
>> >              }
>> >            }))
>> >
>> >
>> >          .apply(Window.into[String](FixedWindows.of(Duration.standardHours(1)))
>> >              .triggering(AfterWatermark.pastEndOfWindow()
>> >                  .withEarlyFirings(AfterPane.elementCountAtLeast(40000))
>> >                  .withLateFirings(AfterFirst.of(Lists.newArrayList[Trigger](
>> >                      Repeatedly.forever(AfterPane.elementCountAtLeast(10000)),
>> >                      Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
>> >                          .plusDelayOf(Duration.standardHours(1)))))))
>> >              .discardingFiredPanes()
>> >              .withAllowedLateness(Duration.standardDays(7)))
>> >
>> >          .apply(FileIO.write()
>> >              .via(TextIO.sink())
>> >              .withNaming(new SafeFileNaming(outputPath, ".txt"))
>> >              .withTempDirectory(tempLocation)
>> >              .withNumShards(5))
>> >
>> >
>> > If I run this on Beam 2.6.0 with Flink 1.5.0 on 5 workers (equal to the
>> > number of shards), I would expect each worker to participate equally in
>> > persisting the shards, since the code uses a fixed number of shards
>> > (and random shard assignment?). But reality is different (see the 2
>> > attachments - statistics from the Flink task reading from Kafka and the
>> > task writing to files).
>> >
>> > What am I missing? How to achieve balanced writes?
>> >
>> > Thanks,
>> > Jozef
>> >
>> >
>>
>
