Hi Yunfeng,

thanks for drafting this FLIP, this will be a great addition into the CEP
toolbox!

Apart from running user code in JM, which want to avoid in general, I'd
have one more another concern about using the OperatorCoordinator and that
is re-processing of the historical data. Any thoughts about how this will
work with the OC?

I have a slight feeling that a side-input (custom source / operator +
broadcast) would a better fit for this case. This would simplify the
consistency concerns (watermarks + pushback) and the re-processing of
historical data.

Best,
D.


On Tue, Dec 21, 2021 at 6:47 AM Nicholas Jiang <nicholasji...@apache.org>
wrote:

> Hi Konstantin, Martijn
>
> Thanks for the detailed feedback in the discussion. What I still have left
> to answer/reply to:
>
> -- Martijn: Just to be sure, this indeed would mean that if for whatever
> reason the heartbeat timeout, it would crash the job, right?
>
> IMO, if for whatever reason the heartbeat timeout, it couldn't check the
> PatternProcessor consistency between the OperatorCoordinator and the
> subtasks so that the job would be crashed.
>
> -- Konstantin: What I was concerned about is that we basically let users
> run a UserFunction in the OperatorCoordinator, which it does not seem to
> have been designed for.
>
> In general, we have reached an agreement on the design of this FLIP, but
> there are some concerns on the OperatorCoordinator, about whether basically
> let users run a UserFunction in the OperatorCoordinator is designed for
> OperatorCoordinator. We would like to invite Becket Qin who is the author
> of OperatorCoordinator to help us to answer this concern.
>
> Best,
> Nicholas Jiang
>
>
> On 2021/12/20 10:07:14 Martijn Visser wrote:
> > Hi all,
> >
> > Really like the discussion on this topic moving forward. I really think
> > this feature will be much appreciated by the Flink users. What I still
> have
> > left to answer/reply to:
> >
> > -- Good point. If for whatever reason the different taskmanagers can't
> get
> > the latest rule, the Operator Coordinator could send a heartbeat to all
> > taskmanagers with the latest rules and check the heartbeat response from
> > all the taskmanagers whether the latest rules of the taskmanager is equal
> > to these of the Operator Coordinator.
> >
> > Just to be sure, this indeed would mean that if for whatever reason the
> > heartbeat timeout, it would crash the job, right?
> >
> > -- We have consided about the solution mentioned above. In this
> solution, I
> > have some questions about how to guarantee the consistency of the rule
> > between each TaskManager. By having a coodinator in the JobManager to
> > centrally manage the latest rules, the latest rules of all TaskManagers
> are
> > consistent with those of the JobManager, so as to avoid the
> inconsistencies
> > that may be encountered in the above solution. Can you introduce how this
> > solution guarantees the consistency of the rules?
> >
> > The consistency that we could guarantee was based on how often each
> > TaskManager would do a refresh and how often we would accept a refresh to
> > fail. We set the refresh time to a relatively short one (30 seconds) and
> > maximum failures to 3. That meant that we could guarantee that rules
> would
> > be updated in < 2 minutes or else the job would crash. That was
> sufficient
> > for our use cases. This also really depends on how big your cluster is. I
> > can imagine that if you have a large scale cluster that you want to run,
> > you don't want to DDOS the backend system where you have your rules
> stored.
> >
> > -- In summary, the current design is that JobManager tells all
> TaskManagers
> > the latest rules through OperatorCoodinator, and will initiate a
> heartbeat
> > to check whether the latest rules on each TaskManager are consistent. We
> > will describe how to deal with the Failover scenario in more detail on
> FLIP.
> >
> > Thanks for that. I think having the JobManager tell the TaskManagers the
> > applicable rules would indeed end up being the best design.
> >
> > -- about the concerns around consistency raised by Martijn: I think a lot
> > of those can be mitigated by using an event time timestamp from which the
> > rules take effect. The reprocessing scenario, for example, is covered by
> > this. If a pattern processor should become active as soon as possible,
> > there will still be inconsistencies between Taskmanagers, but "as soon as
> > possible" is vague anyway, which is why I think that's ok.
> >
> > I think an event timestamp is indeed a really important one. We also used
> > that in my previous role, with the ruleActivationTimestamp compared to
> > eventtime (well, actually we used Kafka logAppend time because
> > eventtime wasn't always properly set so we used that time to overwrite
> the
> > eventtime from the event itself).
> >
> > Best regards,
> >
> > Martijn
> >
> > On Mon, 20 Dec 2021 at 09:08, Konstantin Knauf <kna...@apache.org>
> wrote:
> >
> > > Hi Nicholas, Hi Junfeng,
> > >
> > > about the concerns around consistency raised by Martijn: I think a lot
> of
> > > those can be mitigated by using an event time timestamp from which the
> > > rules take effect. The reprocessing scenario, for example, is covered
> by
> > > this. If a pattern processor should become active as soon as possible,
> > > there will still be inconsistencies between Taskmanagers, but "as soon
> as
> > > possible" is vague anyway, which is why I think that's ok.
> > >
> > > about naming: The naming with "PatternProcessor" sounds good to me.
> Final
> > > nit: I would go for CEP#patternProccessors, which would be consistent
> with
> > > CEP#pattern.
> > >
> > > I am not sure about one of the rejected alternatives:
> > >
> > > > Have each subtask of an operator make the update on their own
> > >
> > >    -
> > >
> > >    It is hard to achieve consistency.
> > >    -
> > >
> > >       Though the time interval that each subtask makes the update can
> be
> > >       the same, the absolute time they make the update might be
> different.
> > > For
> > >       example, one makes updates at 10:00, 10:05, etc, while another
> does
> > > it at
> > >       10:01, 10:06. In this case the subtasks might never processing
> data
> > > with
> > >       the same set of pattern processors.
> > >
> > >
> > > I would have thought that it is quite easy to poll for the rules from
> each
> > > Subtask at *about *the same time. So, this alone does not seem to be
> > > enough to rule out this option. I've looped in David Moravek to get his
> > > opinion of the additional load imposed on the JM.
> > >
> > > Thanks,
> > >
> > > Konstantin
> > >
> > > On Mon, Dec 20, 2021 at 4:06 AM Nicholas Jiang <
> nicholasji...@apache.org>
> > > wrote:
> > >
> > > > Hi Yue,
> > > >
> > > > Thanks for your feedback of the FLIP. I have addressed your
> questions and
> > > > made a corresponding explanation as follows:
> > > >
> > > > -- About Pattern Updating. If we use PatternProcessoerDiscoverer to
> > > update
> > > > the rules, will it increase the load of JM? For example, if the user
> > > wants
> > > > the updated rule to take effect immediately,, which means that we
> need to
> > > > set a shorter check interval or there is another scenario when users
> > > rarely
> > > > update the pattern, will the PatternProcessoerDiscoverer be in most
> of
> > > the
> > > > time Do useless checks ? Will a lazy update mode could be used,
> which the
> > > > pattern only be updated when triggered by the user, and do nothing at
> > > other
> > > > times?
> > > >
> > > > PatternProcessoerDiscoverer is a user-defined interface to discover
> the
> > > > PatternProcessor updates. Periodically checking the PatternProcessor
> in
> > > the
> > > > database is a implementation of the PatternProcessoerDiscoverer
> > > interface,
> > > > which is that periodically querys all the PatternProcessor table in
> > > certain
> > > > interval. This implementation indeeds has the useless checks, and
> could
> > > > directly integrates the changelog of the table. In addition, in
> addition
> > > to
> > > > the implementation of periodically checking the database, there are
> other
> > > > implementations such as the PatternProcessor that provides Restful
> > > services
> > > > to receive updates.
> > > >
> > > > --  I still have some confusion about how Key Generating Opertator
> and
> > > > CepOperator (Pattern Matching & Processing Operator) work together.
> If
> > > > there are N PatternProcessors, will the Key Generating Opertator
> > > generate N
> > > > keyedStreams, and then N CepOperator would process each Key
> separately ?
> > > Or
> > > > every CepOperator Task would process all patterns, if so, does the
> key
> > > type
> > > > in each PatternProcessor need to be the same?
> > > >
> > > > Firstly the Pattern Matching & Processing Operator is not the
> CepOperator
> > > > at present, because CepOperator mechanism is based on the NFAState.
> > > > Secondly if there are N PatternProcessors, the Key Generating
> Opertator
> > > > combines all the keyedStreams with keyBy() operation, thus the
> Pattern
> > > > Matching & Processing Operator would process all the patterns. In
> other
> > > > words, the KeySelector of the PatternProcessor is used for the Key
> > > > Generating Opertator, and the Pattern and PatternProceessFunction of
> the
> > > > PatternProcessor are used for the Pattern Matching & Processing
> Operator.
> > > > Lastly the key type in each PatternProcessor is the same, regarded as
> > > > Object type.
> > > >
> > > > -- Maybe need to pay attention to it when implementing it .If some
> > > Pattern
> > > > has been removed or updated, will the partially matched results in
> > > > StateBackend would be clean up or We rely on state ttl to clean up
> these
> > > > expired states.
> > > >
> > > > If certain Pattern has been removed or updated, the partially matched
> > > > results in StateBackend would be clean up until the next checkpoint.
> The
> > > > partially matched result doesn't depend on the state ttl of the
> > > > StateBackend.
> > > >
> > > > 4. Will the PatternProcessorManager keep all the active
> PatternProcessor
> > > > in memory? We have also Support Multiple Rule and Dynamic Rule
> Changing.
> > > > But we are facing such a problem, some users’ usage scenarios are
> that
> > > they
> > > > want to have their own pattern for each user_id, which means that
> there
> > > > could be thousands of patterns, which would make the performance of
> > > Pattern
> > > > Matching very poor. We are also trying to solve this problem.
> > > >
> > > > The PatternProcessorManager keeps all the active PatternProcessor in
> > > > memory. For scenarios that they want to have their own pattern for
> each
> > > > user_id, IMO, is it possible to reduce the fine-grained pattern of
> > > > PatternProcessor to solve the performance problem of the Pattern
> > > Matching,
> > > > for example, a pattern corresponds to a group of users? The scenarios
> > > > mentioned above need to be solved by case by case.
> > > >
> > > > Best,
> > > > Nicholas Jiang
> > > >
> > > > On 2021/12/17 11:43:10 yue ma wrote:
> > > > > Glad to see the Community's progress in Flink CEP. After reading
> this
> > > > Flip,
> > > > > I have few questions, would you please take a look  ?
> > > > >
> > > > > 1. About Pattern Updating. If we use PatternProcessoerDiscoverer to
> > > > update
> > > > > the rules, will it increase the load of JM? For example, if the
> user
> > > > wants
> > > > > the updated rule to take effect immediately,, which means that we
> need
> > > to
> > > > > set a shorter check interval  or there is another scenario when
> users
> > > > > rarely update the pattern, will the PatternProcessoerDiscoverer be
> in
> > > > most
> > > > > of the time Do useless checks ? Will a lazy update mode could be
> used,
> > > > > which the pattern only be updated when triggered by the user, and
> do
> > > > > nothing at other times ?
> > > > >
> > > > > 2.   I still have some confusion about how Key Generating
> Opertator and
> > > > > CepOperator (Pattern Matching & Processing Operator) work
> together. If
> > > > > there are N PatternProcessors, will the Key Generating Opertator
> > > > generate N
> > > > > keyedStreams, and then N CepOperator would process each Key
> separately
> > > ?
> > > > Or
> > > > > every CepOperator Task would process all patterns, if so, does the
> key
> > > > type
> > > > > in each PatternProcessor need to be the same ?
> > > > >
> > > > > 3. Maybe need to pay attention to it when implementing it .If some
> > > > Pattern
> > > > > has been removed or updateed  ,will the partially matched results
> in
> > > > > StateBackend would be clean up or We rely on state ttl to clean up
> > > these
> > > > > expired states.
> > > > >
> > > > > 4. Will the PatternProcessorManager keep all the active
> > > PatternProcessor
> > > > in
> > > > > memory ? We have also Support Multiple Rule and Dynamic Rule
> Changing .
> > > > > But we are facing such a problem, some users’ usage scenarios are
> that
> > > > they
> > > > > want to have their own pattern for each user_id, which means that
> there
> > > > > could be thousands of patterns, which would make the performance of
> > > > Pattern
> > > > > Matching very poor. We are also trying to solve this problem.
> > > > >
> > > > > Yunfeng Zhou <flink.zhouyunf...@gmail.com> 于2021年12月10日周五 19:16写道:
> > > > >
> > > > > > Hi all,
> > > > > >
> > > > > > I'm opening this thread to propose the design to support multiple
> > > rule
> > > > &
> > > > > > dynamic rule changing in the Flink-CEP project, as described in
> > > > FLIP-200
> > > > > > [1]
> > > > > > .
> > > > > >
> > > > > > Currently Flink CEP only supports having a single pattern inside
> a
> > > > > > CepOperator and does not support changing the pattern
> dynamically. In
> > > > order
> > > > > > to reduce resource consumption and to experience shorter downtime
> > > > during
> > > > > > pattern updates, there is a growing need in the production
> > > environment
> > > > that
> > > > > > expects CEP to support having multiple patterns in one operator
> and
> > > to
> > > > > > support dynamically changing them. Therefore I propose to add
> certain
> > > > > > infrastructure as described in FLIP-200 to support these
> > > > functionalities.
> > > > > >
> > > > > > Please feel free to reply to this email thread. Looking forward
> to
> > > your
> > > > > > feedback!
> > > > > >
> > > > > > [1]
> > > > > >
> > > >
> > >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=195730308
> > > > > >
> > > > > > Best regards,
> > > > > >
> > > > > > Yunfeng
> > > > > >
> > > > >
> > > >
> > >
> > >
> > > --
> > >
> > > Konstantin Knauf
> > >
> > > https://twitter.com/snntrable
> > >
> > > https://github.com/knaufk
> > >
> >
>

Reply via email to