Hi, I have created a PR here: https://github.com/apache/hudi/pull/2854/files

In the PR I made these changes:
1. Add a metadata column, "_hoodie_cdc_operation". I did not add a config
option because I could not find a way to do so that keeps the code clean: a
metadata column is very primitive, and a config option would introduce too
many changes.
2. Modify the write handles to fill in the column: the append handle writes
the operation for each record, while the create handle and the merge handle
merge the changes.
3. The flag is only useful for streaming reads, so I also merge the flags
for the Flink batch reader. The Flink streaming reader emits each record
with the right CDC operation.
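
To illustrate point 3, here is a minimal sketch of the difference between
the two read paths. It is not code from the PR: the flag strings, the record
layout and the function names are all assumptions made up for this example,
and it is written in Python only to keep it short.

```python
from typing import Dict, Iterable, List, Tuple

# Hypothetical flag values, modeled on the insert / update-before /
# update-after / delete flags discussed in this thread.
INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE = "+I", "-U", "+U", "-D"

Record = Tuple[str, str, str]  # (record key, value, change flag)


def streaming_read(log: Iterable[Record]) -> List[Record]:
    """Streaming path: emit every record unchanged, flag included."""
    return list(log)


def batch_read(log: Iterable[Record]) -> Dict[str, str]:
    """Batch path: merge the flags, keeping only the latest value per key."""
    state: Dict[str, str] = {}
    for key, value, flag in log:
        if flag in (INSERT, UPDATE_AFTER):
            state[key] = value      # new or updated row wins
        elif flag in (UPDATE_BEFORE, DELETE):
            state.pop(key, None)    # retract the old row
        else:
            raise ValueError(f"unknown change flag: {flag}")
    return state


log = [
    ("k1", "a", INSERT),
    ("k2", "b", INSERT),
    ("k1", "a", UPDATE_BEFORE),
    ("k1", "c", UPDATE_AFTER),
    ("k2", "b", DELETE),
]
```

With this log, the streaming path returns all five records with their flags,
while the batch path collapses them into a single surviving row for "k1".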

I did not change any Spark code because I am not familiar with it. Spark
actually cannot handle these flags in its operators, so by default the
"_hoodie_cdc_operation" column gets its value from the Flink writer.

The new column in the Hudi core probably also needs some unit tests, but I
don't know how to write them. Could you give me some help?

Best,
Danny Chan

On Mon, Apr 19, 2021 at 4:42 PM Danny Chan <danny0...@apache.org> wrote:

> Thanks @Sivabalan ~
>
> I agree that parquet and log files should keep their metadata columns in
> sync, to avoid confusion
> and special handling in some use cases like compaction.
>
> I also agree that adding a metadata column is easier to use for SQL connectors.
>
> We can add a metadata column named "_hoodie_change_flag" and a config
> option that disables this metadata column by default. What do you think?
>
> Best,
> Danny Chan
>
>
>
> On Sat, Apr 17, 2021 at 9:10 AM Sivabalan <n.siv...@gmail.com> wrote:
>
>> Regarding the changes: if we plan to add this only to log files, then at a
>> minimum compaction needs to be fixed to omit this column.
>>
>> On Fri, Apr 16, 2021 at 9:07 PM Sivabalan <n.siv...@gmail.com> wrote:
>>
>> > Just got a chance to read about dynamic tables. sounds interesting.
>> >
>> > some thoughts on your questions:
>> > - yes, just MOR makes sense.
>> > - But adding this new meta column only to avro logs might incur some
>> > non-trivial changes, since as of today the schemas of avro and base files
>> > are in sync. If this new col is going to store just 2 bits
>> > (insert/update/delete), we might as well add it to base files too and
>> > keep it simple. But we can make it configurable so that only those
>> > interested can enable this new column on a hudi dataset.
>> > - Wondering: even today we can achieve this by using a transformer to set
>> > the right values for this new column. I mean, users need to add this col
>> > to their schema when defining the hudi dataset, and if the incoming data
>> > has the right values for this col (using deltastreamer's transformer or by
>> > explicit means), we don't even need to add this as a meta column. Just
>> > saying that we can achieve this even today. But if we are looking to
>> > integrate w/ SQL DML, then adding this support would be elegant.
>> >
>> >
>> >
>> > On Thu, Apr 15, 2021 at 11:33 PM Danny Chan <danny0...@apache.org> wrote:
>> >
>> >> Thanks Vinoth ~
>> >>
>> >> Here is a document about the notion of the Flink Dynamic Table [1]. Every
>> >> operator that has accumulated state can handle retractions
>> >> (UPDATE_BEFORE or DELETE) and then apply new changes (INSERT or
>> >> UPDATE_AFTER), so that each operator can consume the CDC format messages
>> >> in a streaming way.
>> >>
>> >> > Another aspect to think about is, how the new flag can be added to
>> >> > existing tables and if the schema evolution would be fine.
>> >>
>> >> That is also my concern, but it's not that bad, because adding a new
>> >> column is still compatible with the old schema in Avro.
>> >>
>> >> [1]
>> >> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/dynamic_tables.html
>> >>
>> >> Best,
>> >> Danny Chan
>> >>
>> >> On Fri, Apr 16, 2021 at 9:44 AM Vinoth Chandar <vin...@apache.org> wrote:
>> >>
>> >> > Hi,
>> >> >
>> >> > Is the intent of the flag to convey whether an insert, delete or update
>> >> > changed the record? If so I would imagine that we do this even for cow
>> >> > tables, since that also supports a logical notion of a change stream
>> >> > using the commit_time meta field.
>> >> >
>> >> > You may be right, but I am trying to understand the use case for this.
>> >> > Any links/flink docs I can read?
>> >> >
>> >> > Another aspect to think about is, how the new flag can be added to
>> >> > existing tables and if the schema evolution would be fine.
>> >> >
>> >> > Thanks
>> >> > Vinoth
>> >> >
>> >> > On Thu, Apr 8, 2021 at 2:13 AM Danny Chan <danny0...@apache.org> wrote:
>> >> >
>> >> > > I tried a POC for Flink locally and it works well. In the PR I add
>> >> > > a new metadata column named "_hoodie_change_flag", but I actually
>> >> > > found that only the log format needs this flag, and Spark may not
>> >> > > yet be able to handle the flag for incremental processing.
>> >> > >
>> >> > > So should I add the "_hoodie_change_flag" metadata column, or is
>> >> > > there a better solution for this?
>> >> > >
>> >> > > Best,
>> >> > > Danny Chan
>> >> > >
>> >> > > On Fri, Apr 2, 2021 at 11:08 AM Danny Chan <danny0...@apache.org> wrote:
>> >> > >
>> >> > > > Thanks, cool. The remaining questions are:
>> >> > > >
>> >> > > > - where do we record these changes? Should we add a builtin meta
>> >> > > > field such as _change_flag_, like the other system columns, e.g.
>> >> > > > _hoodie_commit_time?
>> >> > > > - what kind of table should keep these flags? In my view, we should
>> >> > > > only add these flags for "MERGE_ON_READ" tables, and only for AVRO
>> >> > > > logs
>> >> > > > - we should add a config to switch the flags in the system meta
>> >> > > > fields on/off
>> >> > > >
>> >> > > > What do you think?
>> >> > > >
>> >> > > > Best,
>> >> > > > Danny Chan
>> >> > > >
>> >> > > > On Thu, Apr 1, 2021 at 10:58 AM vino yang <yanghua1...@gmail.com> wrote:
>> >> > > >
>> >> > > >> >> Oops, the image is broken. By "change flags" I mean: insert,
>> >> > > >> >> update (before and after) and delete.
>> >> > > >>
>> >> > > >> Yes, the image I attached is also about these flags.
>> >> > > >> [image: image (3).png]
>> >> > > >>
>> >> > > >> +1 for the idea.
>> >> > > >>
>> >> > > >> Best,
>> >> > > >> Vino
>> >> > > >>
>> >> > > >>
>> >> > > >> On Thu, Apr 1, 2021 at 10:03 AM Danny Chan <danny0...@apache.org> wrote:
>> >> > > >>
>> >> > > >>> Oops, the image is broken. By "change flags" I mean: insert,
>> >> > > >>> update (before and after) and delete.
>> >> > > >>>
>> >> > > >>> The Flink engine can propagate the change flags internally between
>> >> > > >>> its operators. If HUDI can send the change flags to Flink, the
>> >> > > >>> incremental calculation of CDC would be very natural (almost
>> >> > > >>> transparent to users).
>> >> > > >>>
>> >> > > >>> Best,
>> >> > > >>> Danny Chan
>> >> > > >>>
>> >> > > >>> On Wed, Mar 31, 2021 at 11:32 PM vino yang <yanghua1...@gmail.com> wrote:
>> >> > > >>>
>> >> > > >>> > Hi Danny,
>> >> > > >>> >
>> >> > > >>> > Thanks for kicking off this discussion thread.
>> >> > > >>> >
>> >> > > >>> > Yes, incremental query (or "incremental processing") has always
>> >> > > >>> > been an important feature of the Hudi framework. If we can make
>> >> > > >>> > this feature better, it will be even more exciting.
>> >> > > >>> >
>> >> > > >>> > In the data warehouse, for some complex calculations, I have not
>> >> > > >>> > found a good way to conveniently use incremental change data
>> >> > > >>> > (similar to the concept of a retract stream in Flink?) to
>> >> > > >>> > locally "correct" the aggregation result (these aggregation
>> >> > > >>> > results may belong to the DWS layer).
>> >> > > >>> >
>> >> > > >>> > BTW: Yes, I do admit that some simple calculation scenarios (a
>> >> > > >>> > single table, or an algorithm that can be retracted very easily)
>> >> > > >>> > can be dealt with based on the incremental calculation of CDC.
>> >> > > >>> >
>> >> > > >>> > Of course, what "incremental calculation" means in various
>> >> > > >>> > situations is sometimes not very clear. Maybe we can discuss it
>> >> > > >>> > more clearly in specific scenarios.
>> >> > > >>> >
>> >> > > >>> > >> If HUDI can keep and propagate these change flags to its
>> >> > > >>> > >> consumers, we can use HUDI as the unified format for the
>> >> > > >>> > >> pipeline.
>> >> > > >>> >
>> >> > > >>> > Regarding the "change flags" here, do you mean flags like the
>> >> > > >>> > ones shown in the figure below?
>> >> > > >>> >
>> >> > > >>> > [image: image.png]
>> >> > > >>> >
>> >> > > >>> > Best,
>> >> > > >>> > Vino
>> >> > > >>> >
>> >> > > >>> > On Wed, Mar 31, 2021 at 6:24 PM Danny Chan <danny0...@apache.org> wrote:
>> >> > > >>> >
>> >> > > >>> >> Hi dear HUDI community ~ Here I want to start a discussion about
>> >> > > >>> >> using HUDI as the unified storage/format for data warehouse/lake
>> >> > > >>> >> incremental computation.
>> >> > > >>> >>
>> >> > > >>> >> Usually people divide data warehouse production into several
>> >> > > >>> >> levels, such as the ODS (operational data store), DWD (data
>> >> > > >>> >> warehouse details), DWS (data warehouse service) and ADS
>> >> > > >>> >> (application data service).
>> >> > > >>> >>
>> >> > > >>> >>
>> >> > > >>> >> ODS -> DWD -> DWS -> ADS
>> >> > > >>> >>
>> >> > > >>> >> In NEAR-REAL-TIME (or pure realtime) computation cases, a big
>> >> > > >>> >> topic is syncing the change log (CDC pattern) from all kinds of
>> >> > > >>> >> RDBMS into the warehouse/lake. The CDC pattern records and
>> >> > > >>> >> propagates the change flags (insert, update (before and after)
>> >> > > >>> >> and delete) to the consumer. With these flags, the downstream
>> >> > > >>> >> engines can do realtime accumulation computation.
>> >> > > >>> >>
>> >> > > >>> >> Using a streaming engine like Flink, we can have a totally
>> >> > > >>> >> NEAR-REAL-TIME computation pipeline for each of the layers.
>> >> > > >>> >>
>> >> > > >>> >> If HUDI can keep and propagate these change flags to its
>> >> > > >>> >> consumers, we can use HUDI as the unified format for the
>> >> > > >>> >> pipeline.
>> >> > > >>> >>
>> >> > > >>> >> I'm looking forward to your ideas here ~
>> >> > > >>> >>
>> >> > > >>> >> Best,
>> >> > > >>> >> Danny Chan
>> >> > > >>> >>
>> >> > > >>> >
>> >> > > >>>
>> >> > > >>
>> >> > >
>> >> >
>> >>
>> >
>> >
>> > --
>> > Regards,
>> > -Sivabalan
>> >
>>
>>
>> --
>> Regards,
>> -Sivabalan
>>
>
