Hi Rex, yes, you can go directly into Flink since 1.11.0 [1], but afaik only through the Table API/SQL currently (which you seem to be using most of the time anyway). I'd recommend using 1.11.1+ (some bugfixes) or even 1.12.0+ (many new useful features [2]). You can also check the main docs [3].
If you'd like more background, Marta talked about it at a higher level [4] (slides [5]), and Qingsheng and Jark at a lower level [6]. [1] https://flink.apache.org/news/2020/07/06/release-1.11.0.html#table-apisql-support-for-change-data-capture-cdc [2] https://flink.apache.org/news/2020/12/10/release-1.12.0.html#table-apisql-support-for-temporal-table-joins-in-sql [3] https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/formats/debezium.html [4] https://www.youtube.com/watch?v=wRIQqgI1gLA [5] https://noti.st/morsapaes/liQzgs/change-data-capture-with-flink-sql-and-debezium [6] https://www.youtube.com/watch?v=5AThYUD4grA On Mon, Mar 1, 2021 at 8:53 PM Rex Fenley <r...@remind101.com> wrote: > Thanks Arvid, > I think my confusion lies in misinterpreting the meaning of CDC. We basically don't want CDC; we just use it to get data into a compacted Kafka topic where we hold the current state of the world for multiple consumers to read. You have described pretty thoroughly where we want to go. > One interesting part of your architecture is this "Debezium -> State collecting Flink job". Is there a way for Debezium to write to Flink? I thought it required Kafka Connect. > Appreciate your feedback > On Mon, Mar 1, 2021 at 12:43 AM Arvid Heise <ar...@apache.org> wrote: >> > We are rereading the topics; at any time we might want a completely different materialized view for a different web service for some new application feature. Other jobs / new jobs need to read all the up-to-date rows from the databases. >> > I still don't see how this is the case if everything just needs to be overwritten by primary key. To re-emphasize, we do not care about historical data. >> Why are you reading from a CDC topic and not a log-compacted topic that reflects the state? CDC is all about history and changes.
>> >> Here's an architecture that I'd imagine would work better for you: >> >> For each SQL table (ingress layer): >> SQL Table -> Debezium -> State collecting Flink job -> Kafka state topic (compacted) >> >> Analytics (processing layer): >> Kafka state topics (compacted) -> Analytical Flink job -> Kafka state topic (compacted) >> >> For each view (egress layer): >> Kafka state topics (compacted) -> Aggregating Flink job -> K/V store(s) -> Web application >> >> The ingress layer is only there to provide you with log-compacted Kafka topics. Then you can do a bunch of analytical queries from Kafka to Kafka. Finally, you output your views to K/V stores for highly available web applications (=decoupled from the processing layer). >> >> If that's what you already have, then my apologies for not picking that up. It's really important to stress that no Kafka topics ever contain CDC data in this instance since you are not interested in historic data. The only CDC exchange is by using the Debezium connector of Flink. At this point, all discussions of this thread are resolved. >> >> On Sat, Feb 27, 2021 at 9:06 PM Rex Fenley <r...@remind101.com> wrote: >>> Hi Arvid, >>> >If you are not rereading the topics, why do you compact them? >>> We are rereading the topics; at any time we might want a completely different materialized view for a different web service for some new application feature. Other jobs / new jobs need to read all the up-to-date rows from the databases. >>> >correctness depends on compaction < downtime >>> I still don't see how this is the case if everything just needs to be overwritten by primary key. To re-emphasize, we do not care about historical data.
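To make the "State collecting Flink job" of the ingress layer concrete, here is a minimal Python model of what that job conceptually does (all names are illustrative; this is not Flink or Kafka API code): fold a Debezium-style changelog into the latest value per primary key, which is exactly the content a log-compacted state topic is meant to hold.

```python
# Minimal model of the state-collecting ingress job: fold a CDC changelog
# into current-state records per primary key. Purely illustrative encoding,
# loosely following Debezium's "op" field (c/u/d = create/update/delete).

def collect_state(changelog):
    """Return the latest value per primary key.

    A delete becomes a tombstone (value None), which is how a compacted
    Kafka topic eventually forgets a key.
    """
    state = {}
    for op, key, value in changelog:
        if op == "d":
            state[key] = None  # tombstone
        else:
            state[key] = value
    return state

changelog = [
    ("c", 1, {"state": "california"}),
    ("u", 1, {"state": "ohio"}),
    ("c", 2, {"state": "texas"}),
    ("d", 2, None),
]
print(collect_state(changelog))  # {1: {'state': 'ohio'}, 2: None}
```

Downstream jobs reading such a state topic see only current rows plus tombstones, never the intermediate history.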
>>> >>> >Again, a cloud-native key/value store would perform much better and be >>> much cheaper with better SLAs >>> Is there a cloud-native key/value store which can read from a Postgres >>> WAL or MySQL binlog and then keep an up-to-date read marker for any >>> materialization consumers downstream *besides* Kafka + Debezium? >>> >>> Appreciate all the feedback, though hopefully we can get closer to the >>> same mental model. If there's really a better alternative here I'm all for >>> it! >>> >>> >>> On Sat, Feb 27, 2021 at 11:50 AM Arvid Heise <ar...@apache.org> wrote: >>> >>>> Hi Rex, >>>> >>>> Your initial question was about the impact of compaction on your CDC >>>> application logic. I have been (unsuccessfully) trying to tell you that you >>>> do not need compaction and it's counterproductive. >>>> >>>> If you are not rereading the topics, why do you compact them? It's lost >>>> compute time and I/O on the Kafka brokers (which are both very valuable) >>>> and does not give you anything that an appropriate retention time wouldn't >>>> give you (=lower SSD usage). It makes the mental model more complicated. An >>>> aggressive compaction and a larger backlog (compaction time < application >>>> failure/restart/upgrade time) would lead to incorrect results (in the same >>>> way an inappropriate retention period may cause data loss for the same >>>> reason). >>>> >>>> The only use case for log compaction is if you're using a Kafka topic >>>> for a key/value store to serve a web application (in which case, it's >>>> usually better to take a real key/value store) but then you don't need >>>> retractions anymore but you'd simply overwrite the actual values or use >>>> tombstone records for deletions. >>>> >>>> If you consume the same topic both for web applications and Flink and >>>> don't want to use another technology for key/value store, then log >>>> compaction of retractions kinda makes sense to kill 2 birds with one stone. 
>>>> However, you have to live with the downsides on the Flink side (correctness depends on compaction < downtime) and on the web application side (dealing with retractions even though they do not make any sense at that level). Again, a cloud-native key/value store would perform much better and be much cheaper with better SLAs, and it would solve all issues on the Flink side (final note: this is independent of the technology; any stream processor will encounter the same issue as it's a conceptual mismatch). >>>> On Sat, Feb 27, 2021 at 8:24 PM Rex Fenley <r...@remind101.com> wrote: >>>>> Hi Arvid, >>>>> I really appreciate the thorough response, but I don't think this contradicts our use case. In servicing web applications we're doing nothing more than taking data from the giant databases we use, performing joins and denormalizing aggs strictly for performance reasons (joining across a lot of stuff at query time is slow), and putting specified results into another database connected to the specified web server. Our Flink jobs are purely used for up-to-date materialized views. We don't care about historical analysis; we only care about what the exact current state of the world is. >>>>> This is why every row has a primary key, from beginning to end of the job (even though Flink's Table API can't seem to detect that after a lot of joins in our plan, it's logically true since the join key will then be the pk). This is also why all we need to do is retract the current row from the Kafka source on the existing primary key that's being overwritten, have that retract propagate downstream to throw away any data transformed from that row, and then process the new row. We don't care what other data changes may have happened in between; it's not applicable to our use case.
>>>>> >>>>> We're using CDC for nothing more than a way to get the latest rows in >>>>> real time into Kafka so they can be read by various Flink jobs we hope to >>>>> build (starting with the one we're currently working on that has ~35 >>>>> stateful operators) which then just transform and forward to another >>>>> database. >>>>> >>>>> ---- >>>>> >>>>> Reading the Upsert Kafka docs [1] "In the physical operator, we will >>>>> use state to know whether the key is the first time to be seen. The >>>>> operator will produce INSERT rows, or additionally generate UPDATE_BEFORE >>>>> rows for the previous image, or produce DELETE rows with all columns >>>>> filled >>>>> with values." This is how we thought the regular Kafka source actually >>>>> worked, that it had state on PKs it could retract on, because we weren't >>>>> even thinking of any other use case until it hit me that may not be true. >>>>> Luckily the doc also provides an example of simply forwarding from DBZ >>>>> Kafka to Upsert Kafka, even if DBZ Kafka source data is compacted it won't >>>>> matter since now everything in the actual job reading from Upsert Kafka >>>>> should function by PK like we need. On that note, I think it may be >>>>> helpful >>>>> to edit the documentation to indicate that if you need stateful PK based >>>>> Kafka consumption it must be via Upsert Kafka. >>>>> >>>>> [1] >>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-149%3A+Introduce+the+upsert-kafka+Connector >>>>> >>>>> Again, thanks for the thorough reply, this really helped my >>>>> understanding! >>>>> >>>>> On Sat, Feb 27, 2021 at 4:02 AM Arvid Heise <ar...@apache.org> wrote: >>>>> >>>>>> Hi Rex, >>>>>> >>>>>> imho log compaction and CDC for historic processes are incompatible >>>>>> on conceptual level. Let's take this example: >>>>>> >>>>>> topic: party membership >>>>>> +(1, Dem, 2000) >>>>>> -(1, Dem, 2009) >>>>>> +(1, Gop, 2009) >>>>>> Where 1 is the id of a real person. 
>>>>>> Now, let's consider you want to count memberships retroactively for each year. You'd get 2000-2009, 1 Dem and 0 GOP, and 2009+, 1 GOP and 0 Dem. >>>>>> Now, consider you have log compaction with a compaction period <1 year. You'd get 2000-2009, 0 Dem and 0 GOP, and only the real result for 2009+ (or in general for the time of the latest change). >>>>>> Let's take another example: >>>>>> +(2, Dem, 2000) >>>>>> -(2, Dem, 2009) >>>>>> With log compaction, you'd get -1/0/1 Dem and 0 GOP for 2009+ depending on how well your application can deal with incomplete logs. Let's say your application is simply adding and subtracting retractions; you'd get -1. If your application is ignoring deletions without insertions (this needs to be tracked for each person), you'd get 0. If your application is not looking at the retraction type, you'd get 1. >>>>>> As you can see, you need to be really careful to craft your application correctly. The correct result will only be achieved through the most complex application (aggregating state for each person and dealing with incomplete information). This is completely independent of Kafka, Debezium, or Flink. >>>>>> --- >>>>>> However, as Jan pointed out: if you don't process data before compaction, then your application is correct. But then the question is: what's the benefit of having data in the topic older than the compaction? The value is close to 0 as you can't really use it for CDC processing (again, independent of Flink). >>>>>> Consequently, instead of compaction, I'd go with a lower retention policy and offload the data to S3 for historic (re)processing (afaik the cloud offering of Confluent finally has automatic offloading, but you can also build it yourself).
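The membership example above can be checked mechanically. The following Python sketch (purely illustrative encoding, not tied to any Kafka or Flink API) runs the naive add/subtract application on the full changelog versus a compacted version of it:

```python
# Model of the party-membership example: counting Dem memberships as of a
# given year, from the full changelog vs. a log-compacted version where
# only the last record per person survives. Illustrative names/encoding.

def compact(log):
    """Keep only the last record per person, like Kafka log compaction."""
    last = {}
    for record in log:
        person = record[1]  # records are (sign, person, party, year)
        last[person] = record
    return list(last.values())

def naive_dem_count(log, as_of_year):
    """+1 per insert, -1 per retraction, never checking whether a matching
    insert was ever seen (the 'simply adding and subtracting retractions'
    application from the example)."""
    count = 0
    for sign, person, party, year in log:
        if year <= as_of_year and party == "Dem":
            count += 1 if sign == "+" else -1
    return count

log = [("+", 2, "Dem", 2000), ("-", 2, "Dem", 2009)]
print(naive_dem_count(log, 2010))           # full log: 0 (correct)
print(naive_dem_count(compact(log), 2010))  # compacted: -1 (wrong)
```

Compaction removed the insert, so the lone retraction drives the naive count to -1, matching the first of the three outcomes described above.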
Then you only need to ensure that your application never accesses data that has been deleted because of the retention time. In general, it's better to choose a technology such as Pulsar with tiered storage that gives you exactly what you want with low overhead: you need unlimited retention without compaction, but without holding much data in expensive storage (SSD), by offloading automatically to cold storage. >>>>>> If this is not working for you, then please share with me why you'd need compaction + a different retention for source/intermediate topics. >>>>>> For the final topic, from my experience, a real key/value store works much better than log-compacted topics for serving web applications. Confluent's marketing is strongly pushing that Kafka can be used as a database and as a key/value store while in reality, it's "just" a good distributed log. I can provide pointers that discuss the limitations if there is interest. Also note that the final topic should not be in CDC format anymore (so no retractions). It should just contain the current state. For both examples together it would be >>>>>> 1, Gop, 2009 >>>>>> and no record for person 2. >>>>>> On Sat, Feb 27, 2021 at 3:33 AM Rex Fenley <r...@remind101.com> wrote: >>>>>>> Digging around, it looks like Upsert Kafka, which requires a primary key, will actually do what I want and uses compaction, but it doesn't look compatible with the Debezium format? Is this on the roadmap? >>>>>>> In the meantime, we're considering consuming from Debezium Kafka (still compacted) and then writing directly to an Upsert Kafka sink and then reading right back out of a corresponding Upsert Kafka source.
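The roundabout works because an upsert-style source reconstructs a changelog from per-key state, as the FLIP-149 quote later in the thread describes. A hypothetical Python sketch of that conversion (not actual Flink operator code; all names are made up for illustration):

```python
# Hypothetical model of upsert-to-changelog conversion: per-key state
# decides whether an incoming upsert becomes an INSERT or an
# UPDATE_BEFORE/UPDATE_AFTER pair, and a tombstone becomes a DELETE
# carrying the previous image.

def upserts_to_changelog(upserts):
    state = {}  # last seen value per primary key
    out = []
    for key, value in upserts:
        prev = state.get(key)
        if value is None:           # tombstone
            if prev is not None:
                out.append(("DELETE", key, prev))
                del state[key]
            # a delete for a never-seen key is silently dropped here
        elif prev is None:          # first time this key is seen
            out.append(("INSERT", key, value))
            state[key] = value
        else:                       # key seen before: retract old image
            out.append(("UPDATE_BEFORE", key, prev))
            out.append(("UPDATE_AFTER", key, value))
            state[key] = value
    return out

events = [(1, "california"), (1, "ohio"), (1, None)]
for row in upserts_to_changelog(events):
    print(row)
```

Because the conversion retracts the previous image by primary key, it does not matter how aggressively the upstream topic was compacted: downstream only ever sees consistent per-key transitions.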
>>>>>>> Since >>>>>>> that little roundabout will key all changes by primary key it should >>>>>>> give >>>>>>> us a compacted topic to start with initially. Once we get that working >>>>>>> we >>>>>>> can probably do the same thing with intermediate flink jobs too. >>>>>>> >>>>>>> Would appreciate any feedback on this approach, thanks! >>>>>>> >>>>>>> On Fri, Feb 26, 2021 at 10:52 AM Rex Fenley <r...@remind101.com> >>>>>>> wrote: >>>>>>> >>>>>>>> Does this also imply that it's not safe to compact the initial >>>>>>>> topic where data is coming from Debezium? I'd think that Flink's Kafka >>>>>>>> source would emit retractions on any existing data with a primary key >>>>>>>> as >>>>>>>> new data with the same pk arrived (in our case all data has primary >>>>>>>> keys). >>>>>>>> I guess that goes back to my original question still however, is this >>>>>>>> not >>>>>>>> what the Kafka source does? Is there no way to make that happen? >>>>>>>> >>>>>>>> We really can't live with the record amplification, it's sometimes >>>>>>>> nonlinear and randomly kills RocksDB performance. >>>>>>>> >>>>>>>> On Fri, Feb 26, 2021 at 2:16 AM Arvid Heise <ar...@apache.org> >>>>>>>> wrote: >>>>>>>> >>>>>>>>> Just to clarify, intermediate topics should in most cases not be >>>>>>>>> compacted for exactly the reasons if your application depends on all >>>>>>>>> intermediate data. For the final topic, it makes sense. If you also >>>>>>>>> consume >>>>>>>>> intermediate topics for web application, one solution is to split it >>>>>>>>> into >>>>>>>>> two topics (like topic-raw for Flink and topic-compacted for >>>>>>>>> applications) >>>>>>>>> and live with some amplification. >>>>>>>>> >>>>>>>>> On Thu, Feb 25, 2021 at 12:11 AM Rex Fenley <r...@remind101.com> >>>>>>>>> wrote: >>>>>>>>> >>>>>>>>>> All of our Flink jobs are (currently) used for web applications >>>>>>>>>> at the end of the day. 
We see a lot of latency spikes from record >>>>>>>>>> amplification and we were at first hoping we could pass intermediate >>>>>>>>>> results through Kafka and compact them to lower the record >>>>>>>>>> amplification, >>>>>>>>>> but then it hit me that this might be an issue. >>>>>>>>>> >>>>>>>>>> Thanks for the detailed explanation, though it seems like we'll >>>>>>>>>> need to look for a different solution or only compact on records we >>>>>>>>>> know >>>>>>>>>> will never mutate. >>>>>>>>>> >>>>>>>>>> On Wed, Feb 24, 2021 at 6:38 AM Arvid Heise <ar...@apache.org> >>>>>>>>>> wrote: >>>>>>>>>> >>>>>>>>>>> Jan's response is correct, but I'd like to emphasize the impact >>>>>>>>>>> on a Flink application. >>>>>>>>>>> >>>>>>>>>>> If the compaction happens before the data arrives in Flink, the >>>>>>>>>>> intermediate updates are lost and just the final result appears. >>>>>>>>>>> Also if you restart your Flink application and reprocess older >>>>>>>>>>> data, it will naturally only see the compacted data save for the >>>>>>>>>>> active >>>>>>>>>>> segment. >>>>>>>>>>> >>>>>>>>>>> So how to make it deterministic? Simply drop topic compaction. >>>>>>>>>>> If it's coming from CDC and you want to process and produce >>>>>>>>>>> changelog >>>>>>>>>>> streams over several applications, you probably don't want to use >>>>>>>>>>> log >>>>>>>>>>> compactions anyways. >>>>>>>>>>> >>>>>>>>>>> Log compaction only makes sense in the snapshot topic that >>>>>>>>>>> displays the current state (KTable), where you don't think in CDC >>>>>>>>>>> updates >>>>>>>>>>> anymore but just final records, like >>>>>>>>>>> (user_id: 1, state: "california") >>>>>>>>>>> (user_id: 1, state: "ohio") >>>>>>>>>>> >>>>>>>>>>> Usually, if you use CDC in your company, each application is >>>>>>>>>>> responsible for building its own current model by tapping in the >>>>>>>>>>> relevant >>>>>>>>>>> changes. 
>>>>>>>>>>> Log-compacted topics would then only appear at the end of processing, when you hand data over to non-analytical applications, such as web apps. >>>>>>>>>>> On Wed, Feb 24, 2021 at 10:01 AM Jan Lukavský <je...@seznam.cz> wrote: >>>>>>>>>>>> Hi Rex, >>>>>>>>>>>> If I understand correctly, you are concerned about the behavior of the Kafka source in the case of a compacted topic, right? If that is the case, then this is not directly related to Flink; Flink will expose the behavior defined by Kafka. You can read about it for instance here [1]. TL;DR - your pipeline is guaranteed to see every record written to the topic (every single update, be it later "overwritten" or not) if it processes each record with a latency of at most 'delete.retention.ms'. This is configurable per topic - the default is 24 hours. If you want to reprocess the data later, your consumer might see only the resulting compacted ("retracted") stream, and not every record actually written to the topic. >>>>>>>>>>>> Jan >>>>>>>>>>>> [1] https://medium.com/swlh/introduction-to-topic-log-compaction-in-apache-kafka-3e4d4afd2262 >>>>>>>>>>>> On 2/24/21 3:14 AM, Rex Fenley wrote: >>>>>>>>>>>> Apologies, forgot to finish. If the Kafka source performs its own retractions of old data on key (user_id) for every append it receives, it should resolve this discrepancy I think. >>>>>>>>>>>> Again, is this true? Anything else I'm missing? >>>>>>>>>>>> Thanks!
>>>>>>>>>>>> On Tue, Feb 23, 2021 at 6:12 PM Rex Fenley <r...@remind101.com> wrote: >>>>>>>>>>>>> Hi, >>>>>>>>>>>>> I'm concerned about the impact of Kafka's compaction when sending data between running Flink jobs. >>>>>>>>>>>>> For example, one job produces retract stream records in the sequence >>>>>>>>>>>>> (false, (user_id: 1, state: "california")) -- retract >>>>>>>>>>>>> (true, (user_id: 1, state: "ohio")) -- append >>>>>>>>>>>>> When this is written to Kafka and keyed by user_id, it could end up compacting to just >>>>>>>>>>>>> (true, (user_id: 1, state: "ohio")) -- append >>>>>>>>>>>>> If some other downstream Flink job has a filter on state == "california" and reads from the Kafka stream, I assume it will miss the retract message altogether and produce incorrect results. >>>>>>>>>>>>> Is this true? How do we prevent this from happening? We need to use compaction since all our jobs are based on CDC and we can't just drop data after x number of days.
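The failure mode asked about here can be modeled in a few lines of Python (illustrative only; `materialize` and the record encoding are made up for this sketch): a downstream job that materializes the set of users in California goes permanently wrong once compaction swallows the retraction it depended on.

```python
# Model of the original question: a downstream job filters on
# state == "california" over a retract stream of
# (is_insert, user_id, state) records, and compaction removes the
# retraction it depends on.

def materialize(events, prior=None):
    """Maintain the set of user_ids currently in California."""
    result = set() if prior is None else set(prior)
    for is_insert, user_id, state in events:
        if state == "california":
            if is_insert:
                result.add(user_id)
            else:
                result.discard(user_id)
    return result

stream = [
    (True, 1, "california"),   # append
    (False, 1, "california"),  # retract
    (True, 1, "ohio"),         # append
]

# Uncompacted: the job sees the retraction and ends up correct.
print(materialize(stream))  # set()

# The job processed the first record; Kafka then compacted the topic on
# user_id, collapsing the remaining records to just the last one.
seen_before_compaction = materialize(stream[:1])
compacted_tail = [stream[-1]]  # the retraction is gone
print(materialize(compacted_tail, seen_before_compaction))  # {1}: wrong
```

This is exactly why the thread converges on keeping CDC retract streams out of compacted topics and compacting only current-state (upsert) topics.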
>>>>>>>>>>>>> Thanks >>>>>>>>>>>>> -- >>>>>>>>>>>>> Rex Fenley | Software Engineer - Mobile and Backend >>>>>>>>>>>>> Remind.com <https://www.remind.com/> | BLOG <http://blog.remind.com/> | FOLLOW US <https://twitter.com/remindhq> | LIKE US <https://www.facebook.com/remindhq>