Thanks a lot,
Juergen

On Mon, Feb 17, 2020 at 11:08 AM Kostas Kloudas <kklou...@gmail.com> wrote:
> Hi Juergen,
>
> I will reply to your questions inline. As a general comment, I would
> suggest also having a look at [3] so that you have an idea of some of
> the alternatives.
> With that said, here come the answers :)
>
> 1) We receive files every day, which are exports from some database
> tables, containing ONLY the changes from that day. Most tables have
> modify-cols. Even though they are files, because they contain only
> changes, I believe the file records should be considered events in
> Flink terminology. Is that assumption correct?
>
> -> Yes. I think your assumption is correct.
>
> 2) The records within the DB export files are NOT in chronological
> order, and we cannot change the export. Our use case is a "complex
> event processing" case (FlinkCEP) with rules like "KeyBy(someKey) If
> first A, then B, then C within 30 days, then do something". Does that
> work with FlinkCEP even though the events/records are not in
> chronological order within the file? The files are 100MB to 20GB in
> size. Do I need to sort the files first before CEP processing?
>
> -> Flink CEP also works in event time, and the re-ordering can be done
> by Flink (based on the event timestamps and watermarks), so you should
> not need to sort the files yourself.
>
> 3) Occasionally some crazy people manually "correct" DB records
> within the database and manually trigger a re-export of ALL of the
> changes for that respective day (e.g. last week's Tuesday).
> Consequently we receive a correction file: same filename but with "_1"
> appended. All filenames include the date (of the original export).
> What are the options to handle that case (besides telling the DB
> admins not to, which we did already)? Regular checkpoints and
> re-processing all files since then? What happens to the CEP state?
> Will it be checkpointed as well?
>
> -> If you require re-processing, then I would say that your best
> option is what you described. The other option would be to keep
> everything in Flink state until you are sure that no more corrections
> will come.
> In this case, you have to somehow issue the "correction" in a way
> that the downstream system can understand what to correct and how.
> Keep in mind that this may be an expensive operation because
> everything has to be kept in state for longer.
>
> 4) Our CEP rules span up to 180 days (i.e. 6 months). Is that a
> problem?
>
> -> The only thing to consider is the size of your state. Time is not
> necessarily an issue. If your state for these 180 days is a couple of
> MBs, then you have no problem. If it grows fast, then you have to
> provision your cluster accordingly.
>
> 5) We also have CEP rules that must fire if, after a start sequence
> matched, the remaining sequence did NOT occur within a configured
> window. E.g. if A, then B, but C did not occur within 30 days since A.
> Is that supported by FlinkCEP? I couldn't find a working example.
>
> -> You can have a look at [1] for the supported pattern combinations,
> and you can also look at [2] for some tests of different pattern
> combinations.
>
> 6) We expect 30-40 CEP rules. How can we estimate the required storage
> size for the temporary CEP state? Is there some sort of formula
> considering the number of rules, number of records per file or day,
> record size, window, number of records matched per sequence, number of
> keyBy grouping keys, ...
>
> -> In FlinkCEP, each pattern becomes a single operator. This means
> that you will have 30-40 operators in your job graph, each with its
> own state. This can become heavy, but once again it depends on your
> workload. I cannot give an estimate because in CEP, in order to
> guarantee correct ordering of events in an unordered stream, the
> library sometimes has to keep more records in state than will end up
> in the final matches.
>
> Have you considered going with a solution based on ProcessFunction and
> broadcast state?
> This will also allow you to have a more dynamic set-up where patterns
> can be added at runtime, and it will allow you to do any optimizations
> specific to your workload ;) For a discussion on this, check [3]. In
> addition, it will allow you to "multiplex" many patterns into a single
> operator, thus potentially minimizing the number of copies of the
> state you keep.
>
> 7) I can imagine that for debugging reasons it'd be good if we were
> able to query the temporary CEP state. What is the (CEP) schema used
> to persist the CEP state, and how can we query it? And does such a
> query work on the whole cluster or only per node (e.g. because of the
> shuffle, nodes are responsible only for a portion of the events)?
>
> -> Unfortunately, the state in CEP is not queryable, so I am not sure
> you can inspect it at runtime.
>
> 8) I understand state is stored per node. What happens if I want to
> add or remove nodes? Will the state still be found, despite it being
> stored on another node? I read that I need to be equally careful when
> changing rules. Or is that a different issue?
>
> -> Rescaling a Flink job is not done automatically. You need to take a
> savepoint and then relaunch your job with a different parallelism.
> Updating a rule is not supported in CEP, as changing a rule would
> imply that (potentially) the state should change. But what you could
> do is take a savepoint, remove the old pattern and add a new one (the
> updated one), and tell Flink to ignore the state of the previous
> operator when restoring (the --allowNonRestoredState option), since,
> as said earlier, each CEP pattern is translated to an operator.
>
> 9) How does garbage collection of temp CEP state work, or will it stay
> forever? For tracing/investigation reasons I can imagine that purging
> it at the earliest possible time is not always the best option. Maybe
> after 30 days or so.
>
> -> CEP cleans up the state once the time horizon (specified with the
> .within() clause) has expired.
>
> 10) Are there strategies to minimize temp CEP state?
> In SQL queries you filter first on the "smallest" attributes. CEP
> rules form a sequence, hence that approach will not work. Is that an
> issue at all? What are the practical limits of the CEP temp state
> storage engine?
>
> -> Such optimizations are not supported out of the box. I would
> recommend going with the broadcast state approach in [3].
>
> 11) Occasionally we need to process about 200 files at once. Can I
> speed things up by processing all files in parallel on multiple nodes,
> despite their sequence (CEP use case)? This would only work if
> FlinkCEP in step 1 simply filters on all relevant events of a
> sequence and updates state, and in step 2, after the files are
> processed, evaluates whether the updated state matches the sequences.
>
> 12) Schema changes in the input files: Occasionally the DB source
> system schema is changed, and not always in a backwards-compatible way
> (new fields are inserted in the middle), so the export will also have
> the new field in the middle. This means that starting from a specific
> (file) date, I need to consider a different schema. This must also be
> handled when re-running files for the last month because of
> corrections provided. And if the file format has changed somewhere in
> the middle ...
>
> -> This seems to be relevant for the "data cleaning" phase, before you
> send your data to CEP. In this case, if the schema changes, then I
> assume that you need to update your initial parsing logic, which means
> taking a savepoint and redeploying the updated job graph with the new
> input parsing logic (if I understand correctly).
>
> thanks a lot for your time and your help
>
> I hope the above helps!
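Question 12 and the correction files from question 3 both come down to the parsing step that is placed before CEP. Below is a plain-Java sketch (not a Flink API) of one way to handle it, under loudly hypothetical assumptions: file names follow a made-up `<table>_<yyyyMMdd>[_<n>].csv` convention (the thread only says names contain the export date and that corrections get "_1" appended), records are simple comma-separated lines, and every historical column layout is registered with the date from which it applies. Choosing the layout by the file's export date rather than by deployment time keeps re-runs of older files parseable after a schema change, and mapping fields by column name keeps a column inserted in the middle from shifting the fields that downstream rules read.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Plain-Java sketch of the "data cleaning" step: pick the column layout valid on a file's
 * export date and recognise correction files. The naming convention is HYPOTHETICAL.
 */
final class ExportFileParser {
    // Hypothetical convention: <table>_<yyyyMMdd>[_<n>].csv, where _<n> marks a correction.
    private static final Pattern FILE_NAME = Pattern.compile("(\\w+?)_(\\d{8})(?:_(\\d+))?\\.csv");

    // Column layouts keyed by the first export date on which each layout applies.
    private final NavigableMap<LocalDate, List<String>> schemaHistory = new TreeMap<>();

    void addSchema(LocalDate validFrom, List<String> columns) {
        schemaHistory.put(validFrom, columns);
    }

    /** Export date from the file name; identical for an original file and its corrections. */
    static LocalDate exportDate(String fileName) {
        return LocalDate.parse(match(fileName).group(2), DateTimeFormatter.BASIC_ISO_DATE);
    }

    /** 0 for the original export, n for a correction file with "_n" appended. */
    static int correction(String fileName) {
        String n = match(fileName).group(3);
        return n == null ? 0 : Integer.parseInt(n);
    }

    /** Parses one record into column name -> value, using the layout valid on the export date. */
    Map<String, String> parse(String fileName, String line) {
        Map.Entry<LocalDate, List<String>> schema = schemaHistory.floorEntry(exportDate(fileName));
        if (schema == null) {
            throw new IllegalStateException("no schema registered for " + fileName);
        }
        List<String> columns = schema.getValue();
        String[] values = line.split(",", -1); // naive split: no quoting or escaping support
        if (values.length != columns.size()) {
            throw new IllegalArgumentException(
                    "expected " + columns.size() + " fields, got " + values.length);
        }
        Map<String, String> record = new HashMap<>();
        for (int i = 0; i < values.length; i++) {
            record.put(columns.get(i), values[i]);
        }
        return record;
    }

    private static Matcher match(String fileName) {
        Matcher m = FILE_NAME.matcher(fileName);
        if (!m.matches()) {
            throw new IllegalArgumentException("unexpected file name: " + fileName);
        }
        return m;
    }
}
```

A real export would need a proper CSV parser; the `split` call only keeps the sketch short.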
>
> Cheers,
> Kostas
>
> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/cep.html#combining-patterns
> [2] https://github.com/apache/flink/blob/master/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
> [3] https://flink.apache.org/news/2020/01/15/demo-fraud-detection.html
>
> On Mon, Feb 10, 2020 at 6:35 PM Juergen Donnerstag
> <juergen.donners...@gmail.com> wrote:
> >
> > Hi,
> >
> > we're in the very early stages of evaluating options. I'm not a Flink
> > expert, but I did read some of the docs and watched videos. Could you
> > please help me understand if and how certain of our requirements are
> > covered by Flink (CEP)? Is this mailing list the right channel for
> > such questions?
> >
> > [...]
> >
> > thanks a lot for your time and your help
> > Juergen
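Answers 2, 5 and 9 above can be made a little more concrete. In FlinkCEP, one commonly used way to express "A, then B, but no C within 30 days" is the pattern `begin("A").followedBy("B").followedBy("C").within(Time.days(30))`, treating the timed-out partial matches that contain A and B as the alert (for example via a `PatternProcessFunction` that also implements `TimedOutPartialMatchHandler`), because in the Flink versions of that time a pattern sequence could not end in `notFollowedBy()`; please verify this against [1] for your version. The sketch below is plain Java, not FlinkCEP code: it only mimics those semantics on an in-memory list, assuming a hypothetical `ChangeEvent` type with a key, a type label and an event-time timestamp. It sorts each key's events by timestamp (in Flink, the CEP operator buffers and orders events as the watermark advances, which is why the files need not be pre-sorted) and gives up on a partial match once its 30-day horizon has passed (the point at which `.within()` lets CEP drop that state).

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical change event: grouping key, type label ("A", "B", "C", ...) and event time in ms. */
final class ChangeEvent {
    final String key;
    final String type;
    final long timestamp;

    ChangeEvent(String key, String type, long timestamp) {
        this.key = key;
        this.type = type;
        this.timestamp = timestamp;
    }
}

/** Plain-Java illustration (NOT the FlinkCEP API) of "A, then B, but no C within `window` of A". */
final class AbsenceRuleSketch {

    /** Returns "key@timestampOfA" for every A whose partial match {A, B} expired without a C. */
    static List<String> detect(List<ChangeEvent> input, Duration window) {
        // keyBy(key): group events per key; TreeMap keeps the output order deterministic.
        Map<String, List<ChangeEvent>> byKey = new TreeMap<>();
        for (ChangeEvent e : input) {
            byKey.computeIfAbsent(e.key, k -> new ArrayList<>()).add(e);
        }

        List<String> alerts = new ArrayList<>();
        for (Map.Entry<String, List<ChangeEvent>> entry : byKey.entrySet()) {
            List<ChangeEvent> events = entry.getValue();
            // Restore event-time order, since the export files are not sorted.
            events.sort(Comparator.comparingLong((ChangeEvent e) -> e.timestamp));

            for (int i = 0; i < events.size(); i++) {
                ChangeEvent a = events.get(i);
                if (!a.type.equals("A")) {
                    continue;
                }
                // Events after this instant are outside the horizon of this A.
                long deadline = a.timestamp + window.toMillis();
                int b = indexOfNext(events, "B", i + 1, deadline);
                if (b < 0) {
                    continue; // no B in time: not an alert in this sketch
                }
                if (indexOfNext(events, "C", b + 1, deadline) < 0) {
                    alerts.add(entry.getKey() + "@" + a.timestamp); // A and B seen, C missing
                }
            }
        }
        return alerts;
    }

    /** First index >= from with the given type and timestamp <= deadline, or -1 if none. */
    private static int indexOfNext(List<ChangeEvent> events, String type, int from, long deadline) {
        for (int j = from; j < events.size() && events.get(j).timestamp <= deadline; j++) {
            if (events.get(j).type.equals(type)) {
                return j;
            }
        }
        return -1;
    }
}
```

The nested scans are quadratic per key, which is fine for illustrating the semantics but is not how FlinkCEP evaluates patterns internally (it uses an NFA with a shared buffer of partial matches).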