Hi Martijn,

Thanks for your feedback on my reply. I have responded below to each of the 
points you raised:

-- Since this FLIP addresses the most watched Jira ticket for Flink 
(https://issues.apache.org/jira/browse/FLINK-7129), I'm assuming this will be 
used a lot. Does this also mean that you would like to take ownership of the 
CEP library as a whole?

Regarding the Jira ticket 
(https://issues.apache.org/jira/browse/FLINK-7129): if Dawid Wysakowicz has no 
time to work on it, the ticket could be assigned to Yunfeng Zhou or me. We 
could then drive the ticket and push the implementation of this FLIP forward.

-- If we want to support multiple rule and dynamic rule changing for use cases 
in domains like risk controlling or fraud, I do think we need to have a good 
look at eventual consistency. What do we do in situations where the Operator 
Coordinator can't access the database? I could imagine that it makes sense to 
make it configurable how often the database will be queried for new or updated 
rules or how many retries the Operator Coordinator will take before failing the 
job.

In the baseline implementation, how often the database is queried for new or 
updated rules is configurable on the rule (pattern processor) discoverer; this 
point is just not mentioned in the FLIP.
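
To make the idea concrete, here is a rough sketch of a discoverer-side polling 
policy with a configurable interval and the retry limit you suggested. It is 
only an illustration under my own assumptions: `PollingPolicy`, `pollOnce` and 
`maxRetries` are hypothetical names, not part of the FLIP or of Flink, and the 
retry limit is not in the baseline implementation today.

```java
import java.time.Duration;

/** Hypothetical sketch; not part of Flink or of the FLIP. */
class PollingPolicy {
    /** How long to wait between two queries; scheduling itself is left to the caller. */
    final Duration interval;
    /** How many consecutive failed queries are tolerated before giving up. */
    final int maxRetries;
    private int consecutiveFailures = 0;

    PollingPolicy(Duration interval, int maxRetries) {
        this.interval = interval;
        this.maxRetries = maxRetries;
    }

    /**
     * Runs one query against the rule store.
     * Returns true if the job should keep running, false once the
     * number of consecutive failures exceeds maxRetries.
     */
    boolean pollOnce(Runnable query) {
        try {
            query.run();
            consecutiveFailures = 0; // a successful query resets the budget
            return true;
        } catch (RuntimeException e) {
            consecutiveFailures++;
            return consecutiveFailures <= maxRetries;
        }
    }
}
```

With `maxRetries = 2`, two failed queries in a row are tolerated and the third 
one returns false, at which point the caller could fail the job.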

-- A similar concern is what I have if for whatever reason the different 
taskmanagers can't get the latest rules, so some taskmanagers might run on the 
latest rule changes while some might use older versions. Those type of issues 
can be quite hard to debug. Do we want to introduce the config option to fail a 
job in case a taskmanager doesn't get the latest rules?

Good point. To cover the case where some taskmanagers can't get the latest 
rules, the Operator Coordinator could send a heartbeat carrying the latest 
rules to all taskmanagers and check, from each heartbeat response, whether 
that taskmanager's latest rules match those of the Operator Coordinator.

--  Do we need to take certain guarantees (like at least once) in account for 
this setup and/or document these? What happens in the situation where the 
cluster crashes and has to recover from a savepoint of let's say 3 hours ago, 
but the rules in the database have changed 2 hours ago. That means for the 
events that are processed again after 2 hours, the output can be different 
because the rules have changed.

Yes, such guarantees are needed for the whole mechanism. We would like to 
document the behavior in all failover and crash cases so that users understand 
the processing logic, and we will add a detailed explanation of these cases to 
the FLIP.

-- In my previous job, we've created a similar system like this. The 
differences there were that we didn't use the jobmanager to send the results to 
the taskmanagers, but the taskmanagers queried the database in periodic 
intervals. Each taskmanager retrieved all the rules that were applicable and 
cached them. Is this also something that you considered?

We have considered the solution you describe. My question about it is how to 
guarantee that the rules are consistent across TaskManagers. With a 
coordinator in the JobManager that centrally manages the latest rules, the 
latest rules on all TaskManagers are consistent with those of the JobManager, 
which avoids the inconsistencies that the above solution may run into. Could 
you explain how your solution guarantees the consistency of the rules?

Regarding setting a timestamp for when a rule should become active: in the 
latest FLIP, the PatternProcessor interface adds a `getTimestamp` method to 
support this for event time or processing time.
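
As a rough illustration of how such a timestamp could be used, the sketch 
below treats a rule as active once a chosen clock has reached the value 
returned by `getTimestamp`. This is my simplification, not the FLIP's 
definition: `TimedRule`, `RuleActivation` and `isActive` are hypothetical, and 
the clock stands in for either the current watermark (event time) or the wall 
clock (processing time).

```java
import java.util.function.LongSupplier;

/** Hypothetical sketch; not the PatternProcessor interface from the FLIP. */
class RuleActivation {

    /** Stand-in for the single method discussed here. */
    interface TimedRule {
        long getTimestamp(); // assumed: epoch millis from which the rule applies
    }

    /** A rule counts as active once the supplied clock has reached its timestamp. */
    static boolean isActive(TimedRule rule, LongSupplier clock) {
        return clock.getAsLong() >= rule.getTimestamp();
    }
}
```

Passing `System::currentTimeMillis` as the clock would give processing-time 
behavior, while passing a supplier of the current watermark would give 
event-time behavior.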

In summary, the current design is that the JobManager tells all TaskManagers 
the latest rules through the OperatorCoordinator, and initiates a heartbeat to 
check whether the latest rules on each TaskManager are consistent. We will 
describe how to deal with the failover scenario in more detail in the FLIP.

Best,
Nicholas Jiang

On 2021/12/14 13:06:16 Martijn Visser wrote:
> Hi all,
> 
> > IMO, in this FLIP, we only need to introduce the general design of the
> Table API/SQL level. As for the design details, you can create a new FLIP.
> 
> Do you think that the current section on Table/SQL API support is
> sufficient as a general design?
> 
> > And do we need to take into account the support for Batch mode if you
> expand the MATCH_RECOGNIZE function?
> 
> Yes, I do think so since adding support for Batch mode for MATCH_RECOGNIZE
> is envisioned for Flink 1.15, but is also in danger of not making it.
> Relevant ticket number is https://issues.apache.org/jira/browse/FLINK-24865
> 
> With regards to the content of the FLIP, I have a couple of questions or
> concerns:
> 
> * Since this FLIP addresses the most watched Jira ticket for Flink (
> https://issues.apache.org/jira/browse/FLINK-7129), I'm assuming this will
> be used a lot. Does this also mean that you would like to take ownership of
> the CEP library as a whole?
> 
> * If we want to support multiple rule and dynamic rule changing for use
> cases in domains like risk controlling or fraud, I do think we need to have
> a good look at eventual consistency. What do we do in situations where the
> Operator Coordinator can't access the database? I could imagine that it
> makes sense to make it configurable how often the database will be queried
> for new or updated rules or how many retries the Operator Coordinator will
> take before failing the job.
> 
> * A similar concern is what I have if for whatever reason the different
> taskmanagers can't get the latest rules, so some taskmanagers might run on
> the latest rule changes while some might use older versions. Those type of
> issues can be quite hard to debug. Do we want to introduce the config
> option to fail a job in case a taskmanager doesn't get the latest rules?
> 
> * Do we need to take certain guarantees (like at least once) in account for
> this setup and/or document these? What happens in the situation where the
> cluster crashes and has to recover from a savepoint of let's say 3 hours
> ago, but the rules in the database have changed 2 hours ago. That means for
> the events that are processed again after 2 hours, the output can be
> different because the rules have changed.
> 
> In my previous job, we've created a similar system like this. The
> differences there were that we didn't use the jobmanager to send the
> results to the taskmanagers, but the taskmanagers queried the database in
> periodic intervals. Each taskmanager retrieved all the rules that were
> applicable and cached them. Is this also something that you considered?
> 
> I think it's indeed important to also support the ability to set a
> timestamp for when a rule should be active. We should consider if we only
> want to make this available for eventtime or also for processing time. I
> can imagine that marketing rules which are used to determine if someone is
> eligible for a discount only during GMT time during the weekend could be
> interested in processing time capabilities.
> 
> Looking forward to the FLIP :)
> 
> Best regards,
> 
> Martijn
> 
> On Tue, 14 Dec 2021 at 10:00, Nicholas Jiang <nicholasji...@apache.org>
> wrote:
> 
> > Hi Konstantin,
> >
> >     Thanks for your detailed explanation for DynamicPattern[Holder]
> > renaming. I have another idea for this renaming, what about renaming the
> > Rule to PatternProcessor? The CEP means that complex event processing, thus
> > the name PatternProcessor corresponds to the concept of CEP.
> > A PatternProcessor contains the specific pattern and how to process the
> > pattern, and this contains the dynamic meaning. What's more, CEP.rule()
> > method could be renamed to CEP.patternProcess(). WDYT?
> >
> > Best,
> > Nicholas Jiang
> >
> > On 2021/12/14 07:32:46 Konstantin Knauf wrote:
> > > Hi Nicholas,
> > >
> > > I understand that a Rule contains more than the Pattern. Still, I favor
> > > DynamicPattern[Holder] over Rule, because the term "Rule" does not exist
> > in
> > > Flink's CEP implementation so far and "dynamic" seems to be the important
> > > bit here.
> > >
> > > Cheers,
> > >
> > > Konstantin
> > >
> > > On Tue, Dec 14, 2021 at 4:46 AM Nicholas Jiang <nicholasji...@apache.org
> > >
> > > wrote:
> > >
> > > > Hi DianFu,
> > > >
> > > >      Thanks for your feedback of the FLIP.
> > > >
> > > >      About the mentioned question for the `getLatestRules`, IMO, this
> > > > doesn't need to rename into `getRuleChanges` because this method is
> > used
> > > > for getting the total amount of the latest rules which has been updated
> > > > once.
> > > >
> > > >      About the CEP.rule method, the CEP.dynamicPattern renaming is
> > > > confusing for users. The dynamic pattern only creates the
> > PatternStream not
> > > > the DataStream. From the concept, a dynamic pattern is also a pattern,
> > not
> > > > contains the PatternProcessFunction. If renaming the CEP.rule into
> > > > CEP.dynamicPattern, the return value of the method couldn't include the
> > > > PatternProcessFunction, only returns the PatternStream. I think the
> > > > difference between the Rule and the Pattern is that Rule contains the
> > > > PatternProcessFunction, but the Pattern or DynamicPattern doesn't
> > contain
> > > > the function.
> > > >
> > > > Best
> > > > Nicholas Jiang
> > > >
> > >
> > >
> > > --
> > >
> > > Konstantin Knauf
> > >
> > > https://twitter.com/snntrable
> > >
> > > https://github.com/knaufk
> > >
> >
> 
