Hi,

Xingcan is right. There is no hidden state synchronization happening.
You have to ensure that the broadcast state is the same at every parallel
instance. Hence, it should only be modified by the
processBroadcastElement() method that receives the same broadcasted
elements on all task instance.
The API tries to help users to not violate the contract, however it is not
bullet proof. Side-passing information in a local variable (as suggested by
you) cannot be prevented and would lead to inconsistencies.

Best, Fabian



Am Mo., 27. Aug. 2018 um 16:51 Uhr schrieb Xingcan Cui <xingc...@gmail.com>:

> Hi Radu,
>
> I cannot make a full understanding of your question but I guess the answer
> is NO.
>
> The broadcast state pattern just provides you with an automatic data
> broadcasting and a bunch of map states to cache the "low-throughput”
> patterns. Also, to keep consistency, it forbid the `processElement()` to
> modify the states. But this API does not really broadcast the states. You
> should keep the logic for `processBraodcastElement()` deterministic. Maybe
> the equation below could make the pattern clear.
>
> <identical input> + <deterministic logic> = <identical states> =
> <broadcast state>
>
> Best,
> Xingcan
>
> On Aug 27, 2018, at 10:23 PM, Radu Tudoran <radu.tudo...@huawei.com>
> wrote:
>
> Hi Fabian,
>
> Thanks for the blog post about broadcast state. I have a question with
> respect to the update capabilities of the broadcast state:
>
> Assume you do whatever processing logic in the main processElement
> function .. and at a given context marker you 1) would change a local field
> marker, to 2) signal that next time the broadcast function is triggered a
> special pattern should be created and broadcasted.
>
> My question is: is such a behavior allowed? Would the new special Pattern
> that originates in an operator be shared across the other instances of the
> KeyedProcessFunction?
>
>
> public static class PatternEvaluator
>  extends KeyedBroadcastProcessFunction<Long, Action, Pattern, Tuple2<Long,
> Pattern>> {
>
> public bolean test = false;
>
>   @Override
>   public void processElement(
>      Action action,
>      ReadOnlyContext ctx,
>      Collector<Tuple2<Long, Pattern>> out) throws Exception {
>
>    //…logic
>
>    if (..whatever context) {
>       Test = true;
>    }
>
>    }
>
>  @Override
>  public void processBroadcastElement(
>      Pattern pattern,
>      Context ctx,
>      Collector<Tuple2<Long, Pattern>> out) throws Exception {
>    // store the new pattern by updating the broadcast state
>
>  BroadcastState<Void, Pattern> bcState =
>      ctx.getBroadcastState(new MapStateDescriptor<>("patterns",
> Types.VOID, Types.POJO(Pattern.class)));
>    // storing in MapState with null as VOID default value
>    bcState.put(null, pattern);
>
>    If (test) {
>        bcState.put(null, new Pattern(test) );
>    }
>
>  }
> }
>
>
> Dr. Radu Tudoran
> Staff Research Engineer - Big Data Expert
> IT R&D Division
>
> <image001.png>
> HUAWEI TECHNOLOGIES Duesseldorf GmbH
> German Research Center
> Munich Office
> Riesstrasse 25, 80992 München
>
> E-mail: *radu.tudo...@huawei.com <radu.tudo...@huawei.com>*
> Mobile: +49 15209084330
> Telephone: +49 891588344173
>
> HUAWEI TECHNOLOGIES Duesseldorf GmbH
> Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
> Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
> Managing Director: Bo PENG, Qiuen Peng, Shengli Wang
> Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
> Geschäftsführer: Bo PENG, Qiuen Peng, Shengli Wang
> This e-mail and its attachments contain confidential information from
> HUAWEI, which is intended only for the person or entity whose address is
> listed above. Any use of the information contained herein in any way
> (including, but not limited to, total or partial disclosure, reproduction,
> or dissemination) by persons other than the intended recipient(s) is
> prohibited. If you receive this e-mail in error, please notify the sender
> by phone or email immediately and delete it!
>
> *From:* Fabian Hueske [mailto:fhue...@gmail.com <fhue...@gmail.com>]
> *Sent:* Monday, August 20, 2018 9:40 AM
> *To:* Paul Lam <paullin3...@gmail.com>
> *Cc:* Rong Rong <walter...@gmail.com>; Hequn Cheng <chenghe...@gmail.com>;
> user <user@flink.apache.org>
> *Subject:* Re: What's the advantage of using BroadcastState?
>
> Hi,
>
> I've recently published a blog post about Broadcast State [1].
>
> Cheers,
> Fabian
>
> [1]
> https://data-artisans.com/blog/a-practical-guide-to-broadcast-state-in-apache-flink
>
> 2018-08-20 3:58 GMT+02:00 Paul Lam <paullin3...@gmail.com>:
>
> Hi Rong, Hequn
>
> Your answers are very helpful! Thank you!
>
> Best Regards,
> Paul Lam
>
>
> 在 2018年8月19日,23:30,Rong Rong <walter...@gmail.com> 写道:
>
> Hi Paul,
>
> To add to Hequn's answer. Broadcast state can typically be used as "a
> low-throughput stream containing a set of rules which we want to evaluate
> against all elements coming from another stream" [1]
> So to add to the difference list is: whether it is "broadcast" across all
> keys if processing a keyed stream. This is typically when it is not
> possible to derive same key field using KeySelector in CoStream.
> Another additional difference is performance: BroadcastStream is "stored
> locally and is used to process all incoming elements on the other stream"
> thus requires to carefully manage the size of the BroadcastStream.
>
> [1]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/state/broadcast_state.html
>
> On Sun, Aug 19, 2018 at 1:40 AM Hequn Cheng <chenghe...@gmail.com> wrote:
>
> Hi Paul,
>
> There are some differences:
> 1. The BroadcastStream can broadcast data for you, i.e, data will be
> broadcasted to all downstream tasks automatically.
> 2. To guarantee that the contents in the Broadcast State are the same
> across all parallel instances of our operator, read-write access is only
> given to the broadcast side
> 3. For BroadcastState, flink guarantees that upon restoring/rescaling
> there will be no duplicates and no missing data. In case of recovery with
> the same or smaller parallelism, each task reads its checkpointed state.
> Upon scaling up, each task reads its own state, and the remaining tasks
> (p_new-p_old) read checkpoints of previous tasks in a round-robin manner.
> While MapState doesn't have such abilities.
>
> Best, Hequn
>
> On Sun, Aug 19, 2018 at 11:18 AM, Paul Lam <paullin3...@gmail.com> wrote:
>
> Hi,
>
> AFAIK, the difference between a BroadcastStream and a normal DataStream is
> that the BroadcastStream is with a BroadcastState, but it seems that the
> functionality of BroadcastState can also be achieved by MapState in a
> CoMapFunction or something since the control stream is still broadcasted
> without being turned into BroadcastStream. So, I’m wondering what’s the
> advantage of using BroadcastState? Thanks a lot!
>
> Best Regards,
> Paul Lam
>
>
>

Reply via email to