So if I run 5 workers with 50 shards, I end up with:

Worker  Duration  Bytes received  Records received
     1    2m 39s          900 MB           465,525
     2    2m 39s         1.76 GB           930,720
     3    2m 39s          789 MB           407,315
     4    2m 39s         1.32 GB           698,262
     5    2m 39s          788 MB           407,310

Still not good, but better than with 5 shards, where some workers did not
participate at all.
So the problem is in whatever layer distributes the keys / shards among the
workers?
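
A quick simulation supports that suspicion (plain Scala below, just modeling
each shard as landing on a uniformly random worker, which need not match the
real shuffle exactly): it reproduces a similar ~2x skew with 50 shards on 5
workers.

    import scala.util.Random

    object ShardSpreadSim {
      def main(args: Array[String]): Unit = {
        val rnd = new Random()
        val trials = 10000
        // For each trial, drop 50 shards onto 5 workers uniformly at
        // random and record the least and most loaded worker.
        val spreads = (1 to trials).map { _ =>
          val counts = Array.fill(5)(0)
          (1 to 50).foreach(_ => counts(rnd.nextInt(5)) += 1)
          (counts.min, counts.max)
        }
        val avgMin = spreads.map(_._1).sum.toDouble / trials
        val avgMax = spreads.map(_._2).sum.toDouble / trials
        // The expected load is 10 shards per worker, but this typically
        // prints a min around 6 and a max around 14, i.e. roughly the
        // 2x spread visible in the table above.
        println(f"avg min shards/worker: $avgMin%.1f, avg max: $avgMax%.1f")
      }
    }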

On Wed, Oct 24, 2018 at 9:37 AM Reuven Lax <re...@google.com> wrote:

> withNumShards(5) generates 5 random shards. It turns out that
> statistically when you generate 5 random shards and you have 5 workers, the
> probability is reasonably high that some workers will get more than one
> shard (and as a result not all workers will participate). Are you able to
> set the number of shards larger than 5?
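>
> (For intuition, assuming each shard lands on a worker uniformly at random:
> the chance that 5 shards cover all 5 workers is 5!/5^5 = 120/3125 ≈ 3.8%,
> so some worker ends up with nothing to write roughly 96% of the time.)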
>
> On Wed, Oct 24, 2018 at 12:28 AM Jozef Vilcek <jozo.vil...@gmail.com>
> wrote:
>
>> cc (dev)
>>
>> I tried to run the example with the FlinkRunner in batch mode and again
>> observed a bad data spread among the workers.
>>
>> When I tried to remove the number of shards for batch mode in the above
>> example, the pipeline crashed before launch:
>>
>> Caused by: java.lang.IllegalStateException: Inputs to Flatten had
>> incompatible triggers:
>> AfterWatermark.pastEndOfWindow()
>>     .withEarlyFirings(AfterPane.elementCountAtLeast(40000))
>>     .withLateFirings(AfterFirst.of(
>>         Repeatedly.forever(AfterPane.elementCountAtLeast(10000)),
>>         Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
>>             .plusDelayOf(1 hour)))),
>> AfterWatermark.pastEndOfWindow()
>>     .withEarlyFirings(AfterPane.elementCountAtLeast(1))
>>     .withLateFirings(AfterFirst.of(
>>         Repeatedly.forever(AfterPane.elementCountAtLeast(1)),
>>         Repeatedly.forever(AfterSynchronizedProcessingTime.pastFirstElementInPane())))
>>
>>
>> On Tue, Oct 23, 2018 at 12:01 PM Jozef Vilcek <jozo.vil...@gmail.com>
>> wrote:
>>
>>> Hi Max,
>>>
>>> I forgot to mention that the example runs in streaming mode, so I
>>> cannot write without specifying shards; FileIO explicitly asks for
>>> them.
>>>
>>> I am not sure where the problem is. The FlinkRunner is the only runner
>>> I have used.
>>>
>>> On Tue, Oct 23, 2018 at 11:27 AM Maximilian Michels <m...@apache.org>
>>> wrote:
>>>
>>>> Hi Jozef,
>>>>
>>>> This does not look like a FlinkRunner-related problem; it is caused by
>>>> the `WriteFiles` sharding logic. It assigns keys and does a Reshuffle,
>>>> which apparently does not lead to a good data spread in your case.
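>>>>
>>>> For illustration, a minimal sketch of that pattern (my paraphrase in
>>>> Scala with a hypothetical helper `shardForWrite`, not the actual
>>>> `WriteFiles` source): every element is keyed by a random shard id and
>>>> then shuffled, so the final balance depends entirely on how those few
>>>> keys end up distributed over the workers.
>>>>
>>>>     import java.util.concurrent.ThreadLocalRandom
>>>>     import org.apache.beam.sdk.transforms.{DoFn, GroupByKey, ParDo}
>>>>     import org.apache.beam.sdk.transforms.DoFn.ProcessElement
>>>>     import org.apache.beam.sdk.values.{KV, PCollection}
>>>>
>>>>     def shardForWrite(input: PCollection[String], numShards: Int)
>>>>         : PCollection[KV[java.lang.Integer, java.lang.Iterable[String]]] =
>>>>       input
>>>>         .apply("AssignShard", ParDo.of(
>>>>           new DoFn[String, KV[java.lang.Integer, String]]() {
>>>>             @ProcessElement
>>>>             def process(
>>>>                 c: DoFn[String, KV[java.lang.Integer, String]]#ProcessContext): Unit = {
>>>>               // Only a handful of distinct keys exist, so several of
>>>>               // them can hash onto the same worker after the shuffle.
>>>>               c.output(KV.of(
>>>>                 Int.box(ThreadLocalRandom.current().nextInt(numShards)),
>>>>                 c.element()))
>>>>             }
>>>>           }))
>>>>         .apply("GroupByShard", GroupByKey.create[java.lang.Integer, String]())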
>>>>
>>>> Do you see the same behavior without `withNumShards(5)`?
>>>>
>>>> Thanks,
>>>> Max
>>>>
>>>> On 22.10.18 11:57, Jozef Vilcek wrote:
>>>> > Hello,
>>>> >
>>>> > I am having some trouble getting balanced writes via FileIO. The
>>>> > workers on the shuffle side, where the data fired per window is
>>>> > written to the filesystem, receive an unbalanced number of events.
>>>> >
>>>> > Here is a naive code example:
>>>> >
>>>> >      val read = KafkaIO.read()
>>>> >          .withTopic("topic")
>>>> >          .withBootstrapServers("kafka1:9092")
>>>> >          .withKeyDeserializer(classOf[ByteArrayDeserializer])
>>>> >          .withValueDeserializer(classOf[ByteArrayDeserializer])
>>>> >          .withProcessingTime()
>>>> >
>>>> >      pipeline
>>>> >          .apply(read)
>>>> >          .apply(MapElements.via(new
>>>> > SimpleFunction[KafkaRecord[Array[Byte], Array[Byte]], String]() {
>>>> >            override def apply(input: KafkaRecord[Array[Byte],
>>>> > Array[Byte]]): String = {
>>>> >              new String(input.getKV.getValue, "UTF-8")
>>>> >            }
>>>> >          }))
>>>> >
>>>> >
>>>> >          .apply(Window.into[String](FixedWindows.of(Duration.standardHours(1)))
>>>> >              .triggering(AfterWatermark.pastEndOfWindow()
>>>> >                  .withEarlyFirings(AfterPane.elementCountAtLeast(40000))
>>>> >                  .withLateFirings(AfterFirst.of(Lists.newArrayList[Trigger](
>>>> >                      Repeatedly.forever(AfterPane.elementCountAtLeast(10000)),
>>>> >                      Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
>>>> >                          .plusDelayOf(Duration.standardHours(1)))))))
>>>> >              .discardingFiredPanes()
>>>> >              .withAllowedLateness(Duration.standardDays(7)))
>>>> >
>>>> >          .apply(FileIO.write()
>>>> >              .via(TextIO.sink())
>>>> >              .withNaming(new SafeFileNaming(outputPath, ".txt"))
>>>> >              .withTempDirectory(tempLocation)
>>>> >              .withNumShards(5))
>>>> >
>>>> >
>>>> > If I run this on Beam 2.6.0 with Flink 1.5.0 on 5 workers (equal to
>>>> > the number of shards), I would expect each worker to participate in
>>>> > persisting the shards equally, since the code uses a fixed number of
>>>> > shards (and random shard assignment?). But the reality is different
>>>> > (see the 2 attachments: statistics from the Flink task reading from
>>>> > Kafka and the task writing to files).
>>>> >
>>>> > What am I missing? How to achieve balanced writes?
>>>> >
>>>> > Thanks,
>>>> > Jozef
>>>> >
>>>> >
>>>>
>>>
