Hi Dongwon,

Yes, you are right that I assume that broadcasting occurs once. This
is what I meant by "If you know the data in advance". Sorry for not
being clear. If you need to periodically broadcast new versions of the
data, then I cannot find a better solution than the one you propose
with the static var.

Cheers,
Kostas

On Wed, Sep 23, 2020 at 11:49 AM Dongwon Kim <eastcirc...@gmail.com> wrote:
>
> Hi Kostas,
>
> Thanks for the input!
>
> BTW, I guess you assume that the broadcasting occurs just once for
> bootstrapping, huh?
> My job needs not only bootstrapping but also periodically fetching a
> new version of data from some external storage.
>
> Thanks,
>
> Dongwon
>
> > 2020. 9. 23. 오전 4:59, Kostas Kloudas <kklou...@gmail.com> 작성:
> >
> > Hi Dongwon,
>
>
>
>
>
> >
> > If you know the data in advance, you can always use the Yarn options
> > in [1] (e.g. the "yarn.ship-directories") to ship the directories with
> > the data you want only once to each Yarn container (i.e. TM) and then
> > write a udf which reads them in the open() method. This will allow the
> > data to be shipped only once per TM but then each of the tasks will
> > have its own copy in memory of course. By default the visibility of
> > the files that you ship is set to APPLICATION [2], if I am not
> > mistaken so if more than one TMs go to the same node, then you will
> > have even less copies shipped.
> >
> > Does this help with your usecase?
> >
> > Cheers,
> > Kostas
> >
> > [1] 
> > https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html
> > [2] 
> > https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/yarn/api/records/LocalResourceVisibility.html
> >
> >> On Sun, Sep 20, 2020 at 6:05 PM Dongwon Kim <eastcirc...@gmail.com> wrote:
> >> Hi,
> >> I'm using Flink broadcast state similar to what Fabian explained in [1]. 
> >> One difference might be the size of the broadcasted data; the size is 
> >> around 150MB.
> >> I've launched 32 TMs by setting
> >> - taskmanager.numberOfTaskSlots : 6
> >> - parallelism of the non-broadcast side : 192
> >> Here's some questions:
> >> 1) AFAIK, the broadcasted data (150MB) is sent to all 192 tasks. Is it 
> >> right?
> >> 2) Any recommended way to broadcast data only to 32 TMs so that 6 tasks in 
> >> each TM can read the broadcasted data? I'm considering implementing a 
> >> static class for the non-broadcast side to directly load data only once on 
> >> each TaskManager instead of the broadcast state (FYI, I'm using per-job 
> >> clusters on YARN, so each TM is only for a single job). However, I'd like 
> >> to use Flink native facilities if possible.
> >> The type of broadcasted data is Map<Long, Int> with around 600K entries, 
> >> so every time the data is broadcasted a lot of GC is inevitable on each TM 
> >> due to the (de)serialization cost.
> >> Any advice would be much appreciated.
> >> Best,
> >> Dongwon
> >> [1] https://flink.apache.org/2019/06/26/broadcast-state.html

Reply via email to