Thanks @Sivabalan ~ I agree that parquet and log files should keep their metadata columns in sync, to avoid confusion and special handling in use cases like compaction.

I also agree that a metadata column is easier for SQL connectors to use. We can add a metadata column named "_hoodie_change_flag" plus a config option that disables this column by default. What do you think?
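For illustration, here is a minimal sketch of how the column and its switch could look. The config key and the flag values below are just placeholders for discussion, not a finalized API (the flag values mirror Flink's RowKind short strings):

    // Hypothetical constants, a sketch only: naming is open for discussion.
    public class HoodieChangeFlagConfig {
      // The proposed metadata column, written only when the feature is enabled.
      public static final String CHANGE_FLAG_COLUMN = "_hoodie_change_flag";

      // Opt-in switch, disabled by default so existing tables are unaffected.
      public static final String CHANGE_FLAG_ENABLE_KEY = "hoodie.change.flag.enabled";
      public static final boolean CHANGE_FLAG_ENABLE_DEFAULT = false;

      // Candidate flag values, mirroring Flink's RowKind short strings.
      public static final String FLAG_INSERT = "+I";
      public static final String FLAG_UPDATE_BEFORE = "-U";
      public static final String FLAG_UPDATE_AFTER = "+U";
      public static final String FLAG_DELETE = "-D";
    }

With the default set to false, existing tables and writers would see no change unless they opt in.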
Best,
Danny Chan

Sivabalan <[email protected]> wrote on Sat, Apr 17, 2021 at 9:10 AM:

> wrt changes: if we plan to add this only to log files, compaction needs
> to be fixed to omit this column, at the minimum.
>
> On Fri, Apr 16, 2021 at 9:07 PM Sivabalan <[email protected]> wrote:
>
> > Just got a chance to read about dynamic tables. Sounds interesting.
> >
> > Some thoughts on your questions:
> > - Yes, just MOR makes sense.
> > - But adding this new meta column only to avro logs might incur some
> > non-trivial changes, since as of today the schemas of avro and base
> > files are in sync. If this new col is going to store just 2 bits
> > (insert/update/delete), we might as well add it to base files as well
> > and keep it simple. But we can make it configurable so that only those
> > interested can enable this new column on their hudi dataset.
> > - Wondering: even today we can achieve this by using a transformer to
> > set the right values for this new column. I mean, users need to add
> > this col to their schema when defining the hudi dataset, and if the
> > incoming data has the right values for this col (using deltastreamer's
> > transformer or by explicit means), we don't even need to add this as a
> > meta column. Just saying that we can achieve this even today. But if
> > we are looking to integrate w/ SQL DML, then adding this support would
> > be elegant.
> >
> > On Thu, Apr 15, 2021 at 11:33 PM Danny Chan <[email protected]> wrote:
> >
> >> Thanks Vinoth ~
> >>
> >> Here is a document about the notion of "Flink Dynamic Tables" [1]:
> >> every operator that has accumulated state can handle retractions
> >> (UPDATE_BEFORE or DELETE) and then apply new changes (INSERT or
> >> UPDATE_AFTER), so that each operator can consume CDC-format messages
> >> in a streaming way.
> >>
> >> > Another aspect to think about is how the new flag can be added to
> >> > existing tables and whether the schema evolution would be fine.
> >>
> >> That is also my concern, but it's not that bad, because adding a new
> >> column is still compatible with the old schema in Avro.
> >>
> >> [1]
> >> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/dynamic_tables.html
> >>
> >> Best,
> >> Danny Chan
> >>
> >> Vinoth Chandar <[email protected]> wrote on Fri, Apr 16, 2021 at 9:44 AM:
> >>
> >> > Hi,
> >> >
> >> > Is the intent of the flag to convey whether an insert, delete, or
> >> > update changed the record? If so, I would imagine that we do this
> >> > even for COW tables, since that also supports a logical notion of a
> >> > change stream using the commit_time meta field.
> >> >
> >> > You may be right, but I am trying to understand the use case for
> >> > this. Any links/flink docs I can read?
> >> >
> >> > Another aspect to think about is how the new flag can be added to
> >> > existing tables and whether the schema evolution would be fine.
> >> >
> >> > Thanks
> >> > Vinoth
> >> >
> >> > On Thu, Apr 8, 2021 at 2:13 AM Danny Chan <[email protected]> wrote:
> >> >
> >> > > I tried to do a POC for flink locally and it works well. In the PR
> >> > > I add a new metadata column named "_hoodie_change_flag", but I
> >> > > actually found that only the log format needs this flag, and Spark
> >> > > may have no ability to handle the flag for incremental processing
> >> > > yet.
> >> > >
> >> > > So should I add the "_hoodie_change_flag" metadata column, or is
> >> > > there any better solution for this?
> >> > >
> >> > > Best,
> >> > > Danny Chan
> >> > >
> >> > > Danny Chan <[email protected]> wrote on Fri, Apr 2, 2021 at 11:08 AM:
> >> > >
> >> > > > Thanks, cool. Then the remaining questions are:
> >> > > >
> >> > > > - where we record these changes: should we add a builtin meta
> >> > > > field such as _change_flag_, like the other system columns, e.g.
> >> > > > _hoodie_commit_time
> >> > > > - what kind of table should keep these flags: in my thoughts, we
> >> > > > should only add these flags for the "MERGE_ON_READ" table, and
> >> > > > only for AVRO logs
> >> > > > - we should add a config to switch the flags in the system meta
> >> > > > fields on/off
> >> > > >
> >> > > > What do you think?
> >> > > >
> >> > > > Best,
> >> > > > Danny Chan
> >> > > >
> >> > > > vino yang <[email protected]> wrote on Thu, Apr 1, 2021 at 10:58 AM:
> >> > > >
> >> > > >> >> Oops, the image didn't come through; by "change flags" I
> >> > > >> >> mean: insert, update (before and after) and delete.
> >> > > >>
> >> > > >> Yes, the image I attached is also about these flags.
> >> > > >> [image: image (3).png]
> >> > > >>
> >> > > >> +1 for the idea.
> >> > > >>
> >> > > >> Best,
> >> > > >> Vino
> >> > > >>
> >> > > >> Danny Chan <[email protected]> wrote on Thu, Apr 1, 2021 at 10:03 AM:
> >> > > >>
> >> > > >>> Oops, the image didn't come through; by "change flags" I mean:
> >> > > >>> insert, update (before and after) and delete.
> >> > > >>>
> >> > > >>> The Flink engine can propagate the change flags internally
> >> > > >>> between its operators; if HUDI can send the change flags to
> >> > > >>> Flink, the incremental calculation of CDC would be very
> >> > > >>> natural (almost transparent to users).
> >> > > >>>
> >> > > >>> Best,
> >> > > >>> Danny Chan
> >> > > >>>
> >> > > >>> vino yang <[email protected]> wrote on Wed, Mar 31, 2021 at 11:32 PM:
> >> > > >>>
> >> > > >>> > Hi Danny,
> >> > > >>> >
> >> > > >>> > Thanks for kicking off this discussion thread.
> >> > > >>> >
> >> > > >>> > Yes, incremental query (or say "incremental processing") has
> >> > > >>> > always been an important feature of the Hudi framework. If
> >> > > >>> > we can make this feature better, it will be even more
> >> > > >>> > exciting.
> >> > > >>> >
> >> > > >>> > In the data warehouse, in some complex calculations, I have
> >> > > >>> > not found a good way to conveniently use incremental change
> >> > > >>> > data (similar to the concept of a retraction stream in
> >> > > >>> > Flink?) to locally "correct" the aggregation result (these
> >> > > >>> > aggregation results may belong to the DWS layer).
> >> > > >>> >
> >> > > >>> > BTW: yes, I do admit that some simple calculation scenarios
> >> > > >>> > (a single table, or an algorithm that can be retracted very
> >> > > >>> > easily) can be dealt with based on the incremental
> >> > > >>> > calculation of CDC.
> >> > > >>> >
> >> > > >>> > Of course, the expression of incremental calculation in
> >> > > >>> > various settings is sometimes not very clear. Maybe we will
> >> > > >>> > discuss it more clearly in specific scenarios.
> >> > > >>> >
> >> > > >>> > >> If HUDI can keep and propagate these change flags to its
> >> > > >>> > >> consumers, we can use HUDI as the unified format for the
> >> > > >>> > >> pipeline.
> >> > > >>> >
> >> > > >>> > Regarding the "change flags" here, do you mean flags like
> >> > > >>> > the ones shown in the figure below?
> >> > > >>> >
> >> > > >>> > [image: image.png]
> >> > > >>> >
> >> > > >>> > Best,
> >> > > >>> > Vino
> >> > > >>> >
> >> > > >>> > Danny Chan <[email protected]> wrote on Wed, Mar 31, 2021 at 6:24 PM:
> >> > > >>> >
> >> > > >>> >> Hi dear HUDI community ~ Here I want to start a discussion
> >> > > >>> >> about using HUDI as the unified storage/format for data
> >> > > >>> >> warehouse/lake incremental computation.
> >> > > >>> >>
> >> > > >>> >> Usually people divide data warehouse production into
> >> > > >>> >> several levels, such as ODS (operational data store), DWD
> >> > > >>> >> (data warehouse details), DWS (data warehouse service) and
> >> > > >>> >> ADS (application data service):
> >> > > >>> >>
> >> > > >>> >> ODS -> DWD -> DWS -> ADS
> >> > > >>> >>
> >> > > >>> >> In the NEAR-REAL-TIME (or purely real-time) computation
> >> > > >>> >> cases, a big topic is syncing the change log (CDC pattern)
> >> > > >>> >> from all kinds of RDBMS into the warehouse/lake. The CDC
> >> > > >>> >> pattern records and propagates the change flags (insert,
> >> > > >>> >> update before/after, delete) for the consumer; with these
> >> > > >>> >> flags, the downstream engines can do real-time accumulation
> >> > > >>> >> computation.
> >> > > >>> >>
> >> > > >>> >> Using a streaming engine like Flink, we can have a totally
> >> > > >>> >> NEAR-REAL-TIME computation pipeline for each of the layers.
> >> > > >>> >>
> >> > > >>> >> If HUDI can keep and propagate these change flags to its
> >> > > >>> >> consumers, we can use HUDI as the unified format for the
> >> > > >>> >> pipeline.
> >> > > >>> >>
> >> > > >>> >> I'm expecting your nice ideas here ~
> >> > > >>> >>
> >> > > >>> >> Best,
> >> > > >>> >> Danny Chan
> >
> > --
> > Regards,
> > -Sivabalan
>
> --
> Regards,
> -Sivabalan
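P.S. Regarding Sivabalan's point that we can already achieve this today with
a transformer: below is a rough sketch of what such a DeltaStreamer
transformer could look like. It assumes the upstream CDC source (e.g.
Debezium) carries an "op" field with c/u/d codes; the column name
"_change_flag" and the class itself are illustrative, not an existing
utility:

    import org.apache.hudi.common.config.TypedProperties;
    import org.apache.hudi.utilities.transform.Transformer;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    import static org.apache.spark.sql.functions.col;
    import static org.apache.spark.sql.functions.lit;
    import static org.apache.spark.sql.functions.when;

    // Sketch only: derives a flag column from a source CDC operation code.
    public class ChangeFlagTransformer implements Transformer {
      @Override
      public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession,
                                Dataset<Row> rowDataset, TypedProperties properties) {
        // "op" here is the Debezium-style operation code: c = create,
        // u = update (after-image), d = delete, r = snapshot read.
        return rowDataset.withColumn("_change_flag",
            when(col("op").equalTo("c"), lit("+I"))
                .when(col("op").equalTo("u"), lit("+U"))
                .when(col("op").equalTo("d"), lit("-D"))
                .otherwise(lit("+I")));  // treat snapshot reads as inserts
      }
    }

A class like this would be passed to HoodieDeltaStreamer via
--transformer-class, with the user declaring the flag column in the table
schema up front. Note that a flat op-code mapping cannot emit the
UPDATE_BEFORE image; the source would have to deliver the before-row
separately. So this works today, but as Sivabalan says, a built-in meta
column would make the SQL DML integration more elegant.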
