Hi Danny,

Thanks, I will review this asap. It's already in the "review in progress" column.
Thanks
Vinoth

On Thu, Apr 22, 2021 at 12:49 AM Danny Chan <danny0...@apache.org> wrote:
>
> > Should we throw together a PoC/test code for an example Flink pipeline
> > that will use hudi cdc flags + stateful operators?
>
> I have updated the PR https://github.com/apache/hudi/pull/2854,
> see the test case HoodieDataSourceITCase#testStreamReadWithDeletes.
>
> A data source:
>
> change_flag | uuid | name | age | ts | partition
> I, id1, Danny, 23, 1970-01-01T00:00:01, par1
> I, id2, Stephen, 33, 1970-01-01T00:00:02, par1
> I, id3, Julian, 53, 1970-01-01T00:00:03, par2
> I, id4, Fabian, 31, 1970-01-01T00:00:04, par2
> I, id5, Sophia, 18, 1970-01-01T00:00:05, par3
> I, id6, Emma, 20, 1970-01-01T00:00:06, par3
> I, id7, Bob, 44, 1970-01-01T00:00:07, par4
> I, id8, Han, 56, 1970-01-01T00:00:08, par4
> U, id1, Danny, 24, 1970-01-01T00:00:01, par1
> U, id2, Stephen, 34, 1970-01-01T00:00:02, par1
> I, id3, Julian, 53, 1970-01-01T00:00:03, par2
> D, id5, Sophia, 18, 1970-01-01T00:00:05, par3
> D, id9, Jane, 19, 1970-01-01T00:00:06, par3
>
> With the streaming query "select name, sum(age) from t1 group by name", it returns:
>
> change_flag | name | age_sum
> I, Danny, 24
> I, Stephen, 34
>
> The result is the same as a batch snapshot query.
>
> Best,
> Danny Chan
>
> Vinoth Chandar <vin...@apache.org> wrote on Wed, Apr 21, 2021, 1:32 PM:
>
> > Keeping compatibility is a must, i.e. users should be able to upgrade
> > to the new release with the _hoodie_cdc_flag meta column, and be able
> > to query new data (with this new meta col) alongside old data (without
> > this new meta col).
> > In fact, they should be able to downgrade back to previous versions
> > (say there is some other snag they hit), and go back to not writing
> > this new meta column.
> > If this is too hard, then a config to control it is not a bad idea, at
> > least for an initial release?
> >
> > Thanks for clarifying the use-case! Makes total sense to me and I look
> > forward to getting this going.
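The test case above can be sketched in a few lines of plain Python (a hypothetical simulation, not the actual Hudi/Flink code): apply the I/U/D change flags by key to materialize the snapshot, then run the `select name, sum(age) from t1 group by name` aggregation over it.

```python
# Minimal sketch of the changelog semantics from the example data source:
# I/U upsert a row by its uuid key, D retracts it. The aggregate computed
# over the materialized state matches a batch snapshot query.
from collections import defaultdict

changelog = [
    ("I", "id1", "Danny", 23), ("I", "id2", "Stephen", 33),
    ("I", "id3", "Julian", 53), ("I", "id4", "Fabian", 31),
    ("I", "id5", "Sophia", 18), ("I", "id6", "Emma", 20),
    ("I", "id7", "Bob", 44), ("I", "id8", "Han", 56),
    ("U", "id1", "Danny", 24), ("U", "id2", "Stephen", 34),
    ("I", "id3", "Julian", 53),
    ("D", "id5", "Sophia", 18), ("D", "id9", "Jane", 19),
]

snapshot = {}  # uuid -> (name, age): what the reader's state materializes
for flag, uuid, name, age in changelog:
    if flag in ("I", "U"):   # insert / update-after: upsert by key
        snapshot[uuid] = (name, age)
    elif flag == "D":        # delete: retract the row if present
        snapshot.pop(uuid, None)

def age_sum_by_name(rows):
    sums = defaultdict(int)
    for name, age in rows:
        sums[name] += age
    return dict(sums)

result = age_sum_by_name(snapshot.values())
# Danny's updated sum is 24 and Stephen's is 34, as in the streaming
# output above; the deleted Sophia row no longer contributes.
print(result["Danny"], result["Stephen"], "Sophia" in result)
```

Note the deliberate quirks in the data: a duplicate insert for id3 (an insert on an existing key behaves as an upsert) and a delete for id9, a key that was never inserted (ignored).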
> > Should we throw together a PoC/test code for an example Flink pipeline
> > that will use hudi cdc flags + stateful operators?
> > It'll help us iron out gaps iteratively and finalize requirements,
> > instead of a more top-down, waterfall-like model.
> >
> > On Tue, Apr 20, 2021 at 8:25 PM Danny Chan <danny0...@apache.org> wrote:
> >
> > > > Is it providing the ability to author continuous queries on
> > > > Hudi source tables end-end, given Flink can use the flags to
> > > > generate retract/upsert streams
> > >
> > > Yes, that's the key point. With these flags plus Flink stateful
> > > operators, we can have a real-time incremental ETL pipeline.
> > >
> > > For example, a global aggregation that consumes a CDC stream can do
> > > acc/retract continuously and send the changes downstream.
> > > The ETL pipeline with the CDC stream generates the same result as a
> > > batch snapshot with the same SQL query.
> > >
> > > If keeping compatibility is a must with/without the new metadata
> > > columns, I think there is no need to add a config option, which
> > > brings in unnecessary overhead. If we do not ensure backward
> > > compatibility for the new column, then we should add such a config
> > > option and disable it by default.
> > >
> > > Best,
> > > Danny Chan
> > >
> > > Vinoth Chandar <vin...@apache.org> wrote on Wed, Apr 21, 2021, 6:30 AM:
> > >
> > > > Hi Danny,
> > > >
> > > > Read up on the Flink docs as well.
> > > >
> > > > If we don't actually publish data to the meta column, I think the
> > > > overhead is pretty low w.r.t. avro/parquet. Both are very good at
> > > > encoding nulls.
> > > > But I feel it's worth adding a HoodieWriteConfig to control this,
> > > > and since the addition of meta columns mostly happens in the
> > > > handles, it may not be as bad? Happy to suggest more concrete
> > > > ideas on the PR.
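The backward/forward compatibility point (adding a nullable meta column with a null default) can be illustrated with a toy, hand-rolled sketch of Avro-style schema resolution. This is plain Python, not the real Avro library; the column name `_hoodie_cdc_flag` follows the thread, the rest of the field names are just for illustration.

```python
# Toy illustration of why appending a nullable-with-default meta column is
# compatible in both directions: missing fields resolve to their default.
OLD_FIELDS = ["_hoodie_commit_time", "uuid", "name"]
NEW_FIELDS = OLD_FIELDS + ["_hoodie_cdc_flag"]  # added with default None

def resolve(record, reader_fields, default=None):
    """Avro-style resolution sketch: fields absent from the written record
    fall back to the reader schema's default."""
    return {f: record.get(f, default) for f in reader_fields}

old_record = {"_hoodie_commit_time": "20210420", "uuid": "id1", "name": "Danny"}
new_record = {**old_record, "_hoodie_cdc_flag": "U"}

# Upgrade path: old data read with the new schema gets a null flag.
print(resolve(old_record, NEW_FIELDS)["_hoodie_cdc_flag"])
# Downgrade path: new data read with the old schema simply drops the flag.
print("_hoodie_cdc_flag" in resolve(new_record, OLD_FIELDS))
```

As Vinoth notes, the null values themselves are cheap, since both Avro and Parquet encode nulls compactly; the open question in the thread is how the other engines (Hive, Spark, Presto) treat the evolved schema.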
> > > > We still need to test backwards compatibility from different
> > > > engines quite early though, and make sure there are no surprises.
> > > > Hive, Parquet, Spark, Presto all have their own rules on evolution
> > > > as well. So we need to think through if/how seamlessly this can be
> > > > turned on for existing tables.
> > > >
> > > > As for testing the new column, given Flink is what will be able to
> > > > consume the flags, can we write a quick unit test using dynamic
> > > > tables?
> > > > I am also curious to understand how the flags help the end user
> > > > ultimately. Reading the Flink docs, I understand the concepts
> > > > (coming from a Kafka Streams world, most of it seems familiar),
> > > > but what exact problem does the flag solve that exists today? Is
> > > > it providing the ability to author continuous queries on Hudi
> > > > source tables end-end, given Flink can use the flags to generate
> > > > retract/upsert streams?
> > > >
> > > > For hard deletes, we still need to do some core work to make them
> > > > available in the incremental query. So there's more to be done
> > > > here for cracking this end-end streaming/continuous ETL vision.
> > > >
> > > > Very exciting stuff!
> > > >
> > > > Thanks
> > > > Vinoth
> > > >
> > > > On Tue, Apr 20, 2021 at 2:23 AM Danny Chan <danny0...@apache.org> wrote:
> > > >
> > > > > Hi, I have created a PR here:
> > > > > https://github.com/apache/hudi/pull/2854/files
> > > > >
> > > > > In the PR I make these changes:
> > > > > 1. Add a metadata column "_hoodie_cdc_operation". I did not add
> > > > > a config option because I could not find a good way to keep the
> > > > > code clean; a metadata column is very primitive and a config
> > > > > option would introduce too many changes.
> > > > > 2.
Modify the write handles to add the column: add the operation for
> > > > > the append handle, but merge the changes for the create handle
> > > > > and the merge handle.
> > > > > 3. The flag is only useful for streaming read, so I also merge
> > > > > the flags for the Flink batch reader; the Flink streaming reader
> > > > > emits each record with the right cdc operation.
> > > > >
> > > > > I did not change any Spark code because I'm not familiar with
> > > > > it. Spark actually cannot handle these flags in operators, so by
> > > > > default the column "_hoodie_cdc_operation" only has a value from
> > > > > the Flink writer.
> > > > >
> > > > > There may also need to be some unit tests for the new column in
> > > > > the hoodie core, but I don't know how; could you give some help?
> > > > >
> > > > > Best,
> > > > > Danny Chan
> > > > >
> > > > > Danny Chan <danny0...@apache.org> wrote on Mon, Apr 19, 2021, 4:42 PM:
> > > > >
> > > > > > Thanks @Sivabalan ~
> > > > > >
> > > > > > I agree that parquet and log files should keep their metadata
> > > > > > columns in sync, in case there are confusions and special
> > > > > > handling in some use cases like compaction.
> > > > > >
> > > > > > I also agree that adding a metadata column is easier to use
> > > > > > for SQL connectors.
> > > > > >
> > > > > > We can add a metadata column named "_hoodie_change_flag" and a
> > > > > > config option that disables this metadata column by default.
> > > > > > What do you think?
> > > > > >
> > > > > > Best,
> > > > > > Danny Chan
> > > > > >
> > > > > > Sivabalan <n.siv...@gmail.com> wrote on Sat, Apr 17, 2021, 9:10 AM:
> > > > > >
> > > > > >> wrt changes: if we plan to add this only to log files,
> > > > > >> compaction needs to be fixed to omit this column, at a
> > > > > >> minimum.
> > > > > >>
> > > > > >> On Fri, Apr 16, 2021 at 9:07 PM Sivabalan <n.siv...@gmail.com> wrote:
> > > > > >>
> > > > > >> > Just got a chance to read about dynamic tables.
> > > > > >> > Sounds interesting.
> > > > > >> >
> > > > > >> > Some thoughts on your questions:
> > > > > >> > - Yes, just MOR makes sense.
> > > > > >> > - But adding this new meta column only to avro logs might
> > > > > >> > incur some non-trivial changes, since as of today the
> > > > > >> > schemas of avro and base files are in sync. If this new col
> > > > > >> > is going to store just 2 bits (insert/update/delete), we
> > > > > >> > might as well add it to base files too and keep it simple.
> > > > > >> > But we can make it configurable so that only those
> > > > > >> > interested can enable this new column for their hudi
> > > > > >> > dataset.
> > > > > >> > - Wondering: even today we can achieve this by using a
> > > > > >> > transformer to set the right values for this new column. I
> > > > > >> > mean, users need to add this col to their schema when
> > > > > >> > defining the hudi dataset, and if the incoming data has the
> > > > > >> > right values for this col (via deltastreamer's transformer
> > > > > >> > or by explicit means), we don't even need to add this as a
> > > > > >> > meta column. Just saying that we can achieve this even
> > > > > >> > today. But if we are looking to integrate with SQL DML,
> > > > > >> > then adding this support would be elegant.
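Sivabalan's "do it with a transformer today" idea can be sketched as follows. This is a hypothetical plain-Python stand-in (DeltaStreamer transformers are actually Java/Spark): the user declares the flag column in their own schema, and a transform step stamps each incoming record with its change flag before the Hudi write. The helper `with_change_flag` and the column name `_change_flag` are invented for illustration.

```python
# Hypothetical transformer sketch: derive a change flag for each incoming
# record from the upstream CDC payload, here mapping Debezium-style op
# codes ("c" create, "u" update, "d" delete) to I/U/D.
def with_change_flag(records, flag_of):
    """flag_of maps a record to 'I', 'U' or 'D' (hypothetical helper)."""
    return [{**r, "_change_flag": flag_of(r)} for r in records]

incoming = [{"uuid": "id1", "op": "c"}, {"uuid": "id1", "op": "u"}]
flagged = with_change_flag(
    incoming,
    lambda r: {"c": "I", "u": "U", "d": "D"}[r["op"]],
)
print([r["_change_flag"] for r in flagged])  # ['I', 'U']
```

As the thread notes, this works today but puts the burden on the user's schema and pipeline; a builtin meta column is the more elegant fit for SQL DML integration.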
> > > > > >> >
> > > > > >> > On Thu, Apr 15, 2021 at 11:33 PM Danny Chan <danny0...@apache.org> wrote:
> > > > > >> >
> > > > > >> >> Thanks Vinoth ~
> > > > > >> >>
> > > > > >> >> Here is a document about the notion of "Flink Dynamic
> > > > > >> >> Table" [1]. Every operator that has accumulate state can
> > > > > >> >> handle retractions (UPDATE_BEFORE or DELETE) and then
> > > > > >> >> apply new changes (INSERT or UPDATE_AFTER), so that each
> > > > > >> >> operator can consume CDC-format messages in a streaming
> > > > > >> >> way.
> > > > > >> >>
> > > > > >> >> > Another aspect to think about is how the new flag can be
> > > > > >> >> > added to existing tables and whether the schema
> > > > > >> >> > evolution would be fine.
> > > > > >> >>
> > > > > >> >> That is also my concern, but it's not that bad, because
> > > > > >> >> adding a new column is still compatible with the old
> > > > > >> >> schema in Avro.
> > > > > >> >>
> > > > > >> >> [1]
> > > > > >> >> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/dynamic_tables.html
> > > > > >> >>
> > > > > >> >> Best,
> > > > > >> >> Danny Chan
> > > > > >> >>
> > > > > >> >> Vinoth Chandar <vin...@apache.org> wrote on Fri, Apr 16, 2021, 9:44 AM:
> > > > > >> >>
> > > > > >> >> > Hi,
> > > > > >> >> >
> > > > > >> >> > Is the intent of the flag to convey whether an insert,
> > > > > >> >> > delete, or update changed the record? If so, I would
> > > > > >> >> > imagine that we do this even for cow tables, since that
> > > > > >> >> > also supports a logical notion of a change stream using
> > > > > >> >> > the commit_time meta field.
> > > > > >> >> >
> > > > > >> >> > You may be right, but I am trying to understand the use
> > > > > >> >> > case for this. Any links/flink docs I can read?
> > > > > >> >> >
> > > > > >> >> > Another aspect to think about is how the new flag can be
> > > > > >> >> > added to existing tables and whether the schema
> > > > > >> >> > evolution would be fine.
> > > > > >> >> >
> > > > > >> >> > Thanks
> > > > > >> >> > Vinoth
> > > > > >> >> >
> > > > > >> >> > On Thu, Apr 8, 2021 at 2:13 AM Danny Chan <danny0...@apache.org> wrote:
> > > > > >> >> >
> > > > > >> >> > > I tried to do a PoC for Flink locally and it works
> > > > > >> >> > > well. In the PR I add a new metadata column named
> > > > > >> >> > > "_hoodie_change_flag", but actually I found that only
> > > > > >> >> > > the log format needs this flag, and Spark may have no
> > > > > >> >> > > ability to handle the flag for incremental processing
> > > > > >> >> > > yet.
> > > > > >> >> > >
> > > > > >> >> > > So should I add the "_hoodie_change_flag" metadata
> > > > > >> >> > > column, or is there any better solution for this?
> > > > > >> >> > >
> > > > > >> >> > > Best,
> > > > > >> >> > > Danny Chan
> > > > > >> >> > >
> > > > > >> >> > > Danny Chan <danny0...@apache.org> wrote on Fri, Apr 2, 2021, 11:08 AM:
> > > > > >> >> > >
> > > > > >> >> > > > Thanks, cool. Then the remaining questions are:
> > > > > >> >> > > >
> > > > > >> >> > > > - Where do we record these changes? Should we add a
> > > > > >> >> > > > builtin meta field such as _change_flag_, like the
> > > > > >> >> > > > other system columns, e.g. _hoodie_commit_time?
> > > > > >> >> > > > - What kind of table should keep these flags? In my
> > > > > >> >> > > > thoughts, we should only add these flags for the
> > > > > >> >> > > > "MERGE_ON_READ" table, and only for AVRO logs.
> > > > > >> >> > > > - We should add a config to switch the flags in the
> > > > > >> >> > > > system meta fields on/off.
> > > > > >> >> > > >
> > > > > >> >> > > > What do you think?
> > > > > >> >> > > >
> > > > > >> >> > > > Best,
> > > > > >> >> > > > Danny Chan
> > > > > >> >> > > >
> > > > > >> >> > > > vino yang <yanghua1...@gmail.com> wrote on Thu, Apr 1, 2021, 10:58 AM:
> > > > > >> >> > > >
> > > > > >> >> > > >> >> Oops, the image didn't come through. By "change
> > > > > >> >> > > >> >> flags", I mean: insert, update (before and
> > > > > >> >> > > >> >> after) and delete.
> > > > > >> >> > > >>
> > > > > >> >> > > >> Yes, the image I attached is also about these flags.
> > > > > >> >> > > >> [image: image (3).png]
> > > > > >> >> > > >>
> > > > > >> >> > > >> +1 for the idea.
> > > > > >> >> > > >>
> > > > > >> >> > > >> Best,
> > > > > >> >> > > >> Vino
> > > > > >> >> > > >>
> > > > > >> >> > > >> Danny Chan <danny0...@apache.org> wrote on Thu, Apr 1, 2021, 10:03 AM:
> > > > > >> >> > > >>
> > > > > >> >> > > >>> Oops, the image didn't come through. By "change
> > > > > >> >> > > >>> flags", I mean: insert, update (before and after)
> > > > > >> >> > > >>> and delete.
> > > > > >> >> > > >>>
> > > > > >> >> > > >>> The Flink engine can propagate the change flags
> > > > > >> >> > > >>> internally between its operators. If HUDI can send
> > > > > >> >> > > >>> the change flags to Flink, the incremental
> > > > > >> >> > > >>> calculation of CDC would be very natural (almost
> > > > > >> >> > > >>> transparent to users).
> > > > > >> >> > > >>>
> > > > > >> >> > > >>> Best,
> > > > > >> >> > > >>> Danny Chan
> > > > > >> >> > > >>>
> > > > > >> >> > > >>> vino yang <yanghua1...@gmail.com> wrote on Wed, Mar 31, 2021, 11:32 PM:
> > > > > >> >> > > >>>
> > > > > >> >> > > >>> > Hi Danny,
> > > > > >> >> > > >>> >
> > > > > >> >> > > >>> > Thanks for kicking off this discussion thread.
> > > > > >> >> > > >>> >
> > > > > >> >> > > >>> > Yes, incremental query (or "incremental
> > > > > >> >> > > >>> > processing") has always been an important
> > > > > >> >> > > >>> > feature of the Hudi framework. If we can make
> > > > > >> >> > > >>> > this feature better, it will be even more
> > > > > >> >> > > >>> > exciting.
> > > > > >> >> > > >>> >
> > > > > >> >> > > >>> > In the data warehouse, for some complex
> > > > > >> >> > > >>> > calculations, I have not found a good way to
> > > > > >> >> > > >>> > conveniently use incremental change data
> > > > > >> >> > > >>> > (similar to the concept of a retraction stream
> > > > > >> >> > > >>> > in Flink?)
> > > > > >> >> > > >>> > to locally "correct" the aggregation results
> > > > > >> >> > > >>> > (these aggregation results may belong to the DWS
> > > > > >> >> > > >>> > layer).
> > > > > >> >> > > >>> >
> > > > > >> >> > > >>> > BTW: Yes, I do admit that some simple
> > > > > >> >> > > >>> > calculation scenarios (a single table, or an
> > > > > >> >> > > >>> > algorithm that can be very easily retracted)
> > > > > >> >> > > >>> > can be handled based on the incremental
> > > > > >> >> > > >>> > calculation of CDC.
> > > > > >> >> > > >>> >
> > > > > >> >> > > >>> > Of course, the expression of incremental
> > > > > >> >> > > >>> > calculation in various settings is sometimes not
> > > > > >> >> > > >>> > very clear. Maybe we will discuss it more
> > > > > >> >> > > >>> > clearly in specific scenarios.
> > > > > >> >> > > >>> >
> > > > > >> >> > > >>> > >> If HUDI can keep and propagate these change
> > > > > >> >> > > >>> > >> flags to its consumers, we can use HUDI as
> > > > > >> >> > > >>> > >> the unified format for the pipeline.
> > > > > >> >> > > >>> >
> > > > > >> >> > > >>> > Regarding the "change flags" here, do you mean
> > > > > >> >> > > >>> > flags like the ones shown in the figure below?
> > > > > >> >> > > >>> >
> > > > > >> >> > > >>> > [image: image.png]
> > > > > >> >> > > >>> >
> > > > > >> >> > > >>> > Best,
> > > > > >> >> > > >>> > Vino
> > > > > >> >> > > >>> >
> > > > > >> >> > > >>> > Danny Chan <danny0...@apache.org> wrote on Wed, Mar 31, 2021, 6:24 PM:
> > > > > >> >> > > >>> >
> > > > > >> >> > > >>> >> Hi dear HUDI community ~ Here I want to start a
> > > > > >> >> > > >>> >> discussion about using HUDI as the unified
> > > > > >> >> > > >>> >> storage/format for data warehouse/lake
> > > > > >> >> > > >>> >> incremental computation.
> > > > > >> >> > > >>> >>
> > > > > >> >> > > >>> >> Usually people divide data warehouse production
> > > > > >> >> > > >>> >> into several levels, such as the ODS (operation
> > > > > >> >> > > >>> >> data store), DWD (data warehouse details), DWS
> > > > > >> >> > > >>> >> (data warehouse service), and ADS (application
> > > > > >> >> > > >>> >> data service):
> > > > > >> >> > > >>> >>
> > > > > >> >> > > >>> >> ODS -> DWD -> DWS -> ADS
> > > > > >> >> > > >>> >>
> > > > > >> >> > > >>> >> In NEAR-REAL-TIME (or pure realtime)
> > > > > >> >> > > >>> >> computation cases, a big topic is syncing the
> > > > > >> >> > > >>> >> change log (CDC pattern) from all kinds of
> > > > > >> >> > > >>> >> RDBMS into the warehouse/lake. The CDC pattern
> > > > > >> >> > > >>> >> records and propagates the change flags:
> > > > > >> >> > > >>> >> insert, update (before and after) and delete
> > > > > >> >> > > >>> >> for the consumer. With these flags, the
> > > > > >> >> > > >>> >> downstream engines can do realtime accumulation
> > > > > >> >> > > >>> >> computation.
> > > > > >> >> > > >>> >>
> > > > > >> >> > > >>> >> Using a streaming engine like Flink, we can
> > > > > >> >> > > >>> >> have a totally NEAR-REAL-TIME computation
> > > > > >> >> > > >>> >> pipeline for each of the layers.
> > > > > >> >> > > >>> >>
> > > > > >> >> > > >>> >> If HUDI can keep and propagate these change
> > > > > >> >> > > >>> >> flags to its consumers, we can use HUDI as the
> > > > > >> >> > > >>> >> unified format for the pipeline.
> > > > > >> >> > > >>> >>
> > > > > >> >> > > >>> >> I'm expecting your nice ideas here ~
> > > > > >> >> > > >>> >>
> > > > > >> >> > > >>> >> Best,
> > > > > >> >> > > >>> >> Danny Chan
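The change-flag mechanics the thread keeps returning to (insert, update-before/update-after, delete; Flink's RowKind shorthand is +I, -U, +U, -D) can be sketched without any keyed state at all, since the retraction rows carry the old values. A minimal plain-Python sketch, not Flink itself:

```python
# Sketch of a retract-aware downstream aggregator: additions (+I, +U)
# accumulate, retractions (-U, -D) subtract the value they carry. Because
# an update ships both its before and after image, the running sum stays
# correct without the aggregator looking anything up by key.
def retract_sum(changes):
    total = 0
    for flag, value in changes:
        if flag in ("+I", "+U"):    # insert / update-after: accumulate
            total += value
        elif flag in ("-U", "-D"):  # update-before / delete: retract
            total -= value
    return total

# One row's age changes 23 -> 24 and another row (age 18) is deleted:
changes = [
    ("+I", 23), ("+I", 18),   # two inserts
    ("-U", 23), ("+U", 24),   # update: retract the old 23, add the new 24
    ("-D", 18),               # delete: retract the 18
]
print(retract_sum(changes))  # the surviving row sums to 24
```

This is exactly why propagating the flags end-to-end matters: without the -U/-D retractions, a downstream `sum` would silently drift on every update or delete.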