Hi, I have created a PR here: https://github.com/apache/hudi/pull/2854/files
In the PR I make these changes:

1. Add a metadata column "_hoodie_cdc_operation". I did not add a config option because I could not find a good way to keep the code clean: a metadata column is very primitive, while a config option would introduce too many changes.
2. Modify the write handles to fill in the column: the append handle records the operation of each record, while the create handle and the merge handle merge the changes.
3. The flag is only useful for streaming read, so I also merge the flags for the Flink batch reader; the Flink streaming reader emits each record with the right CDC operation.

I did not change any Spark code because I'm not familiar with it, and Spark actually cannot handle these flags in its operators. So by default the "_hoodie_cdc_operation" column only gets its value from the Flink writer.

We may also need some unit tests for the new column in the Hudi core, but I don't know how to write them. Could you give some help?
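To make the streaming read side easier to picture, here is a rough sketch of the idea. It is not the exact code in the PR, and the string encoding of the operation values is only an assumption for illustration: the reader maps the "_hoodie_cdc_operation" value to Flink's RowKind before emitting a row, so that downstream operators see a proper changelog.

    import org.apache.flink.types.RowKind;

    public final class CdcOperationMapper {

      // Translate the metadata column value into Flink's change flag.
      // The "I" / "-U" / "+U" / "D" encoding is assumed for illustration;
      // the PR may use a different one.
      public static RowKind toRowKind(String operation) {
        switch (operation) {
          case "I":  return RowKind.INSERT;
          case "-U": return RowKind.UPDATE_BEFORE;
          case "+U": return RowKind.UPDATE_AFTER;
          case "D":  return RowKind.DELETE;
          default:
            throw new IllegalArgumentException(
                "Unknown cdc operation: " + operation);
        }
      }

      private CdcOperationMapper() {
      }
    }

The batch readers merge the flags instead, so a batch query still sees only the final image of each record.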
Best,
Danny Chan

On Mon, Apr 19, 2021 at 4:42 PM Danny Chan <danny0...@apache.org> wrote:

> Thanks @Sivabalan ~
>
> I agree that parquet and log files should keep their metadata columns in
> sync, in case there is confusion and special handling in some use cases
> like compaction.
>
> I also agree that adding a metadata column is easier to use for SQL
> connectors.
>
> We can add a metadata column named "_hoodie_change_flag" and a config
> option that disables this metadata column by default. What do you think?
>
> Best,
> Danny Chan
>
> On Sat, Apr 17, 2021 at 9:10 AM Sivabalan <n.siv...@gmail.com> wrote:
>
>> Wrt changes: if we plan to add this only to log files, compaction needs
>> to be fixed to omit this column, at the minimum.
>>
>> On Fri, Apr 16, 2021 at 9:07 PM Sivabalan <n.siv...@gmail.com> wrote:
>>
>> > Just got a chance to read about dynamic tables. Sounds interesting.
>> >
>> > Some thoughts on your questions:
>> > - Yes, just MOR makes sense.
>> > - But adding this new meta column only to avro logs might incur some
>> > non-trivial changes, since as of today the schemas of avro and base
>> > files are in sync. If this new col is going to store just 2 bits
>> > (insert/update/delete), we might as well add it to base files as well
>> > and keep it simple. But we can make it configurable so that only those
>> > interested can enable this new column for their hudi dataset.
>> > - Wondering: even today we can achieve this by using a transformer to
>> > set the right values for this new column. I mean, users need to add
>> > this col to their schema when defining the hudi dataset, and if the
>> > incoming data has the right values for this col (using deltastreamer's
>> > transformer or by explicit means), we don't even need to add it as a
>> > meta column. Just saying that we can achieve this even today. But if
>> > we are looking to integrate w/ SQL DML, then adding this support
>> > would be elegant.
>> >
>> > On Thu, Apr 15, 2021 at 11:33 PM Danny Chan <danny0...@apache.org>
>> > wrote:
>> >
>> >> Thanks Vinoth ~
>> >>
>> >> Here is a document about the notion of "Flink Dynamic Tables" [1]:
>> >> every operator that has accumulated state can handle retractions
>> >> (UPDATE_BEFORE or DELETE) and then apply the new changes (INSERT or
>> >> UPDATE_AFTER), so that each operator can consume the CDC-format
>> >> messages in a streaming way.
>> >>
>> >> > Another aspect to think about is, how the new flag can be added to
>> >> > existing tables and if the schema evolution would be fine.
>> >>
>> >> That is also my concern, but it's not that bad, because adding a new
>> >> column is still compatible with the old schema in Avro.
>> >>
>> >> [1]
>> >> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/dynamic_tables.html
>> >>
>> >> Best,
>> >> Danny Chan
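A side note on the Avro compatibility point above: here is a tiny self-contained check, using a made-up record schema, that shows why adding the column is safe. A reader schema that appends the flag field with a default value stays compatible with files written under the old schema:

    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;
    import org.apache.avro.SchemaCompatibility;

    public class ChangeFlagCompatibilityCheck {
      public static void main(String[] args) {
        // Old writer schema, without the flag column.
        Schema oldSchema = SchemaBuilder.record("example").fields()
            .requiredString("_hoodie_commit_time")
            .requiredString("uuid")
            .endRecord();
        // New reader schema: the same fields plus the flag, with a
        // default so that records written before the upgrade still
        // resolve ("I" as the default is just an assumption here).
        Schema newSchema = SchemaBuilder.record("example").fields()
            .requiredString("_hoodie_commit_time")
            .requiredString("uuid")
            .name("_hoodie_change_flag").type().stringType().stringDefault("I")
            .endRecord();
        SchemaCompatibility.SchemaPairCompatibility compat =
            SchemaCompatibility.checkReaderWriterCompatibility(
                newSchema, oldSchema);
        System.out.println(compat.getType()); // COMPATIBLE
      }
    }

So old files remain readable after the upgrade; the missing column is simply filled with its default.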
>> >> On Fri, Apr 16, 2021 at 9:44 AM Vinoth Chandar <vin...@apache.org>
>> >> wrote:
>> >>
>> >> > Hi,
>> >> >
>> >> > Is the intent of the flag to convey whether an insert, delete, or
>> >> > update changed the record? If so, I would imagine that we do this
>> >> > even for COW tables, since those also support a logical notion of a
>> >> > change stream using the commit_time meta field.
>> >> >
>> >> > You may be right, but I am trying to understand the use case for
>> >> > this. Any links/flink docs I can read?
>> >> >
>> >> > Another aspect to think about is how the new flag can be added to
>> >> > existing tables, and whether the schema evolution would be fine.
>> >> >
>> >> > Thanks
>> >> > Vinoth
>> >> >
>> >> > On Thu, Apr 8, 2021 at 2:13 AM Danny Chan <danny0...@apache.org>
>> >> > wrote:
>> >> >
>> >> > > I tried to do a POC for Flink locally and it works well. In the
>> >> > > PR I add a new metadata column named "_hoodie_change_flag", but I
>> >> > > actually found that only the log format needs this flag, and
>> >> > > Spark may have no ability to handle the flag for incremental
>> >> > > processing yet.
>> >> > >
>> >> > > So should I add the "_hoodie_change_flag" metadata column, or is
>> >> > > there any better solution for this?
>> >> > >
>> >> > > Best,
>> >> > > Danny Chan
>> >> > >
>> >> > > On Fri, Apr 2, 2021 at 11:08 AM Danny Chan
>> >> > > <danny0...@apache.org> wrote:
>> >> > >
>> >> > > > Thanks, cool. Then the remaining questions are:
>> >> > > >
>> >> > > > - Where do we record these changes? Should we add a built-in
>> >> > > > meta field such as _change_flag_, like the other system
>> >> > > > columns, e.g. _hoodie_commit_time?
>> >> > > > - What kind of table should keep these flags? In my thoughts,
>> >> > > > we should only add these flags for the "MERGE_ON_READ" table,
>> >> > > > and only for AVRO logs.
>> >> > > > - We should add a config there to switch the flags in the
>> >> > > > system meta fields on/off.
>> >> > > >
>> >> > > > What do you think?
>> >> > > >
>> >> > > > Best,
>> >> > > > Danny Chan
>> >> > > >
>> >> > > > On Thu, Apr 1, 2021 at 10:58 AM vino yang
>> >> > > > <yanghua1...@gmail.com> wrote:
>> >> > > >
>> >> > > >> >> Oops, the image is broken. By "change flags" I mean:
>> >> > > >> >> insert, update (before and after) and delete.
>> >> > > >>
>> >> > > >> Yes, the image I attached is also about these flags.
>> >> > > >> [image: image (3).png]
>> >> > > >>
>> >> > > >> +1 for the idea.
>> >> > > >>
>> >> > > >> Best,
>> >> > > >> Vino
>> >> > > >>
>> >> > > >> On Thu, Apr 1, 2021 at 10:03 AM Danny Chan
>> >> > > >> <danny0...@apache.org> wrote:
>> >> > > >>
>> >> > > >>> Oops, the image is broken. By "change flags" I mean: insert,
>> >> > > >>> update (before and after) and delete.
>> >> > > >>>
>> >> > > >>> The Flink engine can propagate the change flags internally
>> >> > > >>> between its operators; if HUDI can send the change flags to
>> >> > > >>> Flink, the incremental calculation of CDC would be very
>> >> > > >>> natural (almost transparent to users).
>> >> > > >>>
>> >> > > >>> Best,
>> >> > > >>> Danny Chan
>> >> > > >>>
>> >> > > >>> On Wed, Mar 31, 2021 at 11:32 PM vino yang
>> >> > > >>> <yanghua1...@gmail.com> wrote:
>> >> > > >>>
>> >> > > >>> > Hi Danny,
>> >> > > >>> >
>> >> > > >>> > Thanks for kicking off this discussion thread.
>> >> > > >>> >
>> >> > > >>> > Yes, incremental query (or say "incremental processing")
>> >> > > >>> > has always been an important feature of the Hudi framework.
>> >> > > >>> > If we can make this feature better, it will be even more
>> >> > > >>> > exciting.
>> >> > > >>> >
>> >> > > >>> > In the data warehouse, in some complex calculations, I have
>> >> > > >>> > not found a good way to conveniently use some incremental
>> >> > > >>> > change data (similar to the concept of a retract stream in
>> >> > > >>> > Flink?) to locally "correct" the aggregation results (these
>> >> > > >>> > aggregation results may belong to the DWS layer).
>> >> > > >>> >
>> >> > > >>> > BTW: Yes, I do admit that some simple calculation scenarios
>> >> > > >>> > (a single table, or an algorithm that can very easily be
>> >> > > >>> > retracted) can be dealt with based on the incremental
>> >> > > >>> > calculation of CDC.
>> >> > > >>> >
>> >> > > >>> > Of course, the expression of incremental calculation on
>> >> > > >>> > various occasions is sometimes not very clear. Maybe we
>> >> > > >>> > will discuss it more clearly in specific scenarios.
>> >> > > >>> >
>> >> > > >>> > >> If HUDI can keep and propagate these change flags to its
>> >> > > >>> > >> consumers, we can use HUDI as the unified format for the
>> >> > > >>> > >> pipeline.
>> >> > > >>> >
>> >> > > >>> > Regarding the "change flags" here, do you mean flags like
>> >> > > >>> > the ones shown in the figure below?
>> >> > > >>> > [image: image.png]
>> >> > > >>> >
>> >> > > >>> > Best,
>> >> > > >>> > Vino
>> >> > > >>> >
>> >> > > >>> > On Wed, Mar 31, 2021 at 6:24 PM Danny Chan
>> >> > > >>> > <danny0...@apache.org> wrote:
>> >> > > >>> >
>> >> > > >>> >> Hi dear HUDI community ~ Here I want to fire a discussion
>> >> > > >>> >> about using HUDI as the unified storage/format for data
>> >> > > >>> >> warehouse/lake incremental computation.
>> >> > > >>> >>
>> >> > > >>> >> Usually people divide data warehouse production into
>> >> > > >>> >> several levels, such as ODS (operational data store),
>> >> > > >>> >> DWD (data warehouse details), DWS (data warehouse
>> >> > > >>> >> service) and ADS (application data service):
>> >> > > >>> >>
>> >> > > >>> >> ODS -> DWD -> DWS -> ADS
>> >> > > >>> >>
>> >> > > >>> >> In the NEAR-REAL-TIME (or pure realtime) computation
>> >> > > >>> >> cases, a big topic is syncing the change log (CDC
>> >> > > >>> >> pattern) from all kinds of RDBMS into the warehouse/lake.
>> >> > > >>> >> The CDC pattern records and propagates the change flags:
>> >> > > >>> >> insert, update (before and after) and delete for the
>> >> > > >>> >> consumer. With these flags, the downstream engines can do
>> >> > > >>> >> realtime accumulation computation.
>> >> > > >>> >>
>> >> > > >>> >> Using a streaming engine like Flink, we can have a
>> >> > > >>> >> totally NEAR-REAL-TIME computation pipeline for each of
>> >> > > >>> >> the layers.
>> >> > > >>> >>
>> >> > > >>> >> If HUDI can keep and propagate these change flags to its
>> >> > > >>> >> consumers, we can use HUDI as the unified format for the
>> >> > > >>> >> pipeline.
>> >> > > >>> >>
>> >> > > >>> >> I'm expecting your nice ideas here ~
>> >> > > >>> >>
>> >> > > >>> >> Best,
>> >> > > >>> >> Danny Chan
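Coming back to the accumulation point in the original mail above: the reason the flags matter downstream is that an aggregation over a changelog is only correct if it retracts the old image of a row before applying the new one. A minimal sketch, using Flink's RowKind just as the flag enum (nothing Hudi-specific here):

    import org.apache.flink.types.RowKind;

    // A retraction-aware running sum: INSERT / UPDATE_AFTER rows add
    // their value, UPDATE_BEFORE / DELETE rows subtract theirs, so the
    // aggregate stays correct when upstream records are updated or
    // deleted.
    class RetractingSum {
      private long sum;

      void accumulate(RowKind flag, long value) {
        switch (flag) {
          case INSERT:
          case UPDATE_AFTER:
            sum += value;
            break;
          case UPDATE_BEFORE:
          case DELETE:
            sum -= value;
            break;
        }
      }

      long current() {
        return sum;
      }
    }

Without the flags, each layer can only recompute from a full snapshot; with them, every layer of ODS -> DWD -> DWS -> ADS can be maintained incrementally.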
>> > --
>> > Regards,
>> > -Sivabalan
>>
>> --
>> Regards,
>> -Sivabalan