Hi,

Thanks for the answers.

>> Are you proposing that all of the inputs to stateful operators would have to be sorted?
>>
> Records in stream don't need to be sorted, but it should be managed by
> `Timestamp Barrier`, which means
> 1. Records belonging to a specific `Timestamp Barrier` are disordered.
> 2. Computations in different timestamp barriers are ordered. For the above
> example, each stateful subtask can start computation for T2 only after it
> finishes computation for T1. Subtasks are independent of each other.

Wouldn't that add significant latency to processing the records? You would basically introduce a batch processing concept in Flink?

Have you considered some alternative solutions? For example, letting each operator/function/sink take care of the data disorder:
- stateless operators could completely ignore the issue and process the records normally, as they are doing right now
- stateful operators should either:
    - process the records immediately, if the business doesn't require ordering
    - or buffer the records internally, like windowed/temporal operators currently do. Non-windowed joins/aggregations could also work in a similar manner, for example by pre-aggregating data per "epoch" (as demarcated by timestamp barriers).
- sink implementations would have to match what the external system supports:
    - if the external system requires ordered writes (something like a Kafka topic?), the sinks would have to buffer the writes until a "timestamp barrier" arrives
    - some sinks might support writing the data simultaneously to different "epochs", for example writing files bucketed by epoch. Each bucket/epoch could be committed independently

This way, latency would behave very much like it currently does in Flink. For example, if we have the following streaming SQL:

INSERT INTO alerts_with_user SELECT * FROM alerts a, users u WHERE a.user_id = u.id

If there is some lag in the users table, alerts would still be generated.
Downstream applications could process and react to newly generated `alerts_with_user`, while at the same time, we could have a consistent view across those three tables (users, alerts, alerts_with_user) if needed.

> I call the data of the timestamp barrier "committed" if the data
> is written to a table according to the barrier without a snapshot, and the
> data may be "rolled back" due to job failure. (sorry that the "committed"
> here may not be appropriate)

Ok, I get it now. Indeed the terminology is confusing. Maybe we shouldn't say that the timestamp barrier has been committed, but that all records for a given "epoch" have been processed/written, but not yet committed, so they can still be rolled back?

> For example, when multiple jobs start at the same time and register themselves in `MetaService`,
> it needs to serially check whether they write to the same table

Why do we need to do that? Only to disallow this? To forbid writing from two jobs into a single table? If so, can we not push this responsibility down to the connector? Like sink/source operator coordinators should negotiate with respective external systems if the given read/write is allowed? So if there is a need for such a meta service, Flink doesn't need to know about it?

Best,
Piotrek

On Mon, Feb 6, 2023 at 10:44 Shammon FY <zjur...@gmail.com> wrote:

> Hi Piotr,
>
> Thanks for your feedback. In general, I think `Timestamp Barrier` is a
> special `Watermark`: all sources send watermarks with the same timestamp
> as the `Timestamp Barrier`, and aggregation operators align data by it.
> For example, all source subtasks are assigned two unified watermarks
> T1 and T2, T1 < T2. All records with timestamp <= T1 will be aligned by T1,
> and records with timestamp in (T1, T2] will be aligned by T2.
>
> > Are you proposing that all of the inputs to stateful operators would have
> > to be sorted?
>
> Records in stream don't need to be sorted, but it should be managed by
> `Timestamp Barrier`, which means
> 1. Records belonging to a specific `Timestamp Barrier` are disordered.
> 2. Computations in different timestamp barriers are ordered. For the above
> example, each stateful subtask can start computation for T2 only after it
> finishes computation for T1. Subtasks are independent of each other.
>
> > Can you explain why do you need those 3 states? Why can committed records
> > be rolled back?
>
> Here I try to define the states of data in tables according to Timestamp
> Barrier and Snapshot, and I found that the 3 states are incomplete. For
> example, there is timestamp barrier T associated with checkpoint P, and
> the sink operator will create snapshot S for P in tables. The data states
> in tables are as follows
> 1. Sink finishes writing data of timestamp barrier T to a table, but
> snapshot S is not created in the table and T is not finished in all tables.
> 2. Sink finishes writing data of timestamp barrier T to a table, creates
> snapshot S according to checkpoint P, but T is not finished in all tables.
> 3. Timestamp barrier T is finished in all tables, but snapshot S is not
> created in all tables.
> 4. Timestamp barrier T is finished in all tables, and snapshot S is created
> in all tables too.
>
> Currently users can only get data from snapshots in Table Store and other
> storages such as Iceberg. Users can get different "versioned" data from
> tables according to their data freshness and consistency requirements.
> I think we should support getting data with a timestamp barrier even before
> the sink operator finishes creating the snapshot in the future. In this
> situation, I call the data of the timestamp barrier "committed" if the data
> is written to a table according to the barrier without a snapshot, and the
> data may be "rolled back" due to job failure. (sorry that the "committed"
> here may not be appropriate)
>
> > I'm not sure if I follow. Generally speaking, why do we need MetaService at all?
Why can we only support writes to and reads from TableStore, and > not any source/sink that implements some specific interface? > > It's a good point. I added a `MetaService` node in FLIP mainly to perform > some atomic operations. For example, when multiple jobs start at the same > time and register themselves in `MetaService`, it needs to serially check > whether they write to the same table. If we do not use an > independent `MetaService Node`, we may need to introduce some other "atomic > dependency" such as ZooKeeper. But removing `MetaService Node` can make the > system more flexible, I think it's also valuable. Maybe we can carefully > design MetaService API and support different deployment modes in the next > FLIP? WDYT? > > > Best, > Shammon > > > On Fri, Feb 3, 2023 at 10:43 PM Piotr Nowojski <pnowoj...@apache.org> > wrote: > > > Hi Shammon, > > > > Thanks for pushing the topic further. I'm not sure how this new proposal > is > > supposed to be working? How should timestamp barrier interplay with event > > time and watermarks? Or is timestamp barrier supposed to completely > replace > > watermarks? > > > > > stateful and temporal operators should align them (records) according > to > > their timestamp field. > > > > Are you proposing that all of the inputs to stateful operators would have > > to be sorted? > > > > > There're three states in a table for specific transaction : PreCommit, > > Commit and Snapshot > > > > Can you explain why do you need those 3 states? Why can committed records > > be rolled back? > > > > >> 10. Have you considered proposing a general consistency mechanism > > instead > > >> of restricting it to TableStore+ETL graphs? For example, it seems to > me > > to > > >> be possible and valuable to define instead the contract that > > sources/sinks > > >> need to implement in order to participate in globally consistent > > snapshots. > > > > > > A general consistency mechanism is cool! 
In my mind, the overall > > > `consistency system` consists of three components: Streaming & Batch > ETL, > > > Streaming & Batch Storage and MetaService. MetaService is decoupled > from > > > Storage Layer, but it stores consistency information in persistent > > storage. > > > It can be started as an independent node or a component in a large > Flink > > > cluster. In the FLIP we use TableStore as the Storage Layer. As you > > > mentioned, we plan to implement specific source and sink on the > > TableStore > > > in the first phase, and may consider other storage in the future > > > > I'm not sure if I follow. Generally speaking, why do we need MetaService > at > > all? Why can we only support writes to and reads from TableStore, and not > > any source/sink that implements some specific interface? > > > > Best, > > Piotrek > > > > niedz., 29 sty 2023 o 12:11 Shammon FY <zjur...@gmail.com> napisał(a): > > > > > Hi @Vicky > > > > > > Thank you for your suggestions about consistency and they're very nice > to > > > me! > > > > > > I have updated the examples and consistency types[1] in FLIP. In > > general, I > > > regard the Timestamp Barrier processing as a transaction and divide the > > > data consistency supported in FLIP into three types > > > > > > 1. Read Uncommitted: Read data from tables even when a transaction is > not > > > committed. > > > 2. Read Committed: Read data from tables according to the committed > > > transaction. > > > 3. Repeatable Read: Read data from tables according to the committed > > > transaction in snapshots. > > > > > > You can get more information from the updated FLIP. 
Looking forward to > > your > > > feedback, THX > > > > > > > > > [1] > > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-276%3A+Data+Consistency+of+Streaming+and+Batch+ETL+in+Flink+and+Table+Store#FLIP276:DataConsistencyofStreamingandBatchETLinFlinkandTableStore-DataConsistencyType > > > > > > Best, > > > Shammon > > > > > > > > > On Sat, Jan 28, 2023 at 4:42 AM Vasiliki Papavasileiou > > > <vpapavasile...@confluent.io.invalid> wrote: > > > > > > > Hi Shammon, > > > > > > > > > > > > Thank you for opening this FLIP which is very interesting and such an > > > > important feature to add to the Flink ecosystem. I have a couple of > > > > suggestions/questions: > > > > > > > > > > > > > > > > - > > > > > > > > Consistency is a very broad term with different meanings. There > are > > > many > > > > variations between the two extremes of weak and strong consistency > > > that > > > > tradeoff latency for consistency. https://jepsen.io/consistency > It > > > > would > > > > be great if we could devise an approach that allows the user to > > choose > > > > which consistency level they want to use for a query. > > > > > > > > > > > > Example: In your figure where you have a DAG, assume a user queries > > only > > > > Table1 for a specific key. Then, a failure happens and the table > > restores > > > > from a checkpoint. The user issues the same query, looking up the > same > > > key. > > > > What value does she see? With monotonic-reads, the system guarantees > > that > > > > she will only see the same or newer values but not older, hence will > > not > > > > experience time-travel. This is a very useful property for a system > to > > > have > > > > albeit it is at the weaker-end of consistency guarantees. But it is a > > > good > > > > stepping stone. > > > > > > > > > > > > Another example, assume the user queries Table1 for key K1 and gets > the > > > > value V11. 
> > > > Then, she queries Table2 that is derived from Table1 for the same key, K1, that returns value V21. What is the relationship between V21 and V11? Is V21 derived from V11 or can it be an older value V1 (the previous value of K1)? What if value V21 is not yet in table Table2? What should she see when she queries Table1? Should she see the value V11 or not? Should the requirement be that a record is not visible in any of the tables in a DAG unless it is available in all of them?
> > > >
> > > > - It would be good to have a set of examples with consistency anomalies that can happen (like the examples above) and what consistency levels we want the system to offer to prevent them. Moreover, for each such example, it would be good to have a description of how the approach (Timestamp Barriers) will work in practice to prevent such anomalies.
> > > >
> > > > Thank you,
> > > > Vicky
> > > >
> > > > On Fri, Jan 27, 2023 at 4:46 PM John Roesler <vvcep...@apache.org> wrote:
> > > >
> > > > > Hello Shammon and all,
> > > > >
> > > > > Thanks for this FLIP! I've been working toward this kind of global consistency across large scale data infrastructure for a long time, and it's fantastic to see a high-profile effort like this come into play.
> > > > >
> > > > > I have been lurking in the discussion for a while and delaying my response while I collected my thoughts. However, I've realized at some point, delaying more is not as useful as just asking a few questions, so I'm sorry if some of this seems beside the point. I'll number these to not collide with prior discussion points:
> > > > >
> > > > > 10. Have you considered proposing a general consistency mechanism instead of restricting it to TableStore+ETL graphs? For example, it seems to me to be possible and valuable to define instead the contract that sources/sinks need to implement in order to participate in globally consistent snapshots.
> > > > >
> > > > > 11. It seems like this design is assuming that the "ETL Topology" under the envelope of the consistency model is a well-ordered set of jobs, but I suspect this is not the case for many organizations. It may be aspirational, but I think the gold-standard here would be to provide an entire organization with a consistency model spanning a loosely coupled ecosystem of jobs and data flows spanning teams and systems that are organizationally far apart.
> > > > >
> > > > > I realize that may be kind of abstract. Here are some examples of what's on my mind here:
> > > > >
> > > > > 11a. Engineering may operate one Flink cluster, and some other org, like Finance, may operate another. In most cases, those are separate domains that don't typically get mixed together in jobs, but some people, like the CEO, would still benefit from being able to make a consistent query that spans arbitrary contexts within the business. How well can a feature like this transcend a single Flink infrastructure? Does it make sense to consider a model in which snapshots from different domains can be composable?
> > > > >
> > > > > 11b. Some groups may have a relatively stable set of long-running jobs, while others (like data science, skunkworks, etc.) may adopt a more experimental, iterative approach with lots of jobs entering and exiting the ecosystem over time. It's still valuable to have them participate in the consistency model, but it seems like the consistency system will have to deal with more chaos than I see in the design. For example, how can this feature tolerate things like zombie jobs (which are registered in the system, but fail to check in for a long time, and then come back later)?
> > > > >
> > > > > 12. I didn't see any statements about patterns like cycles in the ETL Topology. I'm aware that there are fundamental constraints on how well cyclic topologies can be supported by a distributed snapshot algorithm. However, there is a range of approaches/compromises that we can apply to cyclic topologies. At the very least, we can state that we will detect cycles and produce a warning, etc.
> > > > >
> > > > > 13. I'm not sure how heavily you're weighting the query syntax part of the proposal, so please feel free to defer this point. It looked to me like the proposal assumes people want to query either the latest consistent snapshot or the latest inconsistent state. However, it seems like there's a significant opportunity to maintain a manifest of historical snapshots and allow people to query as of old points in time. That can be valuable for individuals answering data questions, building products, and crucially supporting auditability use cases. To that latter point, it seems nice to provide not only a mechanism to query arbitrary snapshots, but also to define a TTL/GC model that allows users to keep hourly snapshots for N hours, daily snapshots for N days, weekly snapshots for N weeks, and the same for monthly, quarterly, and yearly snapshots.
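To make the TTL/GC idea in point 13 concrete, here is a minimal Python sketch of a tiered retention rule. Everything in it is an assumption made for illustration: the function name `snapshots_to_keep`, the two tiers (hourly and daily only), and the rule "keep the newest snapshot in each bucket that is still inside the tier's horizon" are not taken from the FLIP or from Table Store.

```python
from datetime import datetime, timedelta
from typing import Dict, List, Set


def snapshots_to_keep(snapshots: List[datetime], now: datetime,
                      hourly_hours: int, daily_days: int) -> Set[datetime]:
    """Return the snapshots a tiered retention policy would retain.

    Hypothetical policy: for each tier, keep the newest snapshot per bucket
    (hour or day) as long as the snapshot is younger than the tier's horizon.
    """
    tiers = [
        ("%Y-%m-%d %H", timedelta(hours=hourly_hours)),  # hourly tier
        ("%Y-%m-%d", timedelta(days=daily_days)),        # daily tier
    ]
    keep: Set[datetime] = set()
    for bucket_format, horizon in tiers:
        newest_per_bucket: Dict[str, datetime] = {}
        for snap in snapshots:
            # Skip snapshots from the future or older than this tier's horizon.
            if snap > now or now - snap > horizon:
                continue
            bucket = snap.strftime(bucket_format)
            current = newest_per_bucket.get(bucket)
            if current is None or snap > current:
                newest_per_bucket[bucket] = snap
        keep.update(newest_per_bucket.values())
    return keep


now = datetime(2023, 1, 27, 12, 0)
snapshots = [
    datetime(2023, 1, 27, 11, 10),
    datetime(2023, 1, 27, 11, 50),
    datetime(2023, 1, 27, 9, 30),
    datetime(2023, 1, 26, 8, 0),
    datetime(2023, 1, 26, 20, 0),
    datetime(2023, 1, 20, 0, 0),
]
kept = snapshots_to_keep(snapshots, now, hourly_hours=2, daily_days=3)
expired = sorted(set(snapshots) - kept)
```

A garbage collector would then delete the snapshots in `expired`. Weekly, monthly, quarterly and yearly tiers would follow the same pattern with a different bucket key and horizon.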
> > > > > Ok, that's all I have for now :) I'd also like to understand some lower-level details, but I wanted to get these high-level questions off my chest.
> > > > >
> > > > > Thanks again for the FLIP!
> > > > > -John
> > > > >
> > > > > On 2023/01/13 11:43:28 Shammon FY wrote:
> > > > > > Hi Piotr,
> > > > > >
> > > > > > I discussed `Timestamp Barrier` and `Aligned Checkpoint` for data consistency in the FLIP with @jinsong lee, and we think there are indeed many defects in using `Aligned Checkpoint` to support data consistency, as you mentioned.
> > > > > >
> > > > > > According to our historical discussion, I think we have reached an agreement on an important point: we ultimately need a `Timestamp Barrier Mechanism` to support data consistency. But in our (@jinsong lee's and my) opinion, the total design and implementation based on `Timestamp Barrier` will be too complex, and it's also too big for one FLIP.
> > > > > >
> > > > > > So we'd like to use FLIP-276[1] as an overview design of data consistency in Flink Streaming and Batch ETL based on `Timestamp Barrier`. @jinsong and I hope that we can reach an agreement on the overall design in FLIP-276 first, and then on the basis of FLIP-276 we can create other FLIPs with detailed design according to modules and drive them. Finally, we can support data consistency based on Timestamp in Flink.
> > > > > >
> > > > > > I have updated FLIP-276, deleted the Checkpoint section, and added the overall design of `Timestamp Barrier`. Here I briefly describe the modules of `Timestamp Barrier` as follows
> > > > > > 1. Generation: JobManager must coordinate all source subtasks and generate a unified timestamp barrier from System Time or Event Time for them.
> > > > > > 2. Checkpoint: Store <checkpoint, timestamp barrier> when the timestamp barrier is generated, so that the job can recover the same timestamp barrier for the uncompleted checkpoint.
> > > > > > 3. Replay data: Store <timestamp barrier, offset> for the source when it broadcasts the timestamp barrier, so that the source can replay the same data according to the same timestamp barrier.
> > > > > > 4. Align data: Align data for stateful operators (aggregation, join, etc.) and temporal operators (window).
> > > > > > 5. Computation: Operator computation for a specific timestamp barrier based on the results of a previous timestamp barrier.
> > > > > > 6. Output: Operator outputs or commits results when it collects all the timestamp barriers, including operators with data buffer or async operations.
> > > > > >
> > > > > > I also list the main work in Flink and Table Store in FLIP-276. Please help to review the FLIP when you're free and feel free to give any comments.
> > > > > >
> > > > > > Looking forward to your feedback, THX
> > > > > >
> > > > > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-276%3A+Data+Consistency+of+Streaming+and+Batch+ETL+in+Flink+and+Table+Store
> > > > > >
> > > > > > Best,
> > > > > > Shammon
> > > > > >
> > > > > > On Tue, Dec 20, 2022 at 10:01 AM Shammon FY <zjur...@gmail.com> wrote:
> > > > > >
> > > > > > > Hi Piotr,
> > > > > > >
> > > > > > > Thanks for your syncing. I will update the FLIP later and keep this discussion open.
Looking forward to your feedback, thanks > > > > > > > > > > > > > > > > > > > > > Best, > > > > > > > Shammon > > > > > > > > > > > > > > > > > > > > > On Mon, Dec 19, 2022 at 10:45 PM Piotr Nowojski < > > > > pnowoj...@apache.org> > > > > > > > wrote: > > > > > > > > > > > > > >> Hi Shammon, > > > > > > >> > > > > > > >> I've tried to sync with Timo, David Moravek and Dawid > Wysakowicz > > > > about > > > > > > >> this > > > > > > >> subject. We have only briefly chatted and exchanged some > > > > > thoughts/ideas, > > > > > > >> but unfortunately we were not able to finish the discussions > > > before > > > > > the > > > > > > >> holiday season/vacations. Can we get back to this topic in > > > January? > > > > > > >> > > > > > > >> Best, > > > > > > >> Piotrek > > > > > > >> > > > > > > >> pt., 16 gru 2022 o 10:53 Shammon FY <zjur...@gmail.com> > > > napisał(a): > > > > > > >> > > > > > > >> > Hi Piotr, > > > > > > >> > > > > > > > >> > I found there may be several points in our discussion, it > will > > > > cause > > > > > > >> > misunderstanding between us when we focus on different one. > I > > > list > > > > > each > > > > > > >> > point in our discussion as follows > > > > > > >> > > > > > > > >> > > Point 1: Is "Aligned Checkpoint" the only mechanism to > > > guarantee > > > > > data > > > > > > >> > consistency in the current Flink implementation, and > > "Watermark" > > > > and > > > > > > >> > "Aligned Checkpoint cannot do that? > > > > > > >> > My answer is "Yes", the "Aligned Checkpoint" is the only one > > due > > > > to > > > > > its > > > > > > >> > "Align Data" ability, we can do it in the first stage. > > > > > > >> > > > > > > > >> > > Point2: Can the combination of "Checkpoint Barrier" and > > > > > "Watermark" > > > > > > >> > support the complete consistency semantics based on > > "Timestamp" > > > in > > > > > the > > > > > > >> > current Flink implementation? 
> > > > > > >> > My answer is "No", we need a new "Timestamp Barrier" > mechanism > > > to > > > > do > > > > > > >> that > > > > > > >> > which may be upgraded from current "Watermark" or a new > > > mechanism, > > > > > we > > > > > > >> can > > > > > > >> > do it in the next second or third stage. > > > > > > >> > > > > > > > >> > > Point3: Are the "Checkpoint" and the new "Timestamp > Barrier" > > > > > > >> completely > > > > > > >> > independent? The "Checkpoint" whatever "Aligned" or > > "Unaligned" > > > or > > > > > "Task > > > > > > >> > Local" supports the "Exactly-Once" between ETLs, and the > > > > "Timestamp > > > > > > >> > Barrier" mechanism guarantees data consistency between > tables > > > > > according > > > > > > >> to > > > > > > >> > timestamp for queries. > > > > > > >> > My answer is "Yes", I totally agree with you. Let > "Checkpoint" > > > be > > > > > > >> > responsible for fault tolerance and "Timestamp Barrier" for > > > > > consistency > > > > > > >> > independently. > > > > > > >> > > > > > > > >> > @Piotr, What do you think? If I am missing or > misunderstanding > > > > > anything, > > > > > > >> > please correct me, thanks > > > > > > >> > > > > > > > >> > Best, > > > > > > >> > Shammon > > > > > > >> > > > > > > > >> > On Fri, Dec 16, 2022 at 4:17 PM Piotr Nowojski < > > > > > pnowoj...@apache.org> > > > > > > >> > wrote: > > > > > > >> > > > > > > > >> > > Hi Shammon, > > > > > > >> > > > > > > > > >> > > > I don't think we can combine watermarks and checkpoint > > > > barriers > > > > > > >> > together > > > > > > >> > > to > > > > > > >> > > > guarantee data consistency. There will be a "Timestamp > > > > Barrier" > > > > > in > > > > > > >> our > > > > > > >> > > > system to "commit data", "single etl failover", "low > > latency > > > > > between > > > > > > >> > > ETLs" > > > > > > >> > > > and "strong data consistency with completed semantics" > in > > > the > > > > > end. 
> > > > > > >> > > > > > > > > >> > > Why do you think so? I've described to you above an > > > alternative > > > > > where > > > > > > >> we > > > > > > >> > > could be using watermarks for data consistency, regardless > > of > > > > what > > > > > > >> > > checkpointing/fault tolerance mechanism Flink would be > > using. > > > > Can > > > > > you > > > > > > >> > > explain what's wrong with that approach? Let me rephrase > it: > > > > > > >> > > > > > > > > >> > > 1. There is an independent mechanism that provides > > > exactly-once > > > > > > >> > guarantees, > > > > > > >> > > committing records/watermarks/events and taking care of > the > > > > > failover. > > > > > > >> It > > > > > > >> > > might be aligned, unaligned or task local checkpointing - > > this > > > > > doesn't > > > > > > >> > > matter. Let's just assume we have such a mechanism. > > > > > > >> > > 2. There is a watermarking mechanism (it can be some kind > of > > > > > system > > > > > > >> > > versioning re-using watermarks code path if a user didn't > > > > > configure > > > > > > >> > > watermarks), that takes care of the data consistency. > > > > > > >> > > > > > > > > >> > > Because watermarks from 2. are also subject to the > > > exactly-once > > > > > > >> > guarantees > > > > > > >> > > from the 1., once they are committed downstream systems > > (Flink > > > > > jobs or > > > > > > >> > > other 3rd party systems) could just easily work with the > > > > committed > > > > > > >> > > watermarks to provide consistent view/snapshot of the > > tables. > > > > Any > > > > > > >> > > downstream system could always check what are the > committed > > > > > > >> watermarks, > > > > > > >> > > select the watermark value (for example min across all > used > > > > > tables), > > > > > > >> and > > > > > > >> > > ask every table: please give me all of the data up until > the > > > > > selected > > > > > > >> > > watermark. 
Or give me all tables in the version for the > > > selected > > > > > > >> > watermark. > > > > > > >> > > > > > > > > >> > > Am I missing something? To me it seems like this way we > can > > > > fully > > > > > > >> > decouple > > > > > > >> > > the fault tolerance mechanism from the subject of the data > > > > > > >> consistency. > > > > > > >> > > > > > > > > >> > > Best, > > > > > > >> > > Piotrek > > > > > > >> > > > > > > > > >> > > czw., 15 gru 2022 o 13:01 Shammon FY <zjur...@gmail.com> > > > > > napisał(a): > > > > > > >> > > > > > > > > >> > > > Hi Piotr, > > > > > > >> > > > > > > > > > >> > > > It's kind of amazing about the image, it's a simple > > example > > > > and > > > > > I > > > > > > >> have > > > > > > >> > to > > > > > > >> > > > put it in a document > > > > > > >> > > > > > > > > > >> > > > > > > > > > >> > > > > > > > > >> > > > > > > > >> > > > > > > > > > > > > > > > https://bytedance.feishu.cn/docx/FC6zdq0eqoWxHXxli71cOxe9nEe?from=from_copylink > > > > > > >> > > > :) > > > > > > >> > > > > > > > > > >> > > > > Does it have to be combining watermarks and checkpoint > > > > > barriers > > > > > > >> > > together? > > > > > > >> > > > > > > > > > >> > > > It's an interesting question. As we discussed above, > what > > we > > > > > need > > > > > > >> from > > > > > > >> > > > "Checkpoint" is the "Align Data Ability", and from > > > "Watermark" > > > > > is > > > > > > >> the > > > > > > >> > > > "Consistency Semantics", > > > > > > >> > > > > > > > > > >> > > > 1) Only "Align Data" can reach data consistency when > > > > performing > > > > > > >> queries > > > > > > >> > > on > > > > > > >> > > > upstream and downstream tables. I gave an example of > > "Global > > > > > Count > > > > > > >> > > Tables" > > > > > > >> > > > in our previous discussion. We need a "Align Event" in > the > > > > > streaming > > > > > > >> > > > processing, it's the most basic. 
> > > > > > >> > > >
> > > > > > >> > > > 2) Only "Timestamp" can provide complete consistency semantics. You gave some good examples about "Window" and other operators.
> > > > > > >> > > >
> > > > > > >> > > > I don't think we can combine watermarks and checkpoint barriers together to guarantee data consistency. There will be a "Timestamp Barrier" in our system to "commit data", "single etl failover", "low latency between ETLs" and "strong data consistency with completed semantics" in the end.
> > > > > > >> > > >
> > > > > > >> > > > At the beginning I think we can do the simplest thing first: guarantee the basic data consistency with a "Barrier Mechanism". In the current Flink there's only "Aligned Checkpoint", which is why we chose "Checkpoint" in our FLIP.
> > > > > > >> > > >
> > > > > > >> > > > > I don't see an actual connection in the implementation steps between the checkpoint barriers approach and the watermark-like approach
> > > > > > >> > > >
> > > > > > >> > > > As I mentioned above, we chose "Checkpoint" to guarantee the basic data consistency. But as we discussed, the ideal solution is "Timestamp Barrier".
After the first stage is completed based on > the > > > > > > >> "Checkpoint", > > > > > > >> > > we > > > > > > >> > > > need to evolve it to our ideal solution "Timestamp > > Barrier" > > > > > > >> > > (watermark-like > > > > > > >> > > > approach) in the next second or third stage. This does > not > > > > mean > > > > > > >> > upgrading > > > > > > >> > > > "Checkpoint Mechanism" in Flink. It means that after we > > > > > implement a > > > > > > >> new > > > > > > >> > > > "Timestamp Barrier" or upgrade "Watermark" to support > it, > > we > > > > can > > > > > > >> use it > > > > > > >> > > > instead of the current "Checkpoint Mechanism" directly > in > > > our > > > > > > >> > > "MetaService" > > > > > > >> > > > and "Table Store". > > > > > > >> > > > > > > > > > >> > > > In the discussion between @David and me, I summarized > the > > > work > > > > > of > > > > > > >> > > upgrading > > > > > > >> > > > "Watermark" to support "Timestamp Barrier". It looks > like > > a > > > > big > > > > > job > > > > > > >> and > > > > > > >> > > you > > > > > > >> > > > can find the details in our discussion. I think we don't > > > need > > > > > to do > > > > > > >> > that > > > > > > >> > > in > > > > > > >> > > > our first stage. > > > > > > >> > > > > > > > > > >> > > > Also in that discussion (my reply to @David) too, I > > briefly > > > > > > >> summarized > > > > > > >> > > the > > > > > > >> > > > work that needs to be done to use the new mechanism > > > (Timestamp > > > > > > >> Barrier) > > > > > > >> > > > after we implement the basic function on "Checkpoint". > It > > > > seems > > > > > that > > > > > > >> > the > > > > > > >> > > > work is not too big on my side, and it is feasible on > the > > > > whole. 
Based on the above points, I think we can support basic data consistency on "Checkpoint" in the first stage, which is described in the FLIP, and continue to evolve it to "Timestamp Barrier" to support low latency between ETLs and complete semantics in the second or third stage later. What do you think?

Best,
Shammon

On Thu, Dec 15, 2022 at 4:21 PM Piotr Nowojski <pnowoj...@apache.org> wrote:

Hi Shammon,

> The following is a simple example. Data is transferred between ETL1, ETL2 and ETL3 in Intermediate Table by Timestamp.
> [image: simple_example.jpg]

This time it's your image that doesn't want to load :)

> Timestamp Barrier

Does it have to be combining watermarks and checkpoint barriers together? Can we not achieve the same result with two independent processes: checkpointing (regardless if this is a global aligned/unaligned checkpoint, or a task local checkpoint) plus watermarking?
Checkpointing would provide exactly-once guarantees, and actually committing the results, and it would be actually committing the last emitted watermark? From the perspective of the sink/table, it shouldn't really matter how the exactly-once is achieved, and whether the job has performed an unaligned checkpoint or something completely different. It seems to me that the sink/table could/should be able to understand/work with only the basic information: here are records and watermarks (with at that point of time already fixed order), they are committed and will never change.

> However, from the perspective of implementation complexity, I personally think using Checkpoint in the first phase makes sense, what do you think?

Maybe I'm missing something, but I don't see an actual connection in the implementation steps between the checkpoint barriers approach and the watermark-like approach. They seem to me (from the perspective of Flink runtime at least) like two completely different mechanisms. Not one leading to the other.
Best,
Piotrek

On Wed, 14 Dec 2022 at 15:19, Shammon FY <zjur...@gmail.com> wrote:

Hi Piotr,

Thanks for your valuable input, which makes me consider the core point of data consistency in depth. I'd like to define data consistency over the whole of streaming & batch processing as follows, and I hope that we can reach an agreement on it:

BOutput = Fn(BInput), where BInput is a bounded input which is split from the unbounded stream, Fn is the computation of a node or ETL, and BOutput is the bounded output of BInput. All the data in BInput and BOutput are unordered, and BInput and BOutput are data consistent.

The key points above include 1) the segment semantics of BInput; 2) the computation semantics of Fn

1. The segment semantics of BInput
a) Transactionality of data. It is necessary to ensure the semantic transaction of the bounded data set when it is split from the unbounded stream.
For example, we cannot split multiple records in one transaction into different bounded data sets.
b) Timeliness of data. Some data is related to time, such as boundary data for a window. It is necessary to consider whether the bounded data set needs to include a watermark which can trigger the window result.
c) Constraints of data. The Timestamp Barrier should perform some specific operations after computation in operators, for example, force flush data.

Checkpoint Barrier misses all the semantics above, and we should allow users to define the Timestamp for data on Event Time or System Time according to the job and computation later.

2. The computation semantics of Fn
a) Deterministic computation
Most computations are deterministic, such as map, filter, count, sum, etc. They generate the same unordered result from the same unordered input every time, and we can easily define data consistency on the input and output for them.
b) Non-deterministic computation
Some computations are non-deterministic. They can produce different results from the same input each time. I try to divide them into the following types:
1) Non-deterministic computation semantics, such as the rank operator. When it computes multiple times (for example, after failover), either the first or the last output can be the final result, which will cause different failover handling for downstream jobs. I will expand on it later.
2) Non-deterministic computation optimization, such as async io. It is necessary to sync these operations when the barrier of input arrives.
3) Deviation caused by data segmentation and computation semantics, such as Window. This requires that users customize the data segmentation correctly according to their needs.

Checkpoint Barrier matches a) and Timestamp Barrier can match both a) and b).

We define data consistency of BInput and BOutput based on all of the above.
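To make the definition concrete, here is a minimal plain-Python sketch (not Flink code). It assumes records are `(value, timestamp)` pairs, that barrier timestamps split the unbounded stream into bounded segments, and that consistency of unordered data can be checked by comparing multisets. The names `split_by_barriers` and `fn` are hypothetical and exist only for this illustration.

```python
from collections import Counter
import random


def split_by_barriers(records, barriers):
    """Split (value, ts) records into bounded segments, one per barrier.

    A record belongs to the first barrier T with ts <= T. Records later
    than the last barrier stay unassigned (they wait for a future barrier).
    """
    ordered = sorted(barriers)
    segments = {t: [] for t in ordered}
    for value, ts in records:
        for t in ordered:
            if ts <= t:
                segments[t].append(value)
                break
    return segments


def fn(b_input):
    """A deterministic, order-insensitive Fn: filter odd values, then map."""
    return [v * 2 for v in b_input if v % 2 == 1]


records = [(1, 5), (2, 7), (3, 12), (5, 9), (7, 15), (4, 20)]
barriers = [10, 20]

in_order = split_by_barriers(records, barriers)

# Simulate a different arrival order of the same records.
shuffled = records[:]
random.Random(0).shuffle(shuffled)
out_of_order = split_by_barriers(shuffled, barriers)

# The BInput of each segment is the same multiset regardless of arrival order,
same_input = all(
    Counter(in_order[t]) == Counter(out_of_order[t]) for t in barriers
)
# so a deterministic Fn yields the same (unordered) BOutput per segment.
same_output = all(
    Counter(fn(in_order[t])) == Counter(fn(out_of_order[t])) for t in barriers
)
```

A non-deterministic Fn (for example one that depends on arrival order) would break the second check even though the first still holds, which is the distinction drawn in 2.a) and 2.b).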
The BOutput of the upstream ETL will be the BInput of the next ETL, and multiple ETL jobs form a complex "ETL Topology".

Based on the above definitions, I'd like to give a general proposal with the "Timestamp Barrier" I have in mind. It's not very detailed, so please help to review it and feel free to comment @David, @Piotr

1. Data segment with Timestamp
a) Users can define the Timestamp Barrier with System Time or Event Time.
b) Source nodes generate the same Timestamp Barrier after reading data from RootTable
c) Each record carries the same Timestamp according to the Timestamp Barrier, such as (a, T), (b, T), (c, T), (T, barrier)

2. Computation with Timestamp
a) Records are unordered within the same Timestamp. Stateless operators such as map/flatmap/filter can process data without aligning the Timestamp Barrier, which is different from Checkpoint Barrier.
b) Records between Timestamps are ordered. Stateful operators must align data and compute by each Timestamp, then compute in Timestamp sequence.
c) Stateful operators will output the results of a specific Timestamp after computation.
d) The sink operator "commits records" with a specific Timestamp and reports the status to JobManager

3. Read data with Timestamp
a) The downstream ETL reads data according to Timestamp after the upstream ETL "commits" it.
b) Stateful operators interact with state when computing the data of a Timestamp, but they won't trigger a checkpoint for every Timestamp. Therefore the source ETL job can generate a Timestamp every few seconds or even every few hundred milliseconds
c) Based on Timestamp the delay between ETL jobs will be very small, and in the best case the E2E latency may be only tens of seconds.

4. Failover and Recovery
ETL jobs are cascaded through the Intermediate Table. After a single ETL job fails, it needs to replay the input data and recompute the results. As you mentioned, whether the cascaded ETL jobs are restarted depends on the determinacy of the intermediate data between them.
a) An ETL job will roll back and reread data from the upstream ETL by a specific Timestamp according to the Checkpoint.
b) According to the management of Checkpoint and Timestamp, the ETL can replay all Timestamps and data after failover, which means BInput is the same before and after failover.

c) For deterministic Fn, it generates the same BOutput from the same BInput
1) If there's no data of the specific Timestamp in the sink table, the ETL just "commits" it as normal.
2) If the Timestamp data exists in the sink table, the ETL can just discard the new data.

d) For non-deterministic Fn, it generates different BOutput from the same BInput before and after failover. For example, BOutput1 before failover and BOutput2 after failover. The state in the ETL is consistent with BOutput2. There are two cases according to users' requirements
1) Users can accept BOutput1 as the final output and downstream ETLs don't need to restart. The sink in the ETL can discard BOutput2 directly if the Timestamp exists in the sink table.
2) Users only accept BOutput2 as the final output. Then all the downstream ETLs and Intermediate Tables should roll back to the specific Timestamp, and the downstream ETLs should be restarted too.

The following is a simple example. Data is transferred between ETL1, ETL2 and ETL3 in Intermediate Table by Timestamp.
[image: simple_example.jpg]

Besides Timestamp, there's a big challenge in Intermediate Table. It should support a highly implemented "commit Timestamp snapshot" with high throughput, which requires the Table Store to enhance its streaming capabilities, like Pulsar or Kafka.

In this FLIP, we plan to implement the proposal with Checkpoint; the above Timestamp can be replaced by Checkpoint. Of course, Checkpoint has some problems. I think we have reached some consensus in the discussion about the Checkpoint problems, including data segment semantics, flushing data of some operators, and the increase of E2E delay.
However, from the perspective of implementation complexity, I personally think using Checkpoint in the first phase makes sense, what do you think?

Finally, I think I misunderstood the "Rolling Checkpoint" and "All at once Checkpoint" in my last explanation which you and @David mentioned. I thought their differences were mainly to select different table versions for queries. According to your reply, I think it is whether there are multiple "rolling checkpoints" in each ETL job, right? If I understand correctly, the "Rolling Checkpoint" is a good idea, and we can guarantee "Strong Data Consistency" between multiple tables in MetaService for queries. Thanks.

Best,
Shammon

On Tue, Dec 13, 2022 at 9:36 PM Piotr Nowojski <pnowoj...@apache.org> wrote:

Hi Shammon,

Thanks for the explanations, I think I understand the problem better now. I have a couple of follow up questions, but first:

>> 3.
>> I'm pretty sure there are counter examples, where your proposed mechanism of using checkpoints (even aligned!) will produce inconsistent data from the perspective of the event time.
>> a) For example what if one of your "ETL" jobs, has the following DAG:
>>
>> Even if you use aligned checkpoints for committing the data to the sink table, the watermarks of "Window1" and "Window2" are completely independent. The sink table might easily have data from the Src1/Window1 from the event time T1 and Src2/Window2 from later event time T2.
>> b) I think the same applies if you have two completely independent ETL jobs writing either to the same sink table, or two to different sink tables (that are both later used in the same downstream job).
>
> Thank you for your feedback. I cannot see the DAG in 3.a in your reply,

I've attached the image directly. I hope you can see it now.

Basically what I meant is that if you have a topology like (from the attached image):

window1 = src1.keyBy(...).window(...)
window2 = src2.keyBy(...).window(...)
window1.join(window2, ...).addSink(sink)

or with even simpler (note no keyBy between `src` and `process`):

src.process(some_function_that_buffers_data).addSink(sink)

you will have the same problem. Generally speaking if there is an operator buffering some data, and if the data are not flushed on every checkpoint (any windowed or temporal operator, AsyncWaitOperator, CEP, ...), you can design a graph that will produce "inconsistent" data as part of a checkpoint.

Apart from that a couple of other questions/issues.

> 1) Global Checkpoint Commit: a) "rolling fashion" or b) altogether

Do we need to support the "altogether" one? Rolling checkpoint, as it's more independent, I could see it scale much better, and avoid a lot of problems that I mentioned before.

> 1) Checkpoint VS Watermark
>
> 1.
> Stateful Computation is aligned according to Timestamp Barrier

Indeed the biggest obstacle I see here, is that we would indeed most likely have:

> b) Similar to the window operator, align data in memory according to Timestamp.

for every operator.

> 4. Failover supports Timestamp fine-grained data recovery
>
> As we mentioned in the FLIP, each ETL is a complex single node. A single ETL job failover should not cause the failure of the entire "ETL Topology".

I don't understand this point. Regardless if we are using rolling checkpoints, all at once checkpoints or watermarks, I see the same problems with non determinism, if we want to preserve the requirement to not fail over the whole topology at once.

Both Watermarks and "rolling checkpoint" I think have the same issue, that they either require deterministic logic, or global failover, or downstream jobs can only work on the records already committed by the upstream.
But working with only "committed records" would either break consistency between different jobs, or would cause huge delay in checkpointing and e2e latency, as:
1. upstream job has to produce some data, downstream can not process this data yet
2. checkpoint 42 is triggered on the upstream job
3. checkpoint 42 is completed on the upstream job, data processed since last checkpoint has been committed
4. upstream job can continue producing more data
5. only now downstream can start processing the data produced in 1., but it can not read the not-yet-committed data from 4.
6. once downstream finishes processing data from 1., it can trigger checkpoint 42

The "all at once checkpoint", I can see only working with global failover of everything.

This is assuming exactly-once mode. At-least-once would be much easier.
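The latency effect of the six steps above can be illustrated with a small plain-Python sketch (not Flink API). It assumes exactly-once commit-on-checkpoint, collapses checkpoint trigger and completion into a single instant, and uses a 60-second checkpoint interval purely as an example. The function name `visible_at` is hypothetical.

```python
import bisect


def visible_at(produced_at, checkpoint_completions):
    """Return the time at which a record becomes readable downstream.

    Assumption: a record is committed by the first upstream checkpoint
    that completes at or after the record was produced. Returns None if
    no such checkpoint has completed yet (the record is still uncommitted).
    `checkpoint_completions` must be sorted in ascending order.
    """
    i = bisect.bisect_left(checkpoint_completions, produced_at)
    if i < len(checkpoint_completions):
        return checkpoint_completions[i]
    return None


# Upstream checkpoints complete every 60 seconds (illustrative value).
completions = [60, 120, 180]
# Times (in seconds) at which the upstream job produced records.
produced = [5, 59, 61, 130, 185]

# Delay between producing a record and downstream being allowed to read it.
delays = []
for t in produced:
    v = visible_at(t, completions)
    delays.append(None if v is None else v - t)
```

Under these assumptions a record waits up to one full checkpoint interval before downstream may read it, and the wait accumulates across each hop of cascaded jobs.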
Best,
Piotrek

On Tue, 13 Dec 2022 at 08:57, Shammon FY <zjur...@gmail.com> wrote:

Hi David,

Thanks for the comments from you and @Piotr. I'd like to explain the details about the FLIP first.

1) Global Checkpoint Commit: a) "rolling fashion" or b) altogether

This mainly depends on the needs of users. Users can decide the data version of tables in their queries according to different requirements for data consistency and freshness. Since we manage multiple versions for each table, this will not bring too much complexity to the system. We only need to support different strategies when calculating table versions for a query. So we give this decision to users, who can use "consistency.type" to set different consistency in "Catalog". We can continue to refine this later.
For example, dynamic parameters could support different consistency requirements for each query

2) MetaService module

Many Flink streaming jobs use application mode, and they are independent of each other. So we currently assume that MetaService is an independent node. In the first phase, it will be started in standalone mode, and HA will be supported later. This node will reuse many Flink modules, including REST, Gateway-RpcServer, etc. We hope that the core functions of MetaService can be developed as a component. When Flink subsequently uses a large session cluster to support various computations, it can be integrated into the "ResourceManager" as a plug-in component.

Besides the above, I'd like to describe the Checkpoint and Watermark mechanisms in detail as follows.
1) Checkpoint VS Watermark

As you mentioned, I think it's very correct that what we want in the Checkpoint is to align streaming computation and data according to certain semantics. Timestamp is a very ideal solution. To achieve this goal, we can think of the following functions that need to be supported in the Watermark mechanism:

1. Stateful Computation is aligned according to Timestamp Barrier

As in the "three tables example" we discussed above, we need to align the stateful operator computation according to the barrier to ensure the consistency of the result data. In order to align the computation, there are two ways in my mind

a) Similar to the Aligned Checkpoint Barrier. Timestamp Barrier aligns data according to the channel, which will lead to backpressure just like the aligned checkpoint. It does not seem like a good idea.
b) Similar to the window operator, align data in memory according to Timestamp. Two steps need to be supported here: first, data is aligned by timestamp for stateful operators; second, since Timestamp is strictly sequential, global aggregation operators need to perform aggregation in timestamp order and output the final results.

2. Coordinate multiple source nodes to assign unified Timestamp Barriers

Since the stateful operator needs to be aligned according to the Timestamp Barrier, source subtasks of multiple jobs should generate the same Timestamp Barrier. ETL jobs consuming RootTable should interact with "MetaService" to generate the same Timestamp T1, T2, T3 ... and so on.

3. JobManager needs to manage the completed Timestamp Barrier

When the Timestamp Barrier of the ETL job has been completed, it means that the data of the specified Timestamp can be queried by users.
> > > > > > >> > > > > >>> JobManager needs to summarize its Timestamp processing and report the completed Timestamp and data snapshots to the MetaService.
> > > > > > >> > > > > >>>
> > > > > > >> > > > > >>> 4. Failover supports Timestamp fine-grained data recovery
> > > > > > >> > > > > >>>
> > > > > > >> > > > > >>> As we mentioned in the FLIP, each ETL is a complex single node. A single ETL job failover should not cause the failure of the entire "ETL Topology". This requires that the result data of Timestamp generated by upstream ETL should be deterministic.
> > > > > > >> > > > > >>>
> > > > > > >> > > > > >>> a) The determinacy of Timestamp, that is, before and after ETL job failover, the same Timestamp sequence must be generated. Each Checkpoint needs to record the included Timestamp list, especially the source node of the RootTable. After Failover, it needs to regenerate Timestamp according to the Timestamp list.
> > > > > > >> > > > > >>>
> > > > > > >> > > > > >>> b) The determinacy of Timestamp data, that is, the same Timestamp needs to replay the same data before and after Failover, and generate the same results in Sink Table.
> > > > > > >> > > > > >>> Each Timestamp must save start and end offsets (or snapshot id) of RootTable. After failover, the source nodes need to replay the data according to the offset to ensure that the data of each Timestamp is consistent before and after Failover.
> > > > > > >> > > > > >>>
> > > > > > >> > > > > >>> For the specific requirements and complexity, please help to review when you are free @David @Piotr, thanks :)
> > > > > > >> > > > > >>>
> > > > > > >> > > > > >>> 2) Evolution from Checkpoint to Timestamp Mechanism
> > > > > > >> > > > > >>>
> > > > > > >> > > > > >>> You give a very important question in your reply which I missed before: if Aligned Checkpoint is used in the first stage, how complex is the evolution from Checkpoint to Timestamp later? I made a general comparison here, which may not be very detailed. There are three roles in the whole system: MetaService, Flink ETL Job and Table Store.
> > > > > > >> > > > > >>>
> > > > > > >> > > > > >>> a) MetaService
> > > > > > >> > > > > >>>
> > > > > > >> > > > > >>> It manages the data consistency among multiple ETL jobs, including coordinating the Barrier for the Source ETL nodes, setting the starting Barrier for ETL job startup, and calculating the Table version for queries according to different strategies. It has little to do with Checkpoint in fact, we can pay attention to it when designing the API and implementing the functions.
> > > > > > >> > > > > >>>
> > > > > > >> > > > > >>> b) Flink ETL Job
> > > > > > >> > > > > >>>
> > > > > > >> > > > > >>> At present, the workload is relatively small and we need to trigger checkpoints in CheckpointCoordinator manually by SplitEnumerator.
> > > > > > >> > > > > >>>
> > > > > > >> > > > > >>> c) Table Store
> > > > > > >> > > > > >>>
> > > > > > >> > > > > >>> Table Store mainly provides the ability to write and read data.
> > > > > > >> > > > > >>>
> > > > > > >> > > > > >>> c.1) Write data. At present, Table Store generates snapshots according to two phases in Flink. When using Checkpoint as consistency management, we need to write checkpoint information to snapshots.
> > > > > > >> > > > > >>> After using Timestamp Barrier, the snapshot in Table Store may be disassembled more finely, and we need to write Timestamp information to the data file. A "checkpointed snapshot" may contain multiple "Timestamp snapshots".
> > > > > > >> > > > > >>>
> > > > > > >> > > > > >>> c.2) Read data. The SplitEnumerator that reads data from the Table Store will manage multiple splits according to the version number. After the specified splits are completed, it sends a Barrier command to trigger a checkpoint in the ETL job. The source node will broadcast the checkpoint barrier downstream after receiving it. When using Timestamp Barrier, the overall process is similar, but the SplitEnumerator does not need to trigger a checkpoint to the Flink ETL, and the Source node needs to support broadcasting Timestamp Barrier to the downstream at that time.
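The read path in c.2 can be pictured with a small sketch. It is plain Python, not Flink code: `VersionedSplitTracker` and its methods are hypothetical names introduced here for illustration, and the assumption is that a barrier for a version may only be emitted once every split of that version (and of all older versions) has finished.

```python
from collections import defaultdict


class VersionedSplitTracker:
    """Hypothetical helper (not a Flink API): groups splits by table
    version and reports which versions may emit their barrier."""

    def __init__(self):
        self._pending = defaultdict(set)  # version -> unfinished split ids

    def add_split(self, version, split_id):
        self._pending[version].add(split_id)

    def finish_split(self, version, split_id):
        """Mark a split as done and return the versions whose barrier can
        now be emitted, in ascending version order."""
        self._pending[version].discard(split_id)
        ready = []
        # Barriers go out in version order, so stop at the first version
        # that still has unfinished splits.
        for v in sorted(self._pending):
            if self._pending[v]:
                break
            ready.append(v)
            del self._pending[v]
        return ready


tracker = VersionedSplitTracker()
tracker.add_split(1, "s1")
tracker.add_split(1, "s2")
tracker.add_split(2, "s3")

emitted = []
emitted += tracker.finish_split(2, "s3")  # version 1 still open: nothing yet
emitted += tracker.finish_split(1, "s1")
emitted += tracker.finish_split(1, "s2")  # releases versions 1 and 2 in order
print(emitted)
```

Whether the emitted barrier triggers a checkpoint or is broadcast as a Timestamp Barrier is the only part that differs between the two designs described above; the bookkeeping is the same in both.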
> > > > > > >> > > > > >>>
> > > > > > >> > > > > >>> From the above overall, the evolution complexity from Checkpoint to Timestamp seems controllable, but the specific implementation needs careful design, and the concept and features of Checkpoint should not be introduced too much into relevant interfaces and functions.
> > > > > > >> > > > > >>>
> > > > > > >> > > > > >>> What do you think of it? Looking forward to your feedback, thanks
> > > > > > >> > > > > >>>
> > > > > > >> > > > > >>> Best,
> > > > > > >> > > > > >>> Shammon
> > > > > > >> > > > > >>>
> > > > > > >> > > > > >>> On Mon, Dec 12, 2022 at 11:46 PM David Morávek <d...@apache.org> wrote:
> > > > > > >> > > > > >>>
> > > > > > >> > > > > >>> > Hi Shammon,
> > > > > > >> > > > > >>> >
> > > > > > >> > > > > >>> > I'm starting to see what you're trying to achieve, and it's really exciting. I share Piotr's concerns about e2e latency and the inability to use unaligned checkpoints.
> > > > > > >> > > > > >>> >
> > > > > > >> > > > > >>> > I have a couple of questions that are not clear to me from going over the FLIP:
> > > > > > >> > > > > >>> >
> > > > > > >> > > > > >>> > 1) Global Checkpoint Commit
> > > > > > >> > > > > >>> >
> > > > > > >> > > > > >>> > Are you planning on committing the checkpoints in a) a "rolling fashion" - one pipeline after another, or b) altogether - once the data have been processed by all pipelines?
> > > > > > >> > > > > >>> >
> > > > > > >> > > > > >>> > Option a) would be eventually consistent (for batch queries, you'd need to use the last checkpoint produced by the most downstream table), whereas b) would be strongly consistent at the cost of increasing the e2e latency even more.
> > > > > > >> > > > > >>> >
> > > > > > >> > > > > >>> > I feel that option a) is what this should be headed for.
> > > > > > >> > > > > >>> >
> > > > > > >> > > > > >>> > 2) MetaService
> > > > > > >> > > > > >>> >
> > > > > > >> > > > > >>> > Should this be a new general Flink component or one specific to the Flink Table Store?
> > > > > > >> > > > > >>> >
> > > > > > >> > > > > >>> > 3) Follow-ups
> > > > > > >> > > > > >>> >
> > > > > > >> > > > > >>> > From the above discussion, there is a consensus that, in the ideal case, watermarks would be a way to go, but there is some underlying mechanism missing. It would be great to discuss this option in more detail to compare the solutions in terms of implementation cost; it may turn out not to be that complex.
> > > > > > >> > > > > >>> >
> > > > > > >> > > > > >>> > All in all, I don't feel that checkpoints are suitable for providing consistent table versioning between multiple pipelines. The main reason is that they are designed to be a fault tolerance mechanism. Somewhere between the lines, you've already noted that the primitive you're looking for is cross-pipeline barrier alignment, which is the mechanism a subset of currently supported checkpointing implementations happen to be using. Is that correct?
> > > > > > >> > > > > >>> >
> > > > > > >> > > > > >>> > My biggest concern is that tying this with a "side-effect" of the checkpointing mechanism could block us from evolving it further.
> > > > > > >> > > > > >>> >
> > > > > > >> > > > > >>> > Best,
> > > > > > >> > > > > >>> > D.
> > > > > > >> > > > > >>> >
> > > > > > >> > > > > >>> > On Mon, Dec 12, 2022 at 6:11 AM Shammon FY <zjur...@gmail.com> wrote:
> > > > > > >> > > > > >>> >
> > > > > > >> > > > > >>> > > Hi Piotr,
> > > > > > >> > > > > >>> > >
> > > > > > >> > > > > >>> > > Thank you for your feedback. I cannot see the DAG in 3.a in your reply, but I'd like to answer some questions first.
> > > > > > >> > > > > >>> > >
> > > > > > >> > > > > >>> > > Your understanding is very correct. We want to align the data versions of all intermediate tables through checkpoint mechanism in Flink. I'm sorry that I have omitted some default constraints in FLIP, including only supporting aligned checkpoints; one table can only be written by one ETL job. I will add these later.
> > > > > > >> > > > > >>> > >
> > > > > > >> > > > > >>> > > Why can't the watermark mechanism achieve the data consistency we wanted?
> > > > > > >> > > > > >>> > > For example, there are 3 tables, Table1 is word table, Table2 is word->cnt table and Table3 is cnt1->cnt2 table.
> > > > > > >> > > > > >>> > >
> > > > > > >> > > > > >>> > > 1. ETL1 from Table1 to Table2: INSERT INTO Table2 SELECT word, count(*) FROM Table1 GROUP BY word
> > > > > > >> > > > > >>> > >
> > > > > > >> > > > > >>> > > 2. ETL2 from Table2 to Table3: INSERT INTO Table3 SELECT cnt, count(*) FROM Table2 GROUP BY cnt
> > > > > > >> > > > > >>> > >
> > > > > > >> > > > > >>> > > ETL1 has 2 subtasks to read multiple buckets from Table1, where subtask1 reads streaming data as [a, b, c, a, d, a, b, c, d ...] and subtask2 reads streaming data as [a, c, d, q, a, v, c, d ...].
> > > > > > >> > > > > >>> > >
> > > > > > >> > > > > >>> > > 1. Unbounded streaming data is divided into multiple sets according to some semantic requirements. The most extreme may be one set for each data. Assume that the sets of subtask1 and subtask2 separated by the same semantics are [a, b, c, a, d] and [a, c, d, q], respectively.
> > > > > > >> > > > > >>> > >
> > > > > > >> > > > > >>> > > 2. After the above two sets are computed by ETL1, the result data generated in Table 2 is [(a, 3), (b, 1), (c, 2), (d, 2), (q, 1)].
> > > > > > >> > > > > >>> > >
> > > > > > >> > > > > >>> > > 3. The result data generated in Table 3 after the data in Table 2 is computed by ETL2 is [(1, 2), (2, 2), (3, 1)]
> > > > > > >> > > > > >>> > >
> > > > > > >> > > > > >>> > > We want to align the data of Table1, Table2 and Table3 and manage the data versions. When users execute OLAP/Batch queries join on these tables, the following consistency data can be found
> > > > > > >> > > > > >>> > >
> > > > > > >> > > > > >>> > > 1. Table1: [a, b, c, a, d] and [a, c, d, q]
> > > > > > >> > > > > >>> > >
> > > > > > >> > > > > >>> > > 2. Table2: [a, 3], [b, 1], [c, 2], [d, 2], [q, 1]
> > > > > > >> > > > > >>> > >
> > > > > > >> > > > > >>> > > 3. Table3: [1, 2], [2, 2], [3, 1]
> > > > > > >> > > > > >>> > >
> > > > > > >> > > > > >>> > > Users can perform query: SELECT t1.word, t2.cnt, t3.cnt2 from Table1 t1 JOIN Table2 t2 JOIN Table3 t3 on t1.word=t2.word and t2.cnt=t3.cnt1;
> > > > > > >> > > > > >>> > >
> > > > > > >> > > > > >>> > > In the view of users, the data is consistent on a unified "version" between Table1, Table2 and Table3.
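As a quick check of the three-table example, the two aggregations can be reproduced in a few lines of plain Python. This is only a sketch of the arithmetic, not of how Flink executes the jobs; the variable names are chosen here for illustration. Note that "c" occurs once in each subtask's set, so its count in Table2 is 2.

```python
from collections import Counter

# The two sets read by ETL1's subtasks for one aligned "version".
subtask1 = ["a", "b", "c", "a", "d"]
subtask2 = ["a", "c", "d", "q"]

# ETL1: INSERT INTO Table2 SELECT word, count(*) FROM Table1 GROUP BY word
table2 = Counter(subtask1 + subtask2)

# ETL2: INSERT INTO Table3 SELECT cnt, count(*) FROM Table2 GROUP BY cnt
table3 = Counter(table2.values())

print(sorted(table2.items()))
print(sorted(table3.items()))
```

A consistent "version" in the sense used above means a query sees exactly these three states together, never Table2 from one set boundary and Table3 from another.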
> > > > > > >> > > > > >>> > >
> > > > > > >> > > > > >>> > > In the current Flink implementation, the aligned checkpoint can achieve the above capabilities (let's ignore the segmentation semantics of checkpoint first). Because the Checkpoint Barrier will align the data when performing the global Count aggregation, we can associate the snapshot with the checkpoint in the Table Store, query the specified snapshot of Table1/Table2/Table3 through the checkpoint, and achieve the consistency requirements of the above unified "version".
> > > > > > >> > > > > >>> > >
> > > > > > >> > > > > >>> > > Current watermark mechanism in Flink cannot achieve the above consistency. For example, we use watermark to divide data into multiple sets in subtask1 and subtask2 as follows
> > > > > > >> > > > > >>> > >
> > > > > > >> > > > > >>> > > 1. subtask1: [(a, T1), (b, T1), (c, T1), (a, T1), (d, T1)], T1, [(a, T2), (b, T2), (c, T2), (d, T2)], T2
> > > > > > >> > > > > >>> > >
> > > > > > >> > > > > >>> > > 2. subtask2: [(a, T1), (c, T1), (d, T1), (q, T1)], T1, ....
> > > > > > >> > > > > >>> > >
> > > > > > >> > > > > >>> > > As Flink watermark does not have barriers and cannot align data, ETL1 Count operator may compute the data of subtask1 first: [(a, T1), (b, T1), (c, T1), (a, T1), (d, T1)], T1, [(a, T2), (b, T2)], then compute the data of subtask2: [(a, T1), (c, T1), (d, T1), (q, T1)], T1, which is not possible in aligned checkpoint.
> > > > > > >> > > > > >>> > >
> > > > > > >> > > > > >>> > > In this order, the result output to Table2 after the Count aggregation will be: (a, 1, T1), (b, 1, T1), (c, 1, T1), (a, 2, T1), (d, 1, T1), (a, 3, T2), (b, 2, T2), (a, 4, T1), (c, 2, T1), (d, 2, T1), (q, 1, T1), which can be simplified as: [(b, 1, T1), (a, 3, T2), (b, 2, T2), (a, 4, T1), (c, 2, T1), (d, 2, T1), (q, 1, T1)]
> > > > > > >> > > > > >>> > >
> > > > > > >> > > > > >>> > > There's no (a, 3, T1), we have been unable to query consistent data results on Table1 and Table2 according to T1.
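The interleaving described above can be replayed with a toy global count. This is a plain-Python sketch under the assumption that the operator emits one (word, running count, timestamp) update per input record and has no barrier to hold back early T2 records; it is not how Flink's aggregation operators are implemented.

```python
from collections import Counter

# (word, timestamp) records in the order the Count operator happens to see them.
arrival = (
    [("a", "T1"), ("b", "T1"), ("c", "T1"), ("a", "T1"), ("d", "T1")]  # subtask1, T1
    + [("a", "T2"), ("b", "T2")]                                        # subtask1, early T2
    + [("a", "T1"), ("c", "T1"), ("d", "T1"), ("q", "T1")]              # subtask2, T1
)

counts = Counter()
updates = []
for word, ts in arrival:
    counts[word] += 1  # global count, not scoped to a timestamp
    updates.append((word, counts[word], ts))

a_updates = [u for u in updates if u[0] == "a"]
print(a_updates)
```

The updates for "a" carry counts 1 and 2 under T1, 3 under T2, and 4 under T1 again, so no row of the form (a, 3, T1) is ever written, which is the inconsistency the message points out.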
> > > > > > >> > > > > >>> > > Table 3 has the same problem.
> > > > > > >> > > > > >>> > >
> > > > > > >> > > > > >>> > > In addition to using Checkpoint Barrier, the other implementation supporting watermark above is to convert Count aggregation into Window Count. After the global Count is converted into window operator, it needs to support cross window data computation. Similar to the data relationship between the previous and the current Checkpoint, it is equivalent to introducing the Watermark Barrier, which requires adjustments to the current Flink Watermark mechanism.
> > > > > > >> > > > > >>> > >
> > > > > > >> > > > > >>> > > Besides the above global aggregation, there are window operators in Flink. I don't know if my understanding is correct (I cannot see the DAG in your example), please correct me if it's wrong.
> > > > > > >> > > > > >>> > > I think you raise a very important and interesting question: how to define data consistency in different window computations which will generate different timestamps of the same data. This situation also occurs when using event time to align data. At present, what I can think of is to store this information in Table Store, so that users can filter or join on it. This FLIP is our first phase, and the specific implementation of this will be designed and considered in the next phase and FLIP.
> > > > > > >> > > > > >>> > >
> > > > > > >> > > > > >>> > > Although the Checkpoint Barrier can achieve the most basic consistency, as you mentioned, using the Checkpoint mechanism will cause many problems, including the increase of checkpoint time for multiple cascade jobs, the increase of E2E data freshness time (several minutes or even dozens of minutes), and the increase of the overall system complexity. At the same time, the semantics of Checkpoint data segmentation is unclear.
> > > > > > >> > > > > >>> > >
> > > > > > >> > > > > >>> > > The current FLIP is the first phase of our whole proposal, and you can find the follow-up plan in our future work. In the first stage, we do not want to modify the Flink mechanism.
> > > > > > >> > > > > >>> > > We'd like to realize basic system functions based on existing mechanisms in Flink, including the relationship management of ETL and tables, and the basic data consistency, so we choose Global Checkpoint in our FLIP.
> > > > > > >> > > > > >>> > >
> > > > > > >> > > > > >>> > > We agree with you very much that event time is more suitable for data consistency management. We'd like to consider this matter in the second or third stage after the current FLIP. We hope to improve the watermark mechanism in Flink to support barriers. As you mentioned in your reply, we can achieve data consistency based on timestamp, while maintaining E2E data freshness of seconds or even milliseconds for 10+ cascaded jobs.
> > > > > > >> > > > > >>> > >
> > > > > > >> > > > > >>> > > What do you think?
> > > > > > >> > > > > >>> > > Thanks
> > > > > > >> > > > > >>> > >
> > > > > > >> > > > > >>> > > Best,
> > > > > > >> > > > > >>> > > Shammon
> > > > > > >> > > > > >>> > >
> > > > > > >> > > > > >>> > > On Fri, Dec 9, 2022 at 6:13 PM Piotr Nowojski <pnowoj...@apache.org> wrote:
> > > > > > >> > > > > >>> > >
> > > > > > >> > > > > >>> > > > Hi Shammon,
> > > > > > >> > > > > >>> > > >
> > > > > > >> > > > > >>> > > > Do I understand it correctly, that you effectively want to expand the checkpoint alignment mechanism across many different jobs and hand over checkpoint barriers from upstream to downstream jobs using the intermediate tables?
> > > > > > >> > > > > >>> > > >
> > > > > > >> > > > > >>> > > > Re the watermarks for the "Rejected Alternatives". I don't understand why this has been rejected. Could you elaborate on this point? Here are a couple of my thoughts on this matter, but please correct me if I'm wrong, as I haven't dived deeper into this topic.
> > > > > > >> > > > > >>> > > >
> > > > > > >> > > > > >>> > > > > As shown above, there are 2 watermarks T1 and T2, T1 < T2.
> > > > > > >> > > > > >>> > > > > The StreamTask reads data in order: V11,V12,V21,T1(channel1),V13,T1(channel2).
> > > > > > >> > > > > >>> > > > > At this time, StreamTask will confirm that watermark T1 is completed, but the data beyond T1 has been processed (V13) and the results are written to the sink table.
> > > > > > >> > > > > >>> > > >
> > > > > > >> > > > > >>> > > > 1. I see the same "problem" with unaligned checkpoints in your current proposal.
> > > > > > >> > > > > >>> > > > 2. I don't understand why this is a problem? Just store in the "sink table" what's the watermark (T1), and downstream jobs should process the data with that "watermark" anyway. Record "V13" should be treated as "early" data. Downstream jobs if:
> > > > > > >> > > > > >>> > > >  a) they are streaming jobs, for example they should aggregate it in windowed/temporal state, but they shouldn't produce the result that contains it, as the watermark T2 was not yet processed.
> > > > > > >> > > > > >>> > > > Or they would just pass that record as "early" data.
> > > > > > >> > > > > >>> > > >  b) they are batch jobs, it looks to me like batch jobs shouldn't take "all available data", but only consider "all the data until some watermark", for example the latest available: T1
> > > > > > >> > > > > >>> > > >
> > > > > > >> > > > > >>> > > > 3. I'm pretty sure there are counter examples, where your proposed mechanism of using checkpoints (even aligned!) will produce inconsistent data from the perspective of the event time.
> > > > > > >> > > > > >>> > > >  a) For example what if one of your "ETL" jobs, has the following DAG:
> > > > > > >> > > > > >>> > > > [image: flip276.jpg]
> > > > > > >> > > > > >>> > > >  Even if you use aligned checkpoints for committing the data to the sink table, the watermarks of "Window1" and "Window2" are completely independent. The sink table might easily have data from the Src1/Window1 from the event time T1 and Src2/Window2 from later event time T2.
> b) I think the same applies if you have two completely independent ETL
> jobs writing either to the same sink table, or to two different sink
> tables (that are both later used in the same downstream job).
>
> 4a) I'm not sure if I like the idea of centralising the whole system in
> this way. If you have 10 jobs, the likelihood of a checkpoint failure
> will be 10 times higher, and/or the duration of the checkpoint can be
> much, much longer (especially under backpressure). And this is actually
> already a limitation of Apache Flink (global checkpoints are more prone
> to fail the larger the scale), so I would be anxious about making it
> potentially an even larger issue.
> 4b) I'm also worried about the increased complexity of the system after
> adding the global checkpoint, and an additional (single?) point of
> failure.
> 5. Such a design would also not work if we ever wanted to have task
> local checkpoints.
>
> All in all, it seems to me like watermarks and event time are actually
> the better concept in this context, and should have been used for
> synchronisation and data consistency across the whole system.
>
> Best,
> Piotrek
>
> On Thu, 1 Dec 2022 at 11:50, Shammon FY <zjur...@gmail.com> wrote:
>
> > Hi @Martijn
> >
> > Thanks for your comments, and I'd like to reply to them
> >
> > 1. It sounds good to me, I'll update the content structure in the
> > FLIP later and give the problems first.
> >
> > 2. "Each ETL job creates snapshots with checkpoint info on sink tables
> > in Table Store" -> That reads like you're proposing that snapshots
> > need to be written to Table Store?
> >
> > Yes. To support the data consistency in the FLIP, we need to connect
> > checkpoints in Flink with snapshots in the store, which requires a
> > close combination of the Flink and store implementations. In the
> > first stage we plan to implement it based on Flink and Table Store
> > only; snapshots written to external storage don't support
> > consistency.
> >
> > 3. If you introduce a MetaService, it becomes the single point of
> > failure because it coordinates everything.
> > But I can't find anything in the FLIP on making the MetaService
> > highly available or how to deal with failovers there.
> >
> > I think you raise a very important problem and I missed it in the
> > FLIP. The MetaService is a single point and should support failover.
> > We will add that in the future; in the first stage we only support
> > standalone mode. Thanks.
> >
> > 4. The FLIP states under Rejected Alternatives "Currently watermark in
> > Flink cannot align data." which is not true, given that there is
> > FLIP-182
> >
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-182%3A+Support+watermark+alignment+of+FLIP-27+Sources
> >
> > Watermark alignment in FLIP-182 is different from the "watermark
> > align data" requirement in our FLIP. FLIP-182 aims to fix watermark
> > generation in different sources for "slight imbalance or data skew",
> > which means in some cases a source must generate watermarks even if
> > it should not. When the operator collects watermarks, the data
> > processing is as described in our FLIP, and the data cannot be
> > aligned through a barrier the way it is with Checkpoint.
> >
> > 5. Given the MetaService role, it feels like this is introducing a
> > tight dependency between Flink and the Table Store.
> > How pluggable is this solution, given the changes that need to be
> > made to Flink in order to support this?
> >
> > This is a good question, and I will try to expand on it. Most of the
> > work will be completed in the Table Store, such as the new
> > SplitEnumerator and Source implementation. The changes in Flink are
> > as follows:
> > 1) A Flink job should put its job id in the context when creating a
> > source/sink, to help MetaService create the relationship between
> > source and sink tables. This change is tiny.
> > 2) Notify a listener when a job is terminated in Flink; the listener
> > implementation in Table Store will send a "delete event" to
> > MetaService.
> > 3) The changes related to Flink Checkpoint include:
> >   a) Support triggering a checkpoint with a checkpoint id by
> >   SplitEnumerator
> >   b) Create the SplitEnumerator in Table Store with a strategy to
> >   perform the specific checkpoint when all "SplitEnumerator"s in the
> >   job manager trigger it.
> >
> > Best,
> > Shammon
> >
> > On Thu, Dec 1, 2022 at 3:43 PM Martijn Visser
> > <martijnvis...@apache.org> wrote:
> >
> > > Hi all,
> > >
> > > A couple of first comments on this:
> > > 1. I'm missing the problem statement in the overall introduction.
> > > It immediately goes into proposal mode; I would like to first read
> > > what the actual problem is, before diving into solutions.
> > > 2. "Each ETL job creates snapshots with checkpoint info on sink
> > > tables in Table Store" -> That reads like you're proposing that
> > > snapshots need to be written to Table Store?
> > > 3. If you introduce a MetaService, it becomes the single point of
> > > failure because it coordinates everything. But I can't find
> > > anything in the FLIP on making the MetaService highly available or
> > > how to deal with failovers there.
> > > 4. The FLIP states under Rejected Alternatives "Currently watermark
> > > in Flink cannot align data." which is not true, given that there is
> > > FLIP-182
> > >
> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-182%3A+Support+watermark+alignment+of+FLIP-27+Sources
> > >
> > > 5. Given the MetaService role, it feels like this is introducing a
> > > tight dependency between Flink and the Table Store. How pluggable
> > > is this solution, given the changes that need to be made to Flink
> > > in order to support this?
> > >
> > > Best regards,
> > >
> > > Martijn
> > >
> > > On Thu, Dec 1, 2022 at 4:49 AM Shammon FY <zjur...@gmail.com> wrote:
> > >
> > > > Hi devs:
> > > >
> > > > I'd like to start a discussion about FLIP-276: Data Consistency
> > > > of Streaming and Batch ETL in Flink and Table Store[1]. Across
> > > > the whole data stream processing pipeline there are consistency
> > > > problems, such as how to manage the dependencies of multiple jobs
> > > > and tables, how to define and handle E2E delays, and how to
> > > > ensure the data consistency of queries on flowing data. This FLIP
> > > > aims to support data consistency and answer these questions.
> > > >
> > > > I've discussed the details of this FLIP with @Jingsong Lee and
> > > > @libenchao offline several times. We hope to support data
> > > > consistency of queries on tables, managing relationships between
> > > > Flink jobs and tables, and revising tables on streaming in Flink
> > > > and Table Store, to improve the whole data stream processing.
> > > >
> > > > Looking forward to your feedback.
> > > >
> > > > [1]
> > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-276%3A+Data+Consistency+of+Streaming+and+Batch+ETL+in+Flink+and+Table+Store
> > > >
> > > > Best,
> > > > Shammon
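
To make the channel example quoted in this thread concrete (V11, V12, V21, T1(channel1), V13, T1(channel2)), here is a minimal sketch of the "early data" idea. It is plain Python, not Flink code: the function name `align_by_watermark`, the `(channel, kind, payload)` event tuples and the two-channel setup are all assumptions made up for illustration. It only shows that a record arriving on a channel after that channel's T1 can be kept apart from the T1 result instead of being written with it.

```python
from collections import defaultdict


def align_by_watermark(events, num_channels):
    """Group records by the watermark they belong to (illustrative only).

    events: iterable of (channel, kind, payload) tuples, where kind is
    "record" or "watermark". A record belongs to the next watermark that
    its own channel delivers after it.

    Returns (completed, early):
      completed: watermark -> records, for watermarks seen on all channels
      early: records received after their channel's last watermark
    """
    open_records = defaultdict(list)  # channel -> records since last watermark
    staged = defaultdict(list)        # watermark -> records assigned so far
    seen = defaultdict(set)           # watermark -> channels that delivered it
    completed = {}

    for channel, kind, payload in events:
        if kind == "record":
            open_records[channel].append(payload)
        else:
            # The watermark closes everything this channel sent before it.
            staged[payload].extend(open_records.pop(channel, []))
            seen[payload].add(channel)
            if len(seen[payload]) == num_channels:
                completed[payload] = staged.pop(payload)

    early = [r for ch in sorted(open_records) for r in open_records[ch]]
    return completed, early


# The read order from the thread: V13 arrives on channel 1 after T1(channel1).
EVENTS = [
    (1, "record", "V11"),
    (1, "record", "V12"),
    (2, "record", "V21"),
    (1, "watermark", "T1"),
    (1, "record", "V13"),
    (2, "watermark", "T1"),
]

completed, early = align_by_watermark(EVENTS, num_channels=2)
print(completed)  # {'T1': ['V11', 'V12', 'V21']}
print(early)      # ['V13']
```

In this sketch V13 stays out of the T1 result and would only be released once T2 has been seen on both channels, which is the "early data" treatment suggested in point 2 of the quoted reply.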