Hi Danny,

Read up on the Flink docs as well.

If we don't actually publish data to the meta column, I think the overhead
is pretty low w.r.t. Avro/Parquet. Both are very good at encoding nulls.
But I feel it's worth adding a HoodieWriteConfig to control this, and since
the addition of meta columns mostly happens in the handles,
it may not be as bad? Happy to suggest more concrete ideas on the PR.
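
To make the idea concrete, here is a minimal sketch (Python for brevity) of
a config-gated meta column. The column name comes from the PR; the
WriteConfig class, its populate_operation_field switch and the
add_meta_columns helper are all hypothetical and not existing Hudi APIs.

```python
from dataclasses import dataclass

# Column name taken from the PR description in this thread.
OPERATION_FIELD = "_hoodie_cdc_operation"


@dataclass(frozen=True)
class WriteConfig:
    # Hypothetical switch; not an existing HoodieWriteConfig option.
    populate_operation_field: bool = False


def add_meta_columns(record: dict, operation: str, config: WriteConfig) -> dict:
    """Return a copy of the record with the operation meta column set.

    When the switch is off the column is still present but null, so the
    schema is the same either way and only the stored value differs.
    """
    out = dict(record)
    out[OPERATION_FIELD] = operation if config.populate_operation_field else None
    return out
```

With the switch off, every record carries a null in that column, which is
the cheap-to-encode case mentioned above.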

We still need to test backwards compatibility from different engines quite
early, though, and make sure there are no surprises.
Hive, Parquet, Spark, Presto all have their own rules on schema evolution as
well. So we need to think through if/how seamlessly this can be turned on
for existing tables.
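
As a reference point for that testing, here is a toy model of the one rule
I am sure about: in Avro schema resolution, a field that the reader's schema
has but the written data lacks takes the reader's default. This is plain
Python, not the Avro library, and the helper and field names are
illustrative only. It says nothing about how Hive, Parquet, Spark or Presto
behave.

```python
# Toy model of Avro-style schema resolution; this is not the Avro library.
MISSING = object()  # marks a reader field that declares no default


def read_with_schema(stored: dict, reader_fields: dict) -> dict:
    """Project a stored record onto the reader's fields.

    reader_fields maps field name -> default value, or MISSING when the
    field has no default. A field absent from the stored record takes its
    default; if it has none, the read fails.
    """
    out = {}
    for name, default in reader_fields.items():
        if name in stored:
            out[name] = stored[name]
        elif default is not MISSING:
            out[name] = default
        else:
            raise ValueError(f"no value or default for field {name!r}")
    return out


# Reader schema after the change: the new meta column defaults to null.
new_schema = {"uuid": MISSING, "price": MISSING, "_hoodie_cdc_operation": None}
old_record = {"uuid": "k1", "price": 10}  # written before the column existed
resolved = read_with_schema(old_record, new_schema)
```

Here the old record reads back with the new column set to null, which is
the behavior we would want to confirm per engine on existing tables.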

As for testing the new column: given Flink is what will be able to consume
the flags, can we write a quick unit test using Dynamic tables?
I am also curious to understand how the flags ultimately help the end user.
Reading the Flink docs, I understand the concepts (coming from a Kafka
Streams world, most of it seems familiar), but what exact problem that
exists today does the flag solve? Is it providing the ability to author
continuous queries on Hudi source tables end-to-end,
given Flink can use the flags to generate retract/upsert streams?
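
To pin down what I mean by that, here is a toy sketch of a downstream
operator keeping a per-key sum up to date from a flagged changelog. The
flag strings follow the short names of Flink's RowKind (+I, -U, +U, -D);
the function and the sample data are made up for illustration, and this is
plain Python rather than Flink code.

```python
from collections import defaultdict


def apply_changelog(changes):
    """Maintain a running sum per key from (flag, key, amount) changes.

    +I / +U add the new value; -U / -D retract the previously emitted one.
    """
    totals = defaultdict(int)
    for flag, key, amount in changes:
        if flag in ("+I", "+U"):
            totals[key] += amount
        elif flag in ("-U", "-D"):
            totals[key] -= amount
        else:
            raise ValueError(f"unknown change flag: {flag!r}")
    return dict(totals)


changes = [
    ("+I", "a", 10),  # insert a=10
    ("+I", "b", 5),   # insert b=5
    ("-U", "a", 10),  # update a: retract the old value...
    ("+U", "a", 7),   # ...and apply the new one
    ("-D", "b", 5),   # delete b
]
totals = apply_changelog(changes)
```

Without the flags, the consumer only sees the latest image of each record
and cannot tell what to subtract from an existing aggregate.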

For hard deletes, we still need to do some core work to make them available
in the incremental query. So there's more to be done here for cracking this
end-to-end streaming/continuous ETL vision.

Very exciting stuff!

Thanks
Vinoth





On Tue, Apr 20, 2021 at 2:23 AM Danny Chan <danny0...@apache.org> wrote:

> Hi, I have created a PR here:
> https://github.com/apache/hudi/pull/2854/files
>
> In the PR I made these changes:
> 1. Add a metadata column, "_hoodie_cdc_operation". I did not add a config
> option because I could not find a good way to keep the code clean: a
> metadata column is very primitive, and a config option would introduce too
> many changes.
> 2. Modify the write handles to add the column: the append handle adds the
> operation, while the create handle and merge handle merge the changes.
> 3. The flag is only useful for streaming read, so I also merge the flags
> for the Flink batch reader; the Flink streaming reader emits each record
> with the right CDC operation.
>
> I did not change any Spark code because I'm not familiar with it. Spark
> actually cannot handle these flags in its operators, so by default the
> column "_hoodie_cdc_operation" gets its value from the Flink writer.
>
> The new column may also need some unit tests in the hoodie core,
> but I don't know how to write them. Could you give some help?
>
> Best,
> Danny Chan
>
> On Mon, Apr 19, 2021 at 4:42 PM Danny Chan <danny0...@apache.org> wrote:
>
> > Thanks @Sivabalan ~
> >
> > I agree that Parquet and log files should keep their metadata columns in
> > sync, to avoid confusion and special handling in use cases like
> > compaction.
> >
> > I also agree that adding a metadata column is easier to use for SQL
> > connectors.
> >
> > We can add a metadata column named "_hoodie_change_flag" and a config
> > option that disables this metadata column by default. What do you think?
> >
> > Best,
> > Danny Chan
> >
> >
> >
> > On Sat, Apr 17, 2021 at 9:10 AM Sivabalan <n.siv...@gmail.com> wrote:
> >
> >> W.r.t. changes: if we plan to add this only to log files, compaction
> >> needs to be fixed to omit this column, at a minimum.
> >>
> >> On Fri, Apr 16, 2021 at 9:07 PM Sivabalan <n.siv...@gmail.com> wrote:
> >>
> >> > Just got a chance to read about dynamic tables. Sounds interesting.
> >> >
> >> > Some thoughts on your questions:
> >> > - Yes, just MOR makes sense.
> >> > - But adding this new meta column only to Avro logs might incur some
> >> > non-trivial changes, since as of today the schemas of Avro logs and
> >> > base files are in sync. If this new column is going to store just 2
> >> > bits (insert/update/delete), we might as well add it to base files too
> >> > and keep it simple. But we can make it configurable so that only those
> >> > interested can enable this new column in their Hudi dataset.
> >> > - Wondering: even today we can achieve this by using a transformer to
> >> > set the right values for this new column. I mean, users need to add
> >> > this column to their schema when defining the Hudi dataset, and if the
> >> > incoming data has the right values for this column (using
> >> > deltastreamer's transformer or by explicit means), we don't even need
> >> > to add this as a meta column. Just saying that we can achieve this even
> >> > today. But if we are looking to integrate with SQL DML, then adding
> >> > this support would be elegant.
> >> >
> >> >
> >> >
> >> > On Thu, Apr 15, 2021 at 11:33 PM Danny Chan <danny0...@apache.org> wrote:
> >> >
> >> >> Thanks Vinoth ~
> >> >>
> >> >> Here is a document about the notion of the "Flink Dynamic Table" [1].
> >> >> Every operator that has accumulated state can handle retractions
> >> >> (UPDATE_BEFORE or DELETE) and then apply new changes (INSERT or
> >> >> UPDATE_AFTER), so that each operator can consume CDC-format messages
> >> >> in a streaming way.
> >> >>
> >> >> > Another aspect to think about is, how the new flag can be added to
> >> >> > existing tables and if the schema evolution would be fine.
> >> >>
> >> >> That is also my concern, but it's not that bad because adding a new
> >> >> column is still compatible with the old schema in Avro.
> >> >>
> >> >> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/dynamic_tables.html
> >> >>
> >> >> Best,
> >> >> Danny Chan
> >> >>
> >> >> On Fri, Apr 16, 2021 at 9:44 AM Vinoth Chandar <vin...@apache.org> wrote:
> >> >>
> >> >> > Hi,
> >> >> >
> >> >> > Is the intent of the flag to convey whether an insert, delete, or
> >> >> > update changed the record? If so, I would imagine that we do this
> >> >> > even for COW tables, since those also support a logical notion of a
> >> >> > change stream using the commit_time meta field.
> >> >> >
> >> >> > You may be right, but I am trying to understand the use case for
> >> >> > this. Any links/Flink docs I can read?
> >> >> >
> >> >> > Another aspect to think about is how the new flag can be added to
> >> >> > existing tables and whether the schema evolution would be fine.
> >> >> >
> >> >> > Thanks
> >> >> > Vinoth
> >> >> >
> >> >> > On Thu, Apr 8, 2021 at 2:13 AM Danny Chan <danny0...@apache.org> wrote:
> >> >> >
> >> >> > > I tried a POC for Flink locally and it works well. In the PR I
> >> >> > > add a new metadata column named "_hoodie_change_flag", but I
> >> >> > > actually found that only the log format needs this flag, and
> >> >> > > Spark may have no ability to handle the flag for incremental
> >> >> > > processing yet.
> >> >> > >
> >> >> > > So should I add the "_hoodie_change_flag" metadata column, or is
> >> >> > > there any better solution for this?
> >> >> > >
> >> >> > > Best,
> >> >> > > Danny Chan
> >> >> > >
> >> >> > > On Fri, Apr 2, 2021 at 11:08 AM Danny Chan <danny0...@apache.org> wrote:
> >> >> > >
> >> >> > > > Thanks, cool. Then the remaining questions are:
> >> >> > > >
> >> >> > > > - Where do we record these changes? Should we add a built-in
> >> >> > > > meta field such as _change_flag_, like the other system columns,
> >> >> > > > e.g. _hoodie_commit_time?
> >> >> > > > - What kind of table should keep these flags? In my opinion, we
> >> >> > > > should only add these flags for "MERGE_ON_READ" tables, and only
> >> >> > > > for Avro logs.
> >> >> > > > - We should add a config to switch the flags on/off in the
> >> >> > > > system meta fields.
> >> >> > > >
> >> >> > > > What do you think?
> >> >> > > >
> >> >> > > > Best,
> >> >> > > > Danny Chan
> >> >> > > >
> >> >> > > > On Thu, Apr 1, 2021 at 10:58 AM vino yang <yanghua1...@gmail.com> wrote:
> >> >> > > >
> >> >> > > >> >> Oops, the image is broken; by "change flags" I mean: insert,
> >> >> > > >> >> update (before and after) and delete.
> >> >> > > >>
> >> >> > > >> Yes, the image I attached is also about these flags.
> >> >> > > >> [image: image (3).png]
> >> >> > > >>
> >> >> > > >> +1 for the idea.
> >> >> > > >>
> >> >> > > >> Best,
> >> >> > > >> Vino
> >> >> > > >>
> >> >> > > >>
> >> >> > > >> On Thu, Apr 1, 2021 at 10:03 AM Danny Chan <danny0...@apache.org> wrote:
> >> >> > > >>
> >> >> > > >>> Oops, the image is broken; by "change flags" I mean: insert,
> >> >> > > >>> update (before and after) and delete.
> >> >> > > >>>
> >> >> > > >>> The Flink engine can propagate the change flags internally
> >> >> > > >>> between its operators. If HUDI can send the change flags to
> >> >> > > >>> Flink, the incremental calculation of CDC would be very natural
> >> >> > > >>> (almost transparent to users).
> >> >> > > >>>
> >> >> > > >>> Best,
> >> >> > > >>> Danny Chan
> >> >> > > >>>
> >> >> > > >>> On Wed, Mar 31, 2021 at 11:32 PM vino yang <yanghua1...@gmail.com> wrote:
> >> >> > > >>>
> >> >> > > >>> > Hi Danny,
> >> >> > > >>> >
> >> >> > > >>> > Thanks for kicking off this discussion thread.
> >> >> > > >>> >
> >> >> > > >>> > Yes, incremental query (or "incremental processing") has
> >> >> > > >>> > always been an important feature of the Hudi framework. If we
> >> >> > > >>> > can make this feature better, it will be even more exciting.
> >> >> > > >>> >
> >> >> > > >>> > In the data warehouse, for some complex calculations, I have
> >> >> > > >>> > not found a good way to conveniently use incremental change
> >> >> > > >>> > data (similar to the concept of a retract stream in Flink?) to
> >> >> > > >>> > locally "correct" the aggregation results (these aggregation
> >> >> > > >>> > results may belong to the DWS layer).
> >> >> > > >>> >
> >> >> > > >>> > BTW: Yes, I do admit that some simple calculation scenarios (a
> >> >> > > >>> > single table, or an algorithm that can very easily be
> >> >> > > >>> > retracted) can be dealt with based on the incremental
> >> >> > > >>> > calculation of CDC.
> >> >> > > >>> >
> >> >> > > >>> > Of course, what "incremental calculation" means is sometimes
> >> >> > > >>> > not very clear across different settings. Maybe we can discuss
> >> >> > > >>> > it more clearly in specific scenarios.
> >> >> > > >>> >
> >> >> > > >>> > >> If HUDI can keep and propagate these change flags to its
> >> >> > > >>> > >> consumers, we can use HUDI as the unified format for the
> >> >> > > >>> > >> pipeline.
> >> >> > > >>> >
> >> >> > > >>> > Regarding the "change flags" here, do you mean flags like the
> >> >> > > >>> > ones shown in the figure below?
> >> >> > > >>> >
> >> >> > > >>> > [image: image.png]
> >> >> > > >>> >
> >> >> > > >>> > Best,
> >> >> > > >>> > Vino
> >> >> > > >>> >
> >> >> > > >>> > On Wed, Mar 31, 2021 at 6:24 PM Danny Chan <danny0...@apache.org> wrote:
> >> >> > > >>> >
> >> >> > > >>> >> Hi dear HUDI community ~ Here I want to start a discussion
> >> >> > > >>> >> about using HUDI as the unified storage/format for data
> >> >> > > >>> >> warehouse/lake incremental computation.
> >> >> > > >>> >>
> >> >> > > >>> >> People usually divide data warehouse production into several
> >> >> > > >>> >> layers, such as the ODS (operational data store), DWD (data
> >> >> > > >>> >> warehouse details), DWS (data warehouse service), and ADS
> >> >> > > >>> >> (application data service).
> >> >> > > >>> >>
> >> >> > > >>> >>
> >> >> > > >>> >> ODS -> DWD -> DWS -> ADS
> >> >> > > >>> >>
> >> >> > > >>> >> In NEAR-REAL-TIME (or pure realtime) computation cases, a big
> >> >> > > >>> >> topic is syncing the change log (CDC pattern) from all kinds
> >> >> > > >>> >> of RDBMS into the warehouse/lake. The CDC pattern records and
> >> >> > > >>> >> propagates the change flags (insert, update (before and
> >> >> > > >>> >> after), and delete) to the consumer. With these flags, the
> >> >> > > >>> >> downstream engines can do realtime accumulation computation.
> >> >> > > >>> >>
> >> >> > > >>> >> Using a streaming engine like Flink, we can have a fully
> >> >> > > >>> >> NEAR-REAL-TIME computation pipeline for each of the layers.
> >> >> > > >>> >>
> >> >> > > >>> >> If HUDI can keep and propagate these change flags to its
> >> >> > > >>> >> consumers, we can use HUDI as the unified format for the
> >> >> > > >>> >> pipeline.
> >> >> > > >>> >>
> >> >> > > >>> >> I'm expecting your nice ideas here ~
> >> >> > > >>> >>
> >> >> > > >>> >> Best,
> >> >> > > >>> >> Danny Chan
> >> >> > > >>> >>
> >> >> > > >>> >
> >> >> > > >>>
> >> >> > > >>
> >> >> > >
> >> >> >
> >> >>
> >> >
> >> >
> >> > --
> >> > Regards,
> >> > -Sivabalan
> >> >
> >>
> >>
> >> --
> >> Regards,
> >> -Sivabalan
> >>
> >
>
