I think I would scope the effort slightly differently. Note that I might be
missing some requirements or overlooking something.

1. Enable Flink to support CEP dynamic patterns

Here I would define the commands and formats for a new CEP pattern. Then I
would extend the CEP operator to understand these commands so that we can
change the CEP patterns dynamically. This should give us the building blocks
to set up a job where you can change the CEP patterns by reading the CEP
pattern commands from an arbitrary side input. Note that it will be the
responsibility of the user to ingest the data somehow. This is nothing
Flink is concerned with at this time.
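
To make the idea a bit more concrete, here is a minimal sketch in plain
Python rather than Flink code. Everything in it is an assumption made for
illustration only: the JSON command format (`op`, `id`, `sequence`), the
`DynamicPatternOperator` class, and the simplification that a pattern is a
strictly contiguous sequence of event types. It only shows the shape of
"commands arrive on one input, events on another, and the operator applies
both".

```python
import json
from collections import deque


class DynamicPatternOperator:
    """Toy stand-in for a CEP operator whose patterns can change at runtime."""

    def __init__(self):
        # pattern id -> (required sequence, window of events seen since activation)
        self._patterns = {}

    def on_command(self, raw_command: str) -> None:
        """Apply one pattern command read from the (hypothetical) side input."""
        command = json.loads(raw_command)
        op, pattern_id = command["op"], command["id"]
        if op == "ADD":
            sequence = tuple(command["sequence"])
            if not sequence:
                raise ValueError("a pattern needs at least one event type")
            # Adding an existing id replaces the pattern and resets its progress.
            self._patterns[pattern_id] = (sequence, deque(maxlen=len(sequence)))
        elif op == "REMOVE":
            self._patterns.pop(pattern_id, None)
        else:
            raise ValueError(f"unknown command: {op}")

    def on_event(self, event_type: str) -> list:
        """Feed one event; return the ids of patterns that just matched."""
        matched = []
        for pattern_id, (sequence, window) in self._patterns.items():
            window.append(event_type)
            # The bounded window holds the most recent events since activation.
            if tuple(window) == sequence:
                matched.append(pattern_id)
        return sorted(matched)


operator = DynamicPatternOperator()
operator.on_command(json.dumps(
    {"op": "ADD", "id": "p1", "sequence": ["login_failed", "login_ok"]}))
first = [operator.on_event(e) for e in ["login_failed", "login_ok"]]
operator.on_command(json.dumps({"op": "REMOVE", "id": "p1"}))
second = [operator.on_event(e) for e in ["login_failed", "login_ok"]]
print(first, second)
```

In this sketch the operator does not care where the commands come from,
which is the point of item 1: the ingestion path (Kafka topic, REST tool,
something else) stays outside of it.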

2. Think about how to make the CEP dynamic pattern feature more ergonomic

Here we can pick up the discussion about OC and having a REST ingestion
endpoint in Flink vs. offering tools that live outside of Flink itself. I
could imagine that this could become a separate small project that builds
upon Flink, for example.

Cheers,
Till

On Wed, Jan 5, 2022 at 2:19 PM Becket Qin <becket....@gmail.com> wrote:

> Hi Till,
>
> Thanks for the prompt reply. Like you said, we are indeed using the dynamic
> CEP pattern use case to test the existing primitives in Flink to see if
> they can meet the requirements. I fully understand the concern of
> exposing OC as a user interface. Meanwhile I see CEP dynamic patterns as a
> good opportunity to battle test and enhance the OC as a user facing control
> plane which is currently missing. After all, there is no better person than
> ourselves to try it out first.
>
> It is not clear to me whether it is worth continuing the effort of
> supporting dynamic CEP pattern without concluding the control plane
> discussion. Let's say we have a CEP job reading from Kafka. To make this
> work with side-input, a few things need to be done.
>
>    1. In order to support dynamic patterns, users would create another
>    Kafka topic as side-input to receive dynamic patterns.
>    2. In order to insert dynamic patterns, users would use a separate web
>    server that is provided by us as a separate tool. The web server takes
>    HTTP requests and sends dynamic pattern records to Kafka via a Kafka
>    sink (using a KafkaProducer is likely simpler here, though).
>    3. Regarding querying the running dynamic patterns, given Kafka is not
>    queryable, users would probably introduce a database and insert the
>    patterns there so they can query the running patterns. Maybe this could
>    be done by the CEP operator side-output, so there is less chance of
>    inconsistency between Kafka and the database.
>    4. If the Flink job is to be stopped, the dynamic pattern Kafka topic
>    needs to be deleted, the companion web server also needs to be stopped
>    (assuming it is not shared with other CEP jobs), and the database table
>    storing the dynamic pattern needs to be dropped.
>
> Please correct me if I misunderstood something, but this seems quite
> involved. Moreover, all the work here is going to be thrown away after we
> have OC as a decent user facing control plane in place. And we will likely
> have a backwards incompatible API change here. Given that, I am wondering
> if we should wait until the OC discussion concludes before moving on with
> the dynamic patterns?
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
>
> On Wed, Jan 5, 2022 at 5:53 PM Till Rohrmann <trohrm...@apache.org> wrote:
>
> > Thanks for the detailed explanation Becket.
> >
> > Do you think that an additional dependency is a deal breaker for people
> > to use dynamic CEP patterns? At the very least people have to operate some
> > kind of storage/queue system from which the CEP job can read anyway.
> > Maybe it could be good enough to provide a REST endpoint (as a separate
> > tool) that can be instantiated with a Flink sink to ingest REST requests
> > into some queue. A general concern I have is that making the JM a REST
> > ingestion point for data will push another responsibility to Flink and
> > increase the surface area further.
> >
> > For how to handle data that cannot be processed by patterns, I think
> > there also exist other solutions. I could imagine that users could define
> > different failover strategies. E.g. one could simply ignore the record,
> > the pattern could get deactivated on the affected TM, or the processing
> > could fail.
> >
> > Maybe we are coupling the dynamic CEP pattern effort too much to where
> > the new patterns come from. Maybe we can split these efforts into
> > supporting dynamic CEP patterns on the TM reading from some source and
> > then fork off the discussion about introducing a user controlled control
> > plane to Flink. That way we wouldn't block this effort and could discuss
> > more about the exact properties such a user control plane would need to
> > have. What do you think?
> >
> > Cheers,
> > Till
> >
> > On Wed, Jan 5, 2022 at 7:18 AM Becket Qin <becket....@gmail.com> wrote:
> >
> > > Hi Till,
> > >
> > > Thanks for the comments and questions. To be clear, I am not saying
> > > that the side-input stream does not work for the dynamic pattern update
> > > use case. But I think OC is a better solution. The design goal for CEP
> > > dynamic patterns is not only to make it work, but also to make it user
> > > friendly and extensible. So as a user, I expect the following:
> > >
> > > - Use CEP with dynamic pattern update without depending on external
> > > systems, e.g. updating the pattern by directly talking to the Flink job
> > > itself.
> > > - Some sort of isolation between patterns, e.g. a pattern update
> > > failure won't cause other running patterns to fail.
> > > - Easy to query the currently running patterns in the Flink job.
> > > - Extensible to add new features, e.g. apply some pattern to a subset
> > > of subtasks, disable dynamic pattern update during a service window,
> > > etc.
> > >
> > > It looks to me that OC is a more promising way to achieve the above.
> > >
> > > Please also see the reply to your questions inline below.
> > >
> > > > You mentioned that a TM might find out that a pattern is invalid and
> > > > then it could use the 2-way communication with the JM to tell the
> > > > other TMs that the pattern is invalid. How exactly would a TM detect
> > > > that a pattern is invalid at data processing time? And assuming that
> > > > this is possible for a single TM, why can't the other TMs detect this
> > > > problem? Is it because the validity of patterns might depend on data
> > > > that is different between TMs?
> > >
> > >
> > > Yes, the TMs are processing different records, so some may encounter
> > > issues while others are running fine. For example, a field may be
> > > missing from a record while the pattern requires it. Even if all the TMs
> > > have detected the problem, without a channel to report back the issue,
> > > the only thing TMs can do is to throw an exception and stop processing.
> > >
> > > > You also mentioned that deep-learning-on-flink could benefit from an OC
> > > > control plane but I couldn't find the referenced link. Would
> > > > deep-learning-on-flink like to use OCs as a kind of parameter server?
> > > > Could you elaborate on deep-learning-on-flink's usage of such a feature?
> > >
> > >
> > > deep-learning-on-flink[1] has a general ML cluster abstraction that can
> > > run TF / PyTorch etc. This cluster has an ML master role to manage the
> > > liveness of all the ML workers, which are Python processes running
> > > side-by-side with TMs. The ML workers can be PS or Worker of TensorFlow.
> > > For example, if 10 TF workers and 5 TF PS nodes are needed, a Flink UDF
> > > operator with parallelism = 10 and a Flink Source operator with
> > > parallelism = 5 will be created to run the TF workers and TF PS nodes
> > > respectively. All the 15 nodes running either PS or Worker are managed
> > > by the ML master. Prior to OC, the implementation had a Source operator
> > > with parallelism = 1 to run the master and let the ML workers register
> > > themselves with an external ZK so the master could discover them. So
> > > basically users had to build their own control plane. With OC, at the
> > > very least, we no longer need ZK.
> > >
> > > > Concerning reprocessing command history and repeatable processing
> > > > results I think there is actually quite a big difference between using
> > > > a Flink source vs. offering a REST server that can receive commands
> > > > that are distributed via the OCs. In the former case we can rely on
> > > > the external system to persist commands whereas in the latter approach
> > > > we/users have to implement a storage solution on their own (in the
> > > > simplest case users will probably store everything in state which
> > > > might grow indefinitely if stored as a changelog).
> > >
> > >
> > > It is true that it is easier for a Flink Source to provide a change log
> > > of patterns. But there are also downsides. The most prominent one is
> > > that in order to update the dynamic pattern, users have to run an
> > > external system for the source to read. Or users have to implement a
> > > REST Source taking commands directly like a web server. In that case,
> > > service discovery is again a problem and needs an external dependency.
> > > On the other hand, it is not so difficult for OC to store a change log
> > > history. Assume there are 100 pattern updates / day for a CEP job and
> > > one year of history should be stored. Let's say each pattern change log
> > > entry is 1K bytes. The pattern update change log for the entire year is
> > > then only about 36 MB (100 * 365 * 1 KB), which seems quite small. And
> > > users can also query the patterns in effect via the same endpoint. I
> > > think this would provide a good out-of-box experience in the vast
> > > majority of cases: no external dependencies and a single REST endpoint.
> > > In rare cases where there is a large amount of pattern update log,
> > > external systems can still be leveraged. One potential issue might be
> > > that a command that has been received but not checkpointed yet may get
> > > lost if the job fails over. This can be mitigated by allowing OC to ask
> > > JM to trigger a checkpoint (not there yet but may be worth thinking
> > > about), or by simply introducing a *PendingCommit* state for a pattern
> > > indicating it's not committed yet. In the worst case, we can always
> > > fall back to storing patterns in external systems such as a database.
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > > [1]
> > > https://github.com/flink-extended/dl-on-flink/tree/master/deep-learning-on-flink
> > >
> > >
> > > On Tue, Jan 4, 2022 at 4:45 PM Till Rohrmann <trohrm...@apache.org>
> > wrote:
> > >
> > > > Hi Becket,
> > > >
> > > > Thanks for the explanation. While I do agree that a general 2-way
> > > > communication pattern would be nice to have, I also believe that this
> > > > approach is probably at least one order of magnitude more complex to
> > > > realize than the side-input approach. Therefore, I really would like
> > > > to understand why such a mechanism is required for the CEP use case.
> > > >
> > > > You mentioned that a TM might find out that a pattern is invalid and
> > > > then it could use the 2-way communication with the JM to tell the
> > > > other TMs that the pattern is invalid. How exactly would a TM detect
> > > > that a pattern is invalid at data processing time? And assuming that
> > > > this is possible for a single TM, why can't the other TMs detect this
> > > > problem? Is it because the validity of patterns might depend on data
> > > > that is different between TMs?
> > > >
> > > > You also mentioned that deep-learning-on-flink could benefit from an
> > > > OC control plane but I couldn't find the referenced link. Would
> > > > deep-learning-on-flink like to use OCs as a kind of parameter server?
> > > > Could you elaborate on deep-learning-on-flink's usage of such a feature?
> > > >
> > > > Concerning reprocessing command history and repeatable processing
> > > > results I think there is actually quite a big difference between using
> > > > a Flink source vs. offering a REST server that can receive commands
> > > > that are distributed via the OCs. In the former case we can rely on
> > > > the external system to persist commands whereas in the latter approach
> > > > we/users have to implement a storage solution on their own (in the
> > > > simplest case users will probably store everything in state which
> > > > might grow indefinitely if stored as a changelog).
> > > >
> > > > Cheers,
> > > > Till
> > > >
> > > > On Tue, Jan 4, 2022 at 6:41 AM Becket Qin <becket....@gmail.com>
> > wrote:
> > > >
> > > > > Hi David,
> > > > >
> > > > > Thanks for sharing your thoughts. Some quick replies to your
> > > > > comments:
> > > > >
> > > > > > We're still talking about the "web server based"
> > > > > > pattern_processor_discoverer, but what about other use cases? One
> > > > > > of my big concerns is that users cannot really reuse any part of
> > > > > > the Flink ecosystem to implement the discovery logic. For example
> > > > > > if they want to read patterns from a Kafka topic, they need to roll
> > > > > > their own discoverer based on the vanilla Kafka client. If we're
> > > > > > talking about extensibility, should we also make sure that the
> > > > > > existing primitives can be reused?
> > > > >
> > > > >
> > > > > KafkaSource is actually more complicated than vanilla KafkaConsumer
> > > > > from the perspective of consuming Kafka messages. The KafkaSource
> > > > > just helps make it easier for Flink jobs to process these messages.
> > > > > In fact, KafkaConsumer is more powerful than KafkaSource in terms of
> > > > > talking to Kafka. So we are comparing [KafkaConsumer +
> > > > > OperatorCoordinator] and [KafkaSource + side-input], not
> > > > > [KafkaConsumer + side-input].
> > > > >
> > > > > > This can be done for the side-input as well by filtering invalid
> > > > > > patterns before the broadcast. You can also send the invalid
> > > > > > patterns to any side output you want. I have a feeling that we're
> > > > > > way too attached to the REST server use case in this discussion. I
> > > > > > agree that for that case, this solution is the most straightforward
> > > > > > one.
> > > > >
> > > > >
> > > > > It depends on whether the invalidity is discovered at pattern
> > > > > definition time or at data processing time. E.g. a valid pattern
> > > > > that fails on invalid data can only be detected at data processing
> > > > > time, so the pattern won't be filtered out before the broadcast.
> > > > >
> > > > > > I agree that 2-way communication in the "data-flow like" API is
> > > > > > tricky, because it requires cycles / iterations, which are still
> > > > > > not really solved (for a good reason, it's a really tough nut to
> > > > > > crack). This makes me think that the OC may be a bit of an
> > > > > > "incomplete" workaround for not having fully working support for
> > > > > > iterations.
> > > > > >
> > > > > > For example I'm not really confident that the checkpointing of the
> > > > > > OC works correctly right now, because it doesn't seem to require
> > > > > > checkpoint barrier alignment as the regular stream inputs do. We
> > > > > > also don't have proper support for watermarking (this is again
> > > > > > tricky, because of the cycle).
> > > > > >
> > > > > > If we decide to go down this road, should we first address some of
> > > > > > these limitations?
> > > > >
> > > > > I agree. OC is proven to be useful and we should think about how to
> > > > > enhance it instead of not using it.
> > > > >
> > > > > > If I understand that correctly, this means only the LATEST state
> > > > > > of the patterns (in other words - patterns that are currently in
> > > > > > use). Is this really sufficient for historical re-processing? Can
> > > > > > someone for example want to re-process the data in more of a
> > > > > > "temporal join" fashion? Also AFAIK historical processing in
> > > > > > combination with "coordinator checkpoints" is not really something
> > > > > > that we currently support out of the box, are there any plans on
> > > > > > tackling this (my other concern is that this should not go against
> > > > > > the "unified batch & stream processing" efforts)?
> > > > >
> > > > > I think in this case, the state would contain the entire pattern
> > > > > update history. Not the final state, but the change log. In fact, it
> > > > > is not possible to guarantee the correct result unless you have the
> > > > > entire change log. Even with side-input, there still could be late
> > > > > arrivals in the pattern update stream, and the only way to correct
> > > > > it is to either have all the data reprocessed (with all the pattern
> > > > > change log loaded upfront) or have retraction support.
> > > > >
> > > > > > I can imagine that if this should be a concern, we could move the
> > > > > > execution of the OC to the task managers. This also makes me think
> > > > > > that we shouldn't make any strong assumptions that the OC will
> > > > > > always run in the JobManager (this is especially relevant for the
> > > > > > embedded web-server use case).
> > > > >
> > > > > This is a very good point. I think we should do this. The reason OC
> > > > > is in JM is based on the assumption that the control plane usually
> > > > > has little traffic. But if that is not the case, we should move the
> > > > > OC out of JM, maybe to TM.
> > > > >
> > > > >
> > > > >
> > > > > I think the key topic we are discussing is what the Flink control
> > > > > plane should look like, and CEP is just an example use case. There
> > > > > are basically two options:
> > > > >
> > > > > 1. Using the side-input / broadcast stream
> > > > > 2. Using the OperatorCoordinator.
> > > > >
> > > > > Although we are having this discussion under the context of
> > > > > supporting CEP dynamic pattern update, it would be really helpful to
> > > > > reach agreement on the general design principle of how Flink should
> > > > > handle similar external control demands, a.k.a. how to provide a
> > > > > decent control plane in Flink to the users.
> > > > >
> > > > > Here is my take on these two options, based on the requirements of
> > > > > a control plane.
> > > > >
> > > > > *Communication Requirements*
> > > > > Flink has two kinds of control planes - in-band and out-of-band.
> > > > > The in-band control plane is like watermarks and checkpoint markers.
> > > > > It requires the entire DAG to be in a consistent state, so the
> > > > > control messages flow with the data in one direction. This is a
> > > > > one-way control flow.
> > > > > Examples of the out-of-band control plane are OC and JM/TM
> > > > > communication. They do not require global consistency or awareness
> > > > > across the entire DAG.
> > > > >
> > > > > Before OC, neither of these two kinds of control plane was exposed
> > > > > to the users. OC allows the users to leverage the out-of-band
> > > > > control plane so they can coordinate across subtasks. This is a more
> > > > > widely used control flow from the user defined operator perspective,
> > > > > which requires 2-way communication.
> > > > >
> > > > >    - side-input stream is one-way communication. Moreover, I don't
> > > > >    think iteration is the right way to address this. Unlike the
> > > > >    abstraction of data processing which is a DAG, the control plane
> > > > >    may require communication between two arbitrary operators in
> > > > >    different orders. Using iterations may end up with an explosion
> > > > >    of feedback edges that are not only complicated to express, but
> > > > >    also hard to debug and maintain.
> > > > >    - OC provides a decent 2-way communication mechanism.
> > > > >
> > > > >
> > > > > *Cost for low traffic use case*
> > > > > An important difference between data channel and control channel is
> > > > > the traffic pattern. Unlike the data plane, the control plane is
> > > > > usually idle for the vast majority of the time with occasional
> > > > > control messages.
> > > > >
> > > > >    - One argument to use the side-input / broadcast stream was that
> > > > >    users can reuse the existing connectors. However, most Flink
> > > > >    sources are designed to handle a large throughput of traffic and
> > > > >    maintain all the threads, network connections, etc., not to
> > > > >    mention the external resources such as Kafka topics to store the
> > > > >    commands.
> > > > >    - OC can have a much lower constant cost.
> > > > >
> > > > >
> > > > > *Easy to use*
> > > > > Personally speaking, I think the simplest and most user-friendly
> > > > > interface is a REST API that accepts control commands.
> > > > >
> > > > >    - One proposal at this point is to have a separate web server
> > > > >    independent of the Flink job. In that case, should users
> > > > >    implement that web server? In order to pass the command to the
> > > > >    Flink job, does that mean users should implement a Flink source
> > > > >    that can read the commands from the web server? Or would the web
> > > > >    server send the command to something like Kafka and let the Flink
> > > > >    side-input read from there? There seems to be a lot for the users
> > > > >    to do. And it seems an unnecessarily complicated system to build
> > > > >    to send occasional control messages to a Flink job.
> > > > >    - OC can simply be reached via the JM REST API.
> > > > >       - e.g. http://JM_IP/OperatorEvents?OP_NAME=xxx&Content=xxx
> > > > >       would simply send the Content to the OC of OP_NAME.
> > > > >
> > > > > *Work well with checkpoints*
> > > > >
> > > > >    - side-input streams work well with checkpoints, thanks to their
> > > > >    compatibility with the DAG abstraction.
> > > > >    - The OperatorCoordinator requires more careful design to work
> > > > >    well with checkpoints. The complexity mostly comes from the 2-way
> > > > >    communication. At this point the OperatorCoordinator message
> > > > >    delivery semantic may not be as robust as side-input streams.
> > > > >    However, I think this is mainly due to potential bugs in the
> > > > >    implementation instead of fundamental design problems.
> > > > >    Basically the checkpoint semantic should be:
> > > > >       - The JM will first checkpoint all the OperatorCoordinators,
> > > > >       more specifically for each OperatorCoordinator,
> > > > >          - 1) JM blocks the input queue to stop receiving new events
> > > > >          from the subtasks.
> > > > >          - 2) let the operator coordinator finish processing all
> > > > >          the pending events in the OC input queue.
> > > > >          - 3) flush the OC output queue until all the events sent
> > > > >          are acknowledged.
> > > > >          - 4) unblock the input queue, but block the output queue
> > > > >          until all the subtasks of the OC finish checkpoints.
> > > > >       - The JM starts a normal checkpoint starting from the Source.
> > > > >          - If an operator has a coordinator, and it has sent some
> > > > >          events to the OC that have not been acknowledged, these
> > > > >          events need to be put into the subtasks' checkpoint.
> > > > >       - After all the subtasks of an operator finish their
> > > > >       checkpoints, the output queue of the corresponding OC can be
> > > > >       unblocked.
> > > > >    It looks like the above protocol would let OC have a clear
> > > > >    checkpoint semantic. And this complexity seems to be a must for
> > > > >    2-way communication. I think we are able to hide the above
> > > > >    protocol from users by having an abstract OC implementation.
> > > > >
> > > > >
> > > > > *Reprocess command history and repeatable processing results*
> > > > > This does not seem to be something generally required for a control
> > > > > plane. But if the control plane changes the data processing
> > > > > behavior, there might be some use cases, just like the CEP pattern
> > > > > update. In order to do this, users will need to provide a full
> > > > > command history to the operators. That consists of two steps:
> > > > >
> > > > >    1. get the full command history. (Flink Source vs. custom way to
> > > > >    retrieve command history.)
> > > > >    2. send them to the operators. (Side-input vs. OC/Operator
> > > > >    communication.)
> > > > >
> > > > > I actually don't feel much difference between these two options. I
> > > > > am not sure if a Flink source is necessarily simpler than the user
> > > > > implemented logic. For example, if all the changes are just written
> > > > > in a database table, from the user's perspective, I feel fine with
> > > > > querying the DB by myself using something like JDBC (or using
> > > > > KafkaConsumer to read the pattern history from a Kafka topic) and
> > > > > sending all the pattern change history to the operators before they
> > > > > start to process the actual events.
> > > > >
> > > > > *Stability impact to the Flink Job*
> > > > > I agree this is a valid concern. There is only one JM and currently
> > > > > OC runs there, which may potentially cause a JM crash and result in
> > > > > job failure. Running in TM may have less impact. So I think it makes
> > > > > sense to run OC in TM. However, this is more about how we can
> > > > > implement OC to make it more robust. It does not forfeit the other
> > > > > semantic and user experience benefits mentioned above.
> > > > >
> > > > >
> > > > > To sum up:
> > > > > 1. Regardless of whether CEP chooses OC or side-input stream, a
> > > > > decent 2-way communication control plane seems really helpful and
> > > > > has been proven by quite a few existing use cases. To me the
> > > > > question is how to make it more robust, not whether we should use it
> > > > > or not.
> > > > > 2. As for CEP, if we assume OC will be robust enough, personally I
> > > > > feel OC is a better fit than side-input mainly because of the
> > > > > simplicity and extensibility.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jiangjie (Becket) Qin
> > > > >
> > > > >
> > > > > On Thu, Dec 30, 2021 at 8:43 PM David Morávek <d...@apache.org>
> > wrote:
> > > > >
> > > > > > Hi all,
> > > > > >
> > > > > > sorry for the late reply, vacation season ;) I'm still not 100%
> > > > > > sold on choosing the OC for this use-case, but on the other hand I
> > > > > > don't have strong arguments against it. A few more questions /
> > > > > > thoughts:
> > > > > >
> > > > > > We're still talking about the "web server based"
> > > > > > pattern_processor_discoverer, but what about other use cases? One
> > > > > > of my big concerns is that users cannot really reuse any part of
> > > > > > the Flink ecosystem to implement the discovery logic. For example
> > > > > > if they want to read patterns from a Kafka topic, they need to roll
> > > > > > their own discoverer based on the vanilla Kafka client. If we're
> > > > > > talking about extensibility, should we also make sure that the
> > > > > > existing primitives can be reused?
> > > > > >
> > > > > > > For instance, the example I gave in my previous email seems not
> > > > > > > easily achievable with side-input / broadcast streams: a single
> > > > > > > invalid pattern detected on a TM can be disabled elegantly
> > > > > > > globally without crashing the entire Flink job.
> > > > > >
> > > > > >
> > > > > > This can be done for the side-input as well by filtering invalid
> > > > > > patterns before the broadcast. You can also send the invalid
> > > > > > patterns to any side output you want. I have a feeling that we're
> > > > > > way too attached to the REST server use case in this discussion. I
> > > > > > agree that for that case, this solution is the most straightforward
> > > > > > one.
> > > > > >
> > > > > > > OC is a 2-way communication mechanism, i.e. a subtask can also
> > > > > > > send OperatorEvent to the coordinator to report its own status,
> > > > > > > so that the coordinator can act accordingly.
> > > > > > >
> > > > > >
> > > > > > I agree that 2-way communication in the "data-flow like" API is
> > > > > > tricky, because it requires cycles / iterations, which are still
> > > > > > not really solved (for a good reason, it's a really tough nut to
> > > > > > crack). This makes me think that the OC may be a bit of an
> > > > > > "incomplete" workaround for not having fully working support for
> > > > > > iterations.
> > > > > >
> > > > > > For example I'm not really confident that the checkpointing of the
> > > > > > OC works correctly right now, because it doesn't seem to require
> > > > > > checkpoint barrier alignment as the regular stream inputs do. We
> > > > > > also don't have proper support for watermarking (this is again
> > > > > > tricky, because of the cycle).
> > > > > >
> > > > > > If we decide to go down this road, should we first address some of
> > > > > > these limitations?
> > > > > >
> > > > > > > OperatorCoordinator will checkpoint the full amount of
> > > > > > > PatternProcessor data. For the reprocessing of historical data,
> > > > > > > you can read the PatternProcessor snapshots saved by this
> > > > > > > checkpoint from a certain historical checkpoint, and then
> > > > > > > recreate the historical data through these PatternProcessor
> > > > > > > snapshots.
> > > > > > >
> > > > > >
> > > > > > If I understand that correctly, this means only the LATEST state
> > > > > > of the patterns (in other words - patterns that are currently in
> > > > > > use). Is this really sufficient for historical re-processing? Can
> > > > > > someone for example want to re-process the data in more of a
> > > > > > "temporal join" fashion? Also AFAIK historical processing in
> > > > > > combination with "coordinator checkpoints" is not really something
> > > > > > that we currently support out of the box, are there any plans on
> > > > > > tackling this (my other concern is that this should not go against
> > > > > > the "unified batch & stream processing" efforts)?
> > > > > >
> > > > > > I do agree that having the user defined control logic defined in
> > the
> > > JM
> > > > > > > increases the chance of instability.
> > > > > > >
> > > > > >
> > > > > > I can imagine that if this should be a concern, we could move the
> > > > > execution
> > > > > > of the OC to the task managers. This also makes me thing, that we
> > > > > shouldn't
> > > > > > make any strong assumptions that the OC will always run in the
> > > > JobManager
> > > > > > (this is especially relevant for the embedded web-server use
> case).
> > > > > >
> > > > > > If an agreement is reached on OperatorCoodinator, I will start
> the
> > > > voting
> > > > > > > thread.
> > > > > > >
> > > > > >
> > > > > > > As for the vote, it would be great if we could wait until the next
> > > week
> > > > as
> > > > > > many people took vacation until end of the year.
> > > > > >
> > > > > > Overall, I really like the feature, this will be a great addition
> > to
> > > > > Flink.
> > > > > >
> > > > > > Best,
> > > > > > D.
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Thu, Dec 30, 2021 at 11:27 AM Martijn Visser <
> > > mart...@ververica.com
> > > > >
> > > > > > wrote:
> > > > > >
> > > > > > > Hi all,
> > > > > > >
> > > > > > > I can understand the need for a control plane mechanism. I'm
> not
> > > the
> > > > > > > technical go-to person for questions on the
> OperatorCoordinator,
> > > but
> > > > I
> > > > > > > would expect that we could offer those interfaces from Flink
> but
> > > > > > shouldn't
> > > > > > > recommend running user-code in the JobManager itself. I think
> the
> > > > user
> > > > > > code
> > > > > > > (like a webserver) should run outside of Flink (like via a
> > sidecar)
> > > > and
> > > > > > use
> > > > > > > only the provided interfaces to communicate.
> > > > > > >
> > > > > > > I would like to get @David Morávek <d...@apache.org> opinion
> on
> > > the
> > > > > > > technical part.
> > > > > > >
> > > > > > > Best regards,
> > > > > > >
> > > > > > > Martijn
> > > > > > >
> > > > > > > On Thu, 30 Dec 2021 at 10:07, Nicholas Jiang <
> > > > nicholasji...@apache.org
> > > > > >
> > > > > > > wrote:
> > > > > > >
> > > > > > >> Hi Konstantin, Becket, Martijn,
> > > > > > >>
> > > > > > >> Thanks for sharing your feedback. What other concerns do you
> > have
> > > > > about
> > > > > > > >> OperatorCoordinator? If an agreement is reached on
> > > > OperatorCoordinator,
> > > > > I
> > > > > > >> will start the voting thread.
> > > > > > >>
> > > > > > >> Best,
> > > > > > >> Nicholas Jiang
> > > > > > >>
> > > > > > >> On 2021/12/22 03:19:58 Becket Qin wrote:
> > > > > > >> > Hi Konstantin,
> > > > > > >> >
> > > > > > >> > Thanks for sharing your thoughts. Please see the reply
> inline
> > > > below.
> > > > > > >> >
> > > > > > >> > Thanks,
> > > > > > >> >
> > > > > > >> > Jiangjie (Becket) Qin
> > > > > > >> >
> > > > > > >> > On Tue, Dec 21, 2021 at 7:14 PM Konstantin Knauf <
> > > > kna...@apache.org
> > > > > >
> > > > > > >> wrote:
> > > > > > >> >
> > > > > > >> > > Hi Becket, Hi Nicholas,
> > > > > > >> > >
> > > > > > >> > > Thanks for joining the discussion.
> > > > > > >> > >
> > > > > > >> > > 1 ) Personally, I would argue that we should only run user
> > > code
> > > > in
> > > > > > the
> > > > > > >> > > Jobmanager/Jobmaster if we can not avoid it. It seems
> wrong
> > to
> > > > me
> > > > > to
> > > > > > >> > > encourage users to e.g. run a webserver on the Jobmanager,
> > or
> > > > > > >> continuously
> > > > > > >> > > read patterns from a Kafka Topic on the Jobmanager, but
> both
> > > of
> > > > > > these
> > > > > > >> I see
> > > > > > >> > > happening with the current design. We've had lots of
> issues
> > > with
> > > > > > >> > > classloading leaks and other stability issues on the
> > > Jobmanager
> > > > > and
> > > > > > >> making
> > > > > > >> > > this more complicated, if there is another way, seems
> > > > unnecessary.
> > > > > > >> >
> > > > > > >> >
> > > > > > >> > I think the key question here is what primitive does Flink
> > > provide
> > > > > to
> > > > > > >> > facilitate the user implementation of their own control
> logic
> > /
> > > > > > control
> > > > > > > >> > plane? It looks like Flink previously assumed that all the
> > user
> > > > > logic
> > > > > > >> is
> > > > > > >> > just data processing logic without any control /
> coordination
> > > > > > >> requirements.
> > > > > > >> > However, it turns out that a decent control plane
> abstraction
> > is
> > > > > > >> required
> > > > > > >> > in association with the data processing logic in many cases,
> > > > > including
> > > > > > >> > Source / Sink and other user defined operators in general.
> The
> > > > fact
> > > > > > >> that we
> > > > > > >> > ended up with adding the SplitEnumerator and GlobalCommitter
> > are
> > > > > just
> > > > > > >> two
> > > > > > >> > examples of the demand of such coordination among user
> defined
> > > > > logics.
> > > > > > >> > There are other cases that we see in ecosystem projects,
> such
> > as
> > > > > > >> > deep-learning-on-flink[1]. Now we see this again in CEP.
> > > > > > >> >
> > > > > > >> > Such control plane primitives are critical to the
> > extensibility
> > > > of a
> > > > > > >> > project. If we look at other projects, exposing such control
> > > plane
> > > > > > >> logic is
> > > > > > >> > quite common. For example, Hadoop ended up with exposing
> YARN
> > > as a
> > > > > > >> public
> > > > > > >> > API to the users, which is extremely popular. Kafka
> consumers
> > > > > exposed
> > > > > > >> the
> > > > > > >> > consumer group rebalance logic to the users via
> > > > > > > >> ConsumerPartitionAssignor,
> > > > > > >> > which is also a control plane primitive.
> > > > > > >> >
> > > > > > >> > To me it is more important to think about how we can improve
> > the
> > > > > > >> stability
> > > > > > >> > of such a control plane mechanism, instead of simply saying
> no
> > > to
> > > > > the
> > > > > > >> users.
> > > > > > >> >
> > > > > > >> >
> > > > > > >> >
> > > > > > >> >
> > > > > > >> > > 2) In addition, I suspect that, over time we will have to
> > > > > implement
> > > > > > >> all the
> > > > > > >> > > functionality that regular sources already provide around
> > > > > > consistency
> > > > > > >> > > (watermarks, checkpointing) for the
> > > PatternProcessorCoordinator,
> > > > > > too.
> > > > > > >> >
> > > > > > >> >
> > > > > > >> > I think OperatorCoordinator should have a generic
> > communication
> > > > > > >> mechanism
> > > > > > >> > for all the operators, not specific to Source. We should
> > > probably
> > > > > have
> > > > > > >> an
> > > > > > > >> > AbstractOperatorCoordinator to help deal with the
> > communication
> > > > > layer,
> > > > > > >> and
> > > > > > >> > leave the state maintenance and event handling logic to the
> > user
> > > > > code.
> > > > > > >> >
> > > > > > >> >
> > > > > > >> > > 3) I understand that running on the Jobmanager is easier
> if
> > > you
> > > > > want
> > > > > > >> to
> > > > > > >> > > launch a REST server directly. Here my question would be:
> > does
> > > > > this
> > > > > > >> really
> > > > > > >> > > need to be solved inside of Flink or couldn't you start a
> > > > > webserver
> > > > > > >> next to
> > > > > > >> > > Flink? If we start using the Jobmanager as a REST server
> > users
> > > > > will
> > > > > > >> expect
> > > > > > >> > > that e.g. it is highly available and can be load balanced
> > and
> > > we
> > > > > > >> quickly
> > > > > > >> > > need to think about aspects that we never wanted to think
> > > about
> > > > in
> > > > > > the
> > > > > > >> > > context of a Flink Jobmanager.
> > > > > > >> > >
> > > > > > >> >
> > > > > > >> > I think the REST API is just for receiving commands
> targeting
> > a
> > > > > > running
> > > > > > >> > Flink job. If the job fails, the REST API would be useless.
> > > > > > >> >
> > > > > > >> >
> > > > > > >> > > So, can you elaborate a bit more, why a
> side-input/broadcast
> > > > > stream
> > > > > > is
> > > > > > >> > >
> > > > > > >> > > a) more difficult
> > > > > > >> > > b) has vague semantics (To me semantics of a stream-stream
> > > seem
> > > > > > >> clearer
> > > > > > >> > > when it comes to out-of-orderness, late data, reprocessing
> > or
> > > > > batch
> > > > > > >> > > execution mode.)
> > > > > > >> >
> > > > > > >> >
> > > > > > >> > I do agree that having the user defined control logic
> defined
> > in
> > > > the
> > > > > > JM
> > > > > > >> > increases the chance of instability. In that case, we may
> > think
> > > of
> > > > > > other
> > > > > > >> > solutions and I am fully open to that. But the side-input /
> > > > > broadcast
> > > > > > >> > stream seems more like a bandaid instead of a carefully
> > designed
> > > > > > control
> > > > > > >> > plane mechanism.
> > > > > > >> >
> > > > > > >> > A decent control plane requires two-way communication, so
> > > > > information
> > > > > > >> can
> > > > > > >> > be reported / collected from the entity being controlled,
> and
> > > the
> > > > > > >> > coordinator / controller can send decisions or commands to
> the
> > > > > > entities
> > > > > > >> > accordingly, just like our TM / JM communication. IIUC, this
> > is
> > > > not
> > > > > > >> > achievable with the existing side-input / broadcast stream
> as
> > > both
> > > > > of
> > > > > > >> them
> > > > > > >> > are one-way communication mechanisms. For instance, the
> > example
> > > I
> > > > > gave
> > > > > > >> in
> > > > > > >> > my previous email seems not easily achievable with
> side-input
> > /
> > > > > > >> broadcast
> > > > > > >> > streams: a single invalid pattern detected on a TM can be
> > > disabled
> > > > > > >> > elegantly globally without crashing the entire Flink job.
> > > > > > >> >
> > > > > > >> >
> > > > > > >> > > Cheers,
> > > > > > >> > >
> > > > > > >> > > Konstantin
> > > > > > >> > >
> > > > > > >> > >
> > > > > > >> > > On Tue, Dec 21, 2021 at 11:38 AM Becket Qin <
> > > > becket....@gmail.com
> > > > > >
> > > > > > >> wrote:
> > > > > > >> > >
> > > > > > >> > > > Hi folks,
> > > > > > >> > > >
> > > > > > >> > > > I just finished reading the previous emails. It was a
> good
> > > > > > >> discussion.
> > > > > > >> > > Here
> > > > > > >> > > > are my two cents:
> > > > > > >> > > >
> > > > > > >> > > > *Is OperatorCoordinator a public API?*
> > > > > > >> > > > Regarding the OperatorCoordinator, although it is marked
> > as
> > > > > > an internal
> > > > > > >> > > > interface at this point, I intended to make it a public
> > > > > interface
> > > > > > >> when
> > > > > > > >> > > adding
> > > > > > >> > > > that in FLIP-27. This is a powerful cross-subtask
> > > > communication
> > > > > > >> mechanism
> > > > > > >> > > > that enables many use cases, Source / Sink / TF on
> Flink /
> > > CEP
> > > > > > here
> > > > > > >> > > again.
> > > > > > >> > > > To my understanding, OC was marked as internal because
> we
> > > > think
> > > > > it
> > > > > > >> is not
> > > > > > >> > > > stable enough yet. We may need to fortify the
> > OperatorEvent
> > > > > > delivery
> > > > > > >> > > > semantic a little bit so it works well with checkpoint
> in
> > > > > general.
> > > > > > >> > > >
> > > > > > >> > > > I think it is a valid concern that user code running in
> JM
> > > may
> > > > > > cause
> > > > > > >> > > > instability. However, not providing this communication
> > > > mechanism
> > > > > > >> only
> > > > > > >> > > makes
> > > > > > > >> > > > a lot of use cases even harder to implement. So it seems
> > that
> > > > > > having
> > > > > > >> the
> > > > > > >> > > OC
> > > > > > >> > > > exposed to end users brings more benefits than harm to
> > > Flink.
> > > > At
> > > > > > >> the end
> > > > > > >> > > of
> > > > > > >> > > > the day, users have many ways to crash a Flink job if
> they
> > > do
> > > > > not
> > > > > > >> write
> > > > > > >> > > > proper code. So making those who know what they do happy
> > > seems
> > > > > > more
> > > > > > >> > > > important here.
> > > > > > >> > > >
> > > > > > > >> > > > *OperatorCoordinator vs. side-input / broadcast stream*
> > > > > > >> > > > I think both of them can achieve the goal of dynamic
> > > patterns.
> > > > > The
> > > > > > >> main
> > > > > > >> > > > difference is the extensibility.
> > > > > > >> > > >
> > > > > > >> > > > OC is a 2-way communication mechanism, i.e. a subtask
> can
> > > also
> > > > > > send
> > > > > > > >> > > > OperatorEvent to the coordinator to report its own
> > status,
> > > so
> > > > > > that
> > > > > > >> the
> > > > > > >> > > > coordinator can act accordingly. This is sometimes
> useful.
> > > For
> > > > > > >> example, a
> > > > > > >> > > > single invalid pattern can be disabled elegantly without
> > > > > crashing
> > > > > > >> the
> > > > > > >> > > > entire Flink job. In the future, if we allow users to
> send
> > > > > > external
> > > > > > >> > > command
> > > > > > >> > > > to OC via JM, a default self-contained implementation
> can
> > > just
> > > > > > >> update the
> > > > > > >> > > > pattern via the REST API without external dependencies.
> > > > > > >> > > >
> > > > > > >> > > > Another reason I personally prefer OC is because it is
> an
> > > > > explicit
> > > > > > >> > > control
> > > > > > > >> > > > plane mechanism, whereas the side-input / broadcast
> > stream
> > > > has
> > > > > > > >> more
> > > > > > > >> > > > vague semantics.
> > > > > > >> > > >
> > > > > > >> > > > Thanks,
> > > > > > >> > > >
> > > > > > >> > > > Jiangjie (Becket) Qin
> > > > > > >> > > >
> > > > > > >> > > > On Tue, Dec 21, 2021 at 4:25 PM David Morávek <
> > > > d...@apache.org>
> > > > > > >> wrote:
> > > > > > >> > > >
> > > > > > >> > > > > Hi Yunfeng,
> > > > > > >> > > > >
> > > > > > >> > > > > thanks for drafting this FLIP, this will be a great
> > > addition
> > > > > > into
> > > > > > >> the
> > > > > > >> > > CEP
> > > > > > >> > > > > toolbox!
> > > > > > >> > > > >
> > > > > > > >> > > > > Apart from running user code in JM, which we want to
> avoid
> > in
> > > > > > >> general, I'd
> > > > > > > >> > > > > have one more concern about using the
> > > > > > OperatorCoordinator
> > > > > > >> and
> > > > > > >> > > > that
> > > > > > >> > > > > is re-processing of the historical data. Any thoughts
> > > about
> > > > > how
> > > > > > >> this
> > > > > > >> > > will
> > > > > > >> > > > > work with the OC?
> > > > > > >> > > > >
> > > > > > >> > > > > I have a slight feeling that a side-input (custom
> > source /
> > > > > > >> operator +
> > > > > > > >> > > > > broadcast) would be a better fit for this case. This
> would
> > > > > simplify
> > > > > > >> the
> > > > > > >> > > > > consistency concerns (watermarks + pushback) and the
> > > > > > >> re-processing of
> > > > > > >> > > > > historical data.
> > > > > > >> > > > >
> > > > > > >> > > > > Best,
> > > > > > >> > > > > D.
> > > > > > >> > > > >
> > > > > > >> > > > >
> > > > > > >> > > > > On Tue, Dec 21, 2021 at 6:47 AM Nicholas Jiang <
> > > > > > >> > > nicholasji...@apache.org
> > > > > > >> > > > >
> > > > > > >> > > > > wrote:
> > > > > > >> > > > >
> > > > > > >> > > > > > Hi Konstantin, Martijn
> > > > > > >> > > > > >
> > > > > > >> > > > > > Thanks for the detailed feedback in the discussion.
> > > What I
> > > > > > >> still have
> > > > > > >> > > > > left
> > > > > > >> > > > > > to answer/reply to:
> > > > > > >> > > > > >
> > > > > > >> > > > > > -- Martijn: Just to be sure, this indeed would mean
> > that
> > > > if
> > > > > > for
> > > > > > >> > > > whatever
> > > > > > > >> > > > > > reason the heartbeat times out, it would crash the
> job,
> > > > right?
> > > > > > >> > > > > >
> > > > > > > >> > > > > > IMO, if for whatever reason the heartbeat times out,
> it
> > > > > couldn't
> > > > > > >> check
> > > > > > >> > > > the
> > > > > > >> > > > > > PatternProcessor consistency between the
> > > > OperatorCoordinator
> > > > > > >> and the
> > > > > > > >> > > > > > subtasks, so the job would crash.
> > > > > > >> > > > > >
> > > > > > >> > > > > > -- Konstantin: What I was concerned about is that we
> > > > > basically
> > > > > > >> let
> > > > > > >> > > > users
> > > > > > >> > > > > > run a UserFunction in the OperatorCoordinator, which
> > it
> > > > does
> > > > > > >> not seem
> > > > > > >> > > > to
> > > > > > >> > > > > > have been designed for.
> > > > > > >> > > > > >
> > > > > > >> > > > > > In general, we have reached an agreement on the
> design
> > > of
> > > > > this
> > > > > > >> FLIP,
> > > > > > >> > > > but
> > > > > > > >> > > > > > there are some concerns about the OperatorCoordinator,
> > > > > > > >> > > > > > namely whether it was designed to let users run a
> > > > > > > >> > > > > > UserFunction in it. We would like to invite Becket
> > Qin
> > > > who
> > > > > is
> > > > > > >> the
> > > > > > >> > > > author
> > > > > > >> > > > > > of OperatorCoordinator to help us to answer this
> > > concern.
> > > > > > >> > > > > >
> > > > > > >> > > > > > Best,
> > > > > > >> > > > > > Nicholas Jiang
> > > > > > >> > > > > >
> > > > > > >> > > > > >
> > > > > > >> > > > > > On 2021/12/20 10:07:14 Martijn Visser wrote:
> > > > > > >> > > > > > > Hi all,
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > Really like the discussion on this topic moving
> > > > forward. I
> > > > > > >> really
> > > > > > >> > > > think
> > > > > > >> > > > > > > this feature will be much appreciated by the Flink
> > > > users.
> > > > > > >> What I
> > > > > > >> > > > still
> > > > > > >> > > > > > have
> > > > > > >> > > > > > > left to answer/reply to:
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > -- Good point. If for whatever reason the
> different
> > > > > > >> taskmanagers
> > > > > > >> > > > can't
> > > > > > >> > > > > > get
> > > > > > >> > > > > > > the latest rule, the Operator Coordinator could
> > send a
> > > > > > >> heartbeat to
> > > > > > >> > > > all
> > > > > > >> > > > > > > taskmanagers with the latest rules and check the
> > > > heartbeat
> > > > > > >> response
> > > > > > >> > > > > from
> > > > > > >> > > > > > > all the taskmanagers whether the latest rules of
> the
> > > > > > >> taskmanager is
> > > > > > >> > > > > equal
> > > > > > >> > > > > > > to these of the Operator Coordinator.
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > Just to be sure, this indeed would mean that if
> for
> > > > > whatever
> > > > > > >> reason
> > > > > > >> > > > the
> > > > > > > >> > > > > > > heartbeat times out, it would crash the job, right?
> > > > > > >> > > > > > >
> > > > > > > >> > > > > > > -- We have considered the solution mentioned
> > > above.
> > > > In
> > > > > > >> this
> > > > > > >> > > > > > solution, I
> > > > > > >> > > > > > > have some questions about how to guarantee the
> > > > consistency
> > > > > > of
> > > > > > >> the
> > > > > > >> > > > rule
> > > > > > > >> > > > > > > between each TaskManager. By having a coordinator
> in
> > > the
> > > > > > >> JobManager
> > > > > > >> > > to
> > > > > > >> > > > > > > centrally manage the latest rules, the latest
> rules
> > of
> > > > all
> > > > > > >> > > > TaskManagers
> > > > > > >> > > > > > are
> > > > > > >> > > > > > > consistent with those of the JobManager, so as to
> > > avoid
> > > > > the
> > > > > > >> > > > > > inconsistencies
> > > > > > >> > > > > > > that may be encountered in the above solution. Can
> > you
> > > > > > >> introduce
> > > > > > >> > > how
> > > > > > >> > > > > this
> > > > > > >> > > > > > > solution guarantees the consistency of the rules?
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > The consistency that we could guarantee was based
> on
> > > how
> > > > > > >> often each
> > > > > > >> > > > > > > TaskManager would do a refresh and how often we
> > would
> > > > > > accept a
> > > > > > >> > > > refresh
> > > > > > >> > > > > to
> > > > > > >> > > > > > > fail. We set the refresh time to a relatively
> short
> > > one
> > > > > (30
> > > > > > >> > > seconds)
> > > > > > >> > > > > and
> > > > > > >> > > > > > > maximum failures to 3. That meant that we could
> > > > guarantee
> > > > > > that
> > > > > > >> > > rules
> > > > > > >> > > > > > would
> > > > > > >> > > > > > > be updated in < 2 minutes or else the job would
> > crash.
> > > > > That
> > > > > > >> was
> > > > > > >> > > > > > sufficient
> > > > > > >> > > > > > > for our use cases. This also really depends on how
> > big
> > > > > your
> > > > > > >> cluster
> > > > > > >> > > > > is. I
> > > > > > >> > > > > > > can imagine that if you have a large scale cluster
> > > that
> > > > > you
> > > > > > >> want to
> > > > > > >> > > > > run,
> > > > > > >> > > > > > > you don't want to DDOS the backend system where
> you
> > > have
> > > > > > your
> > > > > > >> rules
> > > > > > >> > > > > > stored.
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > -- In summary, the current design is that
> JobManager
> > > > tells
> > > > > > all
> > > > > > >> > > > > > TaskManagers
> > > > > > > >> > > > > > > the latest rules through OperatorCoordinator, and
> > will
> > > > > > >> initiate a
> > > > > > >> > > > > > heartbeat
> > > > > > >> > > > > > > to check whether the latest rules on each
> > TaskManager
> > > > are
> > > > > > >> > > consistent.
> > > > > > >> > > > > We
> > > > > > >> > > > > > > will describe how to deal with the Failover
> scenario
> > > in
> > > > > more
> > > > > > >> detail
> > > > > > >> > > > on
> > > > > > >> > > > > > FLIP.
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > Thanks for that. I think having the JobManager
> tell
> > > the
> > > > > > >> > > TaskManagers
> > > > > > >> > > > > the
> > > > > > >> > > > > > > applicable rules would indeed end up being the
> best
> > > > > design.
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > -- about the concerns around consistency raised by
> > > > > Martijn:
> > > > > > I
> > > > > > >> > > think a
> > > > > > >> > > > > lot
> > > > > > >> > > > > > > of those can be mitigated by using an event time
> > > > timestamp
> > > > > > >> from
> > > > > > >> > > which
> > > > > > >> > > > > the
> > > > > > >> > > > > > > rules take effect. The reprocessing scenario, for
> > > > example,
> > > > > > is
> > > > > > >> > > covered
> > > > > > >> > > > > by
> > > > > > >> > > > > > > this. If a pattern processor should become active
> as
> > > > soon
> > > > > as
> > > > > > >> > > > possible,
> > > > > > >> > > > > > > there will still be inconsistencies between
> > > > Taskmanagers,
> > > > > > but
> > > > > > >> "as
> > > > > > >> > > > soon
> > > > > > >> > > > > as
> > > > > > >> > > > > > > possible" is vague anyway, which is why I think
> > that's
> > > > ok.
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > I think an event timestamp is indeed a really
> > > important
> > > > > one.
> > > > > > >> We
> > > > > > >> > > also
> > > > > > >> > > > > used
> > > > > > >> > > > > > > that in my previous role, with the
> > > > ruleActivationTimestamp
> > > > > > >> compared
> > > > > > >> > > > to
> > > > > > >> > > > > > > eventtime (well, actually we used Kafka logAppend
> > time
> > > > > > because
> > > > > > >> > > > > > > eventtime wasn't always properly set so we used
> that
> > > > time
> > > > > to
> > > > > > >> > > > overwrite
> > > > > > >> > > > > > the
> > > > > > >> > > > > > > eventtime from the event itself).
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > Best regards,
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > Martijn
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > On Mon, 20 Dec 2021 at 09:08, Konstantin Knauf <
> > > > > > >> kna...@apache.org>
> > > > > > >> > > > > > wrote:
> > > > > > >> > > > > > >
> > > > > > >> > > > > > > > Hi Nicholas, Hi Junfeng,
> > > > > > >> > > > > > > >
> > > > > > >> > > > > > > > about the concerns around consistency raised by
> > > > > Martijn: I
> > > > > > >> think
> > > > > > >> > > a
> > > > > > >> > > > > lot
> > > > > > >> > > > > > of
> > > > > > >> > > > > > > > those can be mitigated by using an event time
> > > > timestamp
> > > > > > from
> > > > > > >> > > which
> > > > > > >> > > > > the
> > > > > > >> > > > > > > > rules take effect. The reprocessing scenario,
> for
> > > > > example,
> > > > > > >> is
> > > > > > >> > > > covered
> > > > > > >> > > > > > by
> > > > > > >> > > > > > > > this. If a pattern processor should become
> active
> > as
> > > > > soon
> > > > > > as
> > > > > > >> > > > > possible,
> > > > > > >> > > > > > > > there will still be inconsistencies between
> > > > > Taskmanagers,
> > > > > > >> but "as
> > > > > > >> > > > > soon
> > > > > > >> > > > > > as
> > > > > > >> > > > > > > > possible" is vague anyway, which is why I think
> > > that's
> > > > > ok.
> > > > > > >> > > > > > > >
> > > > > > >> > > > > > > > about naming: The naming with "PatternProcessor"
> > > > sounds
> > > > > > >> good to
> > > > > > >> > > me.
> > > > > > >> > > > > > Final
> > > > > > > >> > > > > > > > nit: I would go for CEP#patternProcessors,
> which
> > > > would
> > > > > be
> > > > > > >> > > > consistent
> > > > > > >> > > > > > with
> > > > > > >> > > > > > > > CEP#pattern.
> > > > > > >> > > > > > > >
> > > > > > >> > > > > > > > I am not sure about one of the rejected
> > > alternatives:
> > > > > > >> > > > > > > >
> > > > > > >> > > > > > > > > Have each subtask of an operator make the
> update
> > > on
> > > > > > their
> > > > > > >> own
> > > > > > >> > > > > > > >
> > > > > > >> > > > > > > >    -
> > > > > > >> > > > > > > >
> > > > > > >> > > > > > > >    It is hard to achieve consistency.
> > > > > > >> > > > > > > >    -
> > > > > > >> > > > > > > >
> > > > > > >> > > > > > > >       Though the time interval that each subtask
> > > makes
> > > > > the
> > > > > > >> update
> > > > > > >> > > > can
> > > > > > >> > > > > > be
> > > > > > >> > > > > > > >       the same, the absolute time they make the
> > > update
> > > > > > >> might be
> > > > > > >> > > > > > different.
> > > > > > >> > > > > > > > For
> > > > > > >> > > > > > > >       example, one makes updates at 10:00,
> 10:05,
> > > etc,
> > > > > > while
> > > > > > >> > > > another
> > > > > > >> > > > > > does
> > > > > > >> > > > > > > > it at
> > > > > > >> > > > > > > >       10:01, 10:06. In this case the subtasks
> > might
> > > > > never
> > > > > > >> > > > processing
> > > > > > >> > > > > > data
> > > > > > >> > > > > > > > with
> > > > > > >> > > > > > > >       the same set of pattern processors.
> > > > > > >> > > > > > > >
> > > > > > >> > > > > > > >
> > > > > > >> > > > > > > > I would have thought that it is quite easy to
> poll
> > > for
> > > > > the
> > > > > > >> rules
> > > > > > >> > > > from
> > > > > > >> > > > > > each
> > > > > > > >> > > > > > > > Subtask at *about* the same time. So, this alone
> > > does
> > > > > not
> > > > > > >> seem to
> > > > > > >> > > > be
> > > > > > >> > > > > > > > enough to rule out this option. I've looped in
> > David
> > > > > > >> Moravek to
> > > > > > >> > > get
> > > > > > >> > > > > his
> > > > > > >> > > > > > > > opinion of the additional load imposed on the
> JM.
> > > > > > >> > > > > > > >
> > > > > > >> > > > > > > > Thanks,
> > > > > > >> > > > > > > >
> > > > > > >> > > > > > > > Konstantin
> > > > > > >> > > > > > > >
> > > > > > >> > > > > > > > On Mon, Dec 20, 2021 at 4:06 AM Nicholas Jiang <
> > > > > > >> > > > > > nicholasji...@apache.org>
> > > > > > >> > > > > > > > wrote:
> > > > > > >> > > > > > > >
> > > > > > >> > > > > > > > > Hi Yue,
> > > > > > >> > > > > > > > >
> > > > > > >> > > > > > > > > Thanks for your feedback of the FLIP. I have
> > > > addressed
> > > > > > >> your
> > > > > > >> > > > > > questions and
> > > > > > >> > > > > > > > > made a corresponding explanation as follows:
> > > > > > >> > > > > > > > >
> > > > > > > >> > > > > > > > > -- About Pattern Updating. If we use
> > > > > > > >> > > > > > > > > PatternProcessorDiscoverer to update the rules,
> > > > > > > >> > > > > > > > > will it increase the load on the JM? For
> > > > > > > >> > > > > > > > > example, if the user wants the updated rule to
> > > > > > > >> > > > > > > > > take effect immediately, we need to set a
> > > > > > > >> > > > > > > > > shorter check interval. In another scenario,
> > > > > > > >> > > > > > > > > where users rarely update the pattern, will the
> > > > > > > >> > > > > > > > > PatternProcessorDiscoverer spend most of its
> > > > > > > >> > > > > > > > > time doing useless checks? Could a lazy update
> > > > > > > >> > > > > > > > > mode be used, in which the pattern is only
> > > > > > > >> > > > > > > > > updated when triggered by the user and nothing
> > > > > > > >> > > > > > > > > is done at other times?
> > > > > > >> > > > > > > > >
> > > > > > > >> > > > > > > > > PatternProcessorDiscoverer is a user-defined
> > > > > > > >> > > > > > > > > interface to discover PatternProcessor updates.
> > > > > > > >> > > > > > > > > Periodically checking the PatternProcessor in
> > > > > > > >> > > > > > > > > the database is one implementation of the
> > > > > > > >> > > > > > > > > PatternProcessorDiscoverer interface: it
> > > > > > > >> > > > > > > > > periodically queries the whole PatternProcessor
> > > > > > > >> > > > > > > > > table at a certain interval. This
> > > > > > > >> > > > > > > > > implementation indeed performs useless checks,
> > > > > > > >> > > > > > > > > and could instead directly integrate the
> > > > > > > >> > > > > > > > > changelog of the table. Besides periodically
> > > > > > > >> > > > > > > > > checking the database, there are other
> > > > > > > >> > > > > > > > > implementations, such as one that provides
> > > > > > > >> > > > > > > > > RESTful services to receive updates.
> > > > > > >> > > > > > > > >
> > > > > > > >> > > > > > > > > -- I still have some confusion about how the Key
> > > > > > > >> > > > > > > > > Generating Operator and the CepOperator (Pattern
> > > > > > > >> > > > > > > > > Matching & Processing Operator) work together.
> > > > > > > >> > > > > > > > > If there are N PatternProcessors, will the Key
> > > > > > > >> > > > > > > > > Generating Operator generate N keyedStreams,
> > > > > > > >> > > > > > > > > with N CepOperators then processing each key
> > > > > > > >> > > > > > > > > separately? Or will every CepOperator task
> > > > > > > >> > > > > > > > > process all patterns? If so, does the key type
> > > > > > > >> > > > > > > > > in each PatternProcessor need to be the same?
> > > > > > >> > > > > > > > >
> > > > > > > >> > > > > > > > > Firstly, the Pattern Matching & Processing
> > > > > > > >> > > > > > > > > Operator is not the CepOperator at present,
> > > > > > > >> > > > > > > > > because the CepOperator mechanism is based on
> > > > > > > >> > > > > > > > > the NFAState. Secondly, if there are N
> > > > > > > >> > > > > > > > > PatternProcessors, the Key Generating Operator
> > > > > > > >> > > > > > > > > combines all the keyedStreams with a keyBy()
> > > > > > > >> > > > > > > > > operation, so the Pattern Matching & Processing
> > > > > > > >> > > > > > > > > Operator processes all the patterns. In other
> > > > > > > >> > > > > > > > > words, the KeySelector of the PatternProcessor
> > > > > > > >> > > > > > > > > is used by the Key Generating Operator, and the
> > > > > > > >> > > > > > > > > Pattern and PatternProcessFunction of the
> > > > > > > >> > > > > > > > > PatternProcessor are used by the Pattern
> > > > > > > >> > > > > > > > > Matching & Processing Operator. Lastly, the key
> > > > > > > >> > > > > > > > > type in each PatternProcessor is the same,
> > > > > > > >> > > > > > > > > regarded as the Object type.
> > > > > > >> > > > > > > > >
> > > > > > >> > > > > > > > > -- This may need attention during implementation:
> > > > > > >> > > > > > > > > if some Pattern has been removed or updated, will
> > > > > > >> > > > > > > > > the partially matched results in the StateBackend
> > > > > > >> > > > > > > > > be cleaned up, or do we rely on state TTL to clean
> > > > > > >> > > > > > > > > up these expired states?
> > > > > > >> > > > > > > > >
> > > > > > >> > > > > > > > > If a certain Pattern has been removed or updated,
> > > > > > >> > > > > > > > > the partially matched results in the StateBackend
> > > > > > >> > > > > > > > > would be cleaned up until the next checkpoint. The
> > > > > > >> > > > > > > > > partially matched results don't depend on the state
> > > > > > >> > > > > > > > > TTL of the StateBackend.
> > > > > > >> > > > > > > > >
> > > > > > >> > > > > > > > > 4. Will the PatternProcessorManager keep all the
> > > > > > >> > > > > > > > > active PatternProcessors in memory? We also support
> > > > > > >> > > > > > > > > Multiple Rule and Dynamic Rule Changing, but we are
> > > > > > >> > > > > > > > > facing a problem: some users want to have their own
> > > > > > >> > > > > > > > > pattern for each user_id, which means that there
> > > > > > >> > > > > > > > > could be thousands of patterns, which would make the
> > > > > > >> > > > > > > > > performance of Pattern Matching very poor. We are
> > > > > > >> > > > > > > > > also trying to solve this problem.
> > > > > > >> > > > > > > > >
> > > > > > >> > > > > > > > > The PatternProcessorManager keeps all the active
> > > > > > >> > > > > > > > > PatternProcessors in memory. For scenarios where
> > > > > > >> > > > > > > > > users want to have their own pattern for each
> > > > > > >> > > > > > > > > user_id, IMO, is it possible to make the patterns of
> > > > > > >> > > > > > > > > the PatternProcessor less fine-grained to solve the
> > > > > > >> > > > > > > > > performance problem of Pattern Matching, for example
> > > > > > >> > > > > > > > > by having one pattern correspond to a group of
> > > > > > >> > > > > > > > > users? The scenarios mentioned above need to be
> > > > > > >> > > > > > > > > solved case by case.
> > > > > > >> > > > > > > > >
> > > > > > >> > > > > > > > > Best,
> > > > > > >> > > > > > > > > Nicholas Jiang
> > > > > > >> > > > > > > > >
> > > > > > >> > > > > > > > > On 2021/12/17 11:43:10 yue ma wrote:
> > > > > > >> > > > > > > > > > Glad to see the community's progress in Flink CEP.
> > > > > > >> > > > > > > > > > After reading this FLIP, I have a few questions.
> > > > > > >> > > > > > > > > > Would you please take a look?
> > > > > > >> > > > > > > > > >
> > > > > > >> > > > > > > > > > 1. About Pattern Updating. If we use the
> > > > > > >> > > > > > > > > > PatternProcessorDiscoverer to update the rules,
> > > > > > >> > > > > > > > > > will it increase the load on the JM? For example,
> > > > > > >> > > > > > > > > > if the user wants the updated rule to take effect
> > > > > > >> > > > > > > > > > immediately, we need to set a shorter check
> > > > > > >> > > > > > > > > > interval. In another scenario, when users rarely
> > > > > > >> > > > > > > > > > update the pattern, will the
> > > > > > >> > > > > > > > > > PatternProcessorDiscoverer spend most of its time
> > > > > > >> > > > > > > > > > doing useless checks? Could a lazy update mode be
> > > > > > >> > > > > > > > > > used, in which the pattern is only updated when
> > > > > > >> > > > > > > > > > triggered by the user and nothing is done at other
> > > > > > >> > > > > > > > > > times?
> > > > > > >> > > > > > > > > >
> > > > > > >> > > > > > > > > > 2. I still have some confusion about how the Key
> > > > > > >> > > > > > > > > > Generating Operator and the CepOperator (Pattern
> > > > > > >> > > > > > > > > > Matching & Processing Operator) work together. If
> > > > > > >> > > > > > > > > > there are N PatternProcessors, will the Key
> > > > > > >> > > > > > > > > > Generating Operator generate N keyedStreams, and
> > > > > > >> > > > > > > > > > then N CepOperators would process each key
> > > > > > >> > > > > > > > > > separately? Or would every CepOperator task
> > > > > > >> > > > > > > > > > process all patterns? If so, does the key type in
> > > > > > >> > > > > > > > > > each PatternProcessor need to be the same?
> > > > > > >> > > > > > > > > >
> > > > > > >> > > > > > > > > > 3. This may need attention during implementation:
> > > > > > >> > > > > > > > > > if some Pattern has been removed or updated, will
> > > > > > >> > > > > > > > > > the partially matched results in the StateBackend
> > > > > > >> > > > > > > > > > be cleaned up, or do we rely on state TTL to clean
> > > > > > >> > > > > > > > > > up these expired states?
> > > > > > >> > > > > > > > > >
> > > > > > >> > > > > > > > > > 4. Will the PatternProcessorManager keep all the
> > > > > > >> > > > > > > > > > active PatternProcessors in memory? We also
> > > > > > >> > > > > > > > > > support Multiple Rule and Dynamic Rule Changing,
> > > > > > >> > > > > > > > > > but we are facing a problem: some users want to
> > > > > > >> > > > > > > > > > have their own pattern for each user_id, which
> > > > > > >> > > > > > > > > > means that there could be thousands of patterns,
> > > > > > >> > > > > > > > > > which would make the performance of Pattern
> > > > > > >> > > > > > > > > > Matching very poor. We are also trying to solve
> > > > > > >> > > > > > > > > > this problem.
> > > > > > >> > > > > > > > > >
> > > > > > >> > > > > > > > > > Yunfeng Zhou <flink.zhouyunf...@gmail.com> wrote
> > > > > > >> > > > > > > > > > on Fri, Dec 10, 2021 at 19:16:
> > > > > > >> > > > > > > > > >
> > > > > > >> > > > > > > > > > > Hi all,
> > > > > > >> > > > > > > > > > >
> > > > > > >> > > > > > > > > > > I'm opening this thread to propose the design to
> > > > > > >> > > > > > > > > > > support multiple rules & dynamic rule changing
> > > > > > >> > > > > > > > > > > in the Flink-CEP project, as described in
> > > > > > >> > > > > > > > > > > FLIP-200 [1].
> > > > > > >> > > > > > > > > > >
> > > > > > >> > > > > > > > > > > Currently, Flink CEP only supports having a
> > > > > > >> > > > > > > > > > > single pattern inside a CepOperator and does not
> > > > > > >> > > > > > > > > > > support changing the pattern dynamically. In
> > > > > > >> > > > > > > > > > > order to reduce resource consumption and to
> > > > > > >> > > > > > > > > > > experience shorter downtime during pattern
> > > > > > >> > > > > > > > > > > updates, there is a growing need in production
> > > > > > >> > > > > > > > > > > environments for CEP to support having multiple
> > > > > > >> > > > > > > > > > > patterns in one operator and to support
> > > > > > >> > > > > > > > > > > dynamically changing them. Therefore, I propose
> > > > > > >> > > > > > > > > > > to add certain infrastructure as described in
> > > > > > >> > > > > > > > > > > FLIP-200 to support these functionalities.
> > > > > > >> > > > > > > > > > >
> > > > > > >> > > > > > > > > > > Please feel free to reply to this email thread.
> > > > > > >> > > > > > > > > > > Looking forward to your feedback!
> > > > > > >> > > > > > > > > > >
> > > > > > >> > > > > > > > > > > [1]
> > > > > > >> > > > > > > > > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=195730308
> > > > > > >> > > > > > > > > > >
> > > > > > >> > > > > > > > > > > Best regards,
> > > > > > >> > > > > > > > > > >
> > > > > > >> > > > > > > > > > > Yunfeng
> > > > > > >> > > > > > > > > > >
> > > > > > >> > > > > > > > > >
> > > > > > >> > > > > > > > >
> > > > > > >> > > > > > > >
> > > > > > >> > > > > > > >
> > > > > > >> > > > > > > > --
> > > > > > >> > > > > > > >
> > > > > > >> > > > > > > > Konstantin Knauf
> > > > > > >> > > > > > > >
> > > > > > >> > > > > > > > https://twitter.com/snntrable
> > > > > > >> > > > > > > >
> > > > > > >> > > > > > > > https://github.com/knaufk
> > > > > > >> > > > > > > >
> > > > > > >> > > > > > >
> > > > > > >> > > > > >
> > > > > > >> > > > >
> > > > > > >> > > >
> > > > > > >> > >
> > > > > > >> > >
> > > > > > >> > > --
> > > > > > >> > >
> > > > > > >> > > Konstantin Knauf
> > > > > > >> > >
> > > > > > >> > > https://twitter.com/snntrable
> > > > > > >> > >
> > > > > > >> > > https://github.com/knaufk
> > > > > > >> > >
> > > > > > >> >
> > > > > > >>
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
