> Is it providing the ability to author continuous queries on
Hudi source tables end-end,
given Flink can use the flags to generate retract/upsert streams

Yes,that's the key point, with these flags plus flink stateful operators,
we can have a real time incremental ETL pipeline.

For example, a global aggregation that consumes cdc stream can do
acc/retract continuously and send the changes to downstream.
The ETL pipeline with cdc stream generates the same result as the batch
snapshot with the same sql query.

If keeping compatibility is a must with/without the new metadata columns, I
think there is no need to add a config option which brings in
unnecessary overhead. If we do not ensure backward compatibility for new
column, then we should add such a config option and by default
disable it.

Best,
Danny Chan


Vinoth Chandar <vin...@apache.org> 于2021年4月21日周三 上午6:30写道:

> Hi Danny,
>
> Read up on the Flink docs as well.
>
> If we don't actually publish data to the metacolumn, I think the overhead
> is pretty low w.r.t avro/parquet. Both are very good at encoding nulls.
> But, I feel it's worth adding a HoodieWriteConfig to control this and since
> addition of meta columns mostly happens in the handles,
> it may not be as bad ? Happy to suggest more concrete ideas on the PR.
>
> We still need to test backwards compatibility from different engines quite
> early though and make sure there are no surprises.
> Hive, Parquet, Spark, Presto all have their own rules on evolution as well.
> So we need to think this through if/how seamlessly this can be turned on
> for existing tables
>
> As for testing the new column, given Flink is what will be able to consume
> the flags? Can we write a quick unit test using Dynamic tables?
> I am also curious to understand how the flags help the end user ultimately?
> Reading the flink docs, I understand the concepts (coming from a Kafka
> streams world,
> most of it seem familiar), but what exact problem does the flag solve that
> exist today? Is it providing the ability to author continuous queries on
> Hudi source tables end-end,
> given Flink can use the flags to generate retract/upsert streams?
>
> For hard deletes, we still need to do some core work to make it available
> in the incremental query. So there's more to be done here for cracking this
> end-end streaming/continuous ETL vision?
>
> Very exciting stuff!
>
> Thanks
> Vinoth
>
>
>
>
>
> On Tue, Apr 20, 2021 at 2:23 AM Danny Chan <danny0...@apache.org> wrote:
>
> > Hi, i have created a PR here:
> > https://github.com/apache/hudi/pull/2854/files
> >
> > In the PR i do these changes:
> > 1. Add a metadata column: "_hoodie_cdc_operation", i did not add a config
> > option because i can not find a good way to make the code clean, a
> metadata
> > column is very primitive and a config option would introduce too many
> > changes
> > 2. Modify the write handle to add the column: add operation for append
> > handle but merges the changes for create handle and merge handle
> > 3. the flag is only useful for streaming read, so i also merge the flags
> > for Flink batch reader, Flink streaming reader would emit each record
> with
> > the right cdc operation
> >
> > I did not change any Spark code because i'm not familiar with that, Spark
> > actually can not handle these flags in operators, So by default, the
> > column "_hoodie_cdc_operation", it has a value from the Flink writer.
> >
> > There may also need some unit tests for the new column in the hoodie
> core,
> > but i don't know how to, could you give some help ?
> >
> > Best,
> > Danny Chan
> >
> > Danny Chan <danny0...@apache.org> 于2021年4月19日周一 下午4:42写道:
> >
> > > Thanks @Sivabalan ~
> > >
> > > I agree that parquet and log files should keep sync in metadata columns
> > in
> > > case there are confusions
> > > and special handling in some use cases like compaction.
> > >
> > > I also agree add a metadata column is more ease to use for SQL
> > connectors.
> > >
> > > We can add a metadata column named "_hoodie_change_flag" and a config
> > > option to default disable this metadata column, what do you think?
> > >
> > > Best,
> > > Danny Chan
> > >
> > >
> > >
> > > Sivabalan <n.siv...@gmail.com> 于2021年4月17日周六 上午9:10写道:
> > >
> > >> wrt changes if we plan to add this only to log files, compaction needs
> > to
> > >> be fixed to omit this column to the minimum.
> > >>
> > >> On Fri, Apr 16, 2021 at 9:07 PM Sivabalan <n.siv...@gmail.com> wrote:
> > >>
> > >> > Just got a chance to read about dynamic tables. sounds interesting.
> > >> >
> > >> > some thoughts on your questions:
> > >> > - yes, just MOR makes sense.
> > >> > - But adding this new meta column only to avro logs might incur some
> > non
> > >> > trivial changes. Since as of today, schema of avro and base files
> are
> > in
> > >> > sync. If this new col is going to store just 2
> > >> bits(insert/update/delete),
> > >> > we might as well add it to base files as well and keep it simple.
> But
> > we
> > >> > can make it configurable so that only those interested can enable
> this
> > >> new
> > >> > column to hudi dataset.
> > >> > - Wondering, even today we can achieve this by using a transformer,
> to
> > >> set
> > >> > right values to this new column. I mean, users need to add this col
> in
> > >> > their schema when defining the hudi dataset and if the incoming data
> > has
> > >> > right values for this col (using deltastreamer's transformer or by
> > >> explicit
> > >> > means), we don't even need to add this as a meta column. Just saying
> > >> that
> > >> > we can achieve this even today. But if we are looking to integrate
> w/
> > >> SQL
> > >> > DML, then adding this support would be elegant.
> > >> >
> > >> >
> > >> >
> > >> > On Thu, Apr 15, 2021 at 11:33 PM Danny Chan <danny0...@apache.org>
> > >> wrote:
> > >> >
> > >> >> Thanks Vinoth ~
> > >> >>
> > >> >> Here is a document about the notion of 《Flink Dynamic Table》[1] ,
> > every
> > >> >> operator that has accumulate state can handle
> > >> retractions(UPDATE_BEFORE or
> > >> >> DELETE) then apply new changes (INSERT or UPDATE_AFTER), so that
> each
> > >> >> operator can consume the CDC format messages in streaming way.
> > >> >>
> > >> >> > Another aspect to think about is, how the new flag can be added
> to
> > >> >> existing
> > >> >> tables and if the schema evolution would be fine.
> > >> >>
> > >> >> That is also my concern, but it's not that bad because adding a new
> > >> column
> > >> >> is still compatible for old schema in Avro.
> > >> >>
> > >> >> [1]
> > >> >>
> > >> >>
> > >>
> >
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/dynamic_tables.html
> > >> >>
> > >> >> Best,
> > >> >> Danny Chan
> > >> >>
> > >> >> Vinoth Chandar <vin...@apache.org> 于2021年4月16日周五 上午9:44写道:
> > >> >>
> > >> >> > Hi,
> > >> >> >
> > >> >> > Is the intent of the flag to convey if an insert delete or update
> > >> >> changed
> > >> >> > the record? If so I would imagine that we do this even for cow
> > >> tables,
> > >> >> > since that also supports a logical notion of a change stream
> using
> > >> the
> > >> >> > commit_time meta field.
> > >> >> >
> > >> >> > You may be right, but I am trying to understand the use case for
> > >> this.
> > >> >> Any
> > >> >> > links/flink docs I can read?
> > >> >> >
> > >> >> > Another aspect to think about is, how the new flag can be added
> to
> > >> >> existing
> > >> >> > tables and if the schema evolution would be fine.
> > >> >> >
> > >> >> > Thanks
> > >> >> > Vinoth
> > >> >> >
> > >> >> > On Thu, Apr 8, 2021 at 2:13 AM Danny Chan <danny0...@apache.org>
> > >> wrote:
> > >> >> >
> > >> >> > > I tries to do a POC for flink locally and it works well, in the
> > PR
> > >> i
> > >> >> add
> > >> >> > a
> > >> >> > > new metadata column named "_hoodie_change_flag", but actually i
> > >> found
> > >> >> > that
> > >> >> > > only log format needs this flag, and the Spark may has no
> ability
> > >> to
> > >> >> > handle
> > >> >> > > the flag for incremental processing yet.
> > >> >> > >
> > >> >> > > So should i add the "_hoodie_change_flag" metadata column, or
> is
> > >> there
> > >> >> > any
> > >> >> > > better solution for this?
> > >> >> > >
> > >> >> > > Best,
> > >> >> > > Danny Chan
> > >> >> > >
> > >> >> > > Danny Chan <danny0...@apache.org> 于2021年4月2日周五 上午11:08写道:
> > >> >> > >
> > >> >> > > > Thanks cool, then the left questions are:
> > >> >> > > >
> > >> >> > > > - where we record these change, should we add a builtin meta
> > >> field
> > >> >> such
> > >> >> > > as
> > >> >> > > > the _change_flag_ like the other system columns for e.g
> > >> >> > > _hoodie_commit_time
> > >> >> > > > - what kind of table should keep these flags, in my thoughts,
> > we
> > >> >> should
> > >> >> > > > only add these flags for "MERGE_ON_READ" table, and only for
> > AVRO
> > >> >> logs
> > >> >> > > > - we should add a config there to switch on/off the flags in
> > >> system
> > >> >> > meta
> > >> >> > > > fields
> > >> >> > > >
> > >> >> > > > What do you think?
> > >> >> > > >
> > >> >> > > > Best,
> > >> >> > > > Danny Chan
> > >> >> > > >
> > >> >> > > > vino yang <yanghua1...@gmail.com> 于2021年4月1日周四 上午10:58写道:
> > >> >> > > >
> > >> >> > > >> >> Oops, the image crushes, for "change flags", i mean:
> > insert,
> > >> >> > > >> update(before
> > >> >> > > >> and after) and delete.
> > >> >> > > >>
> > >> >> > > >> Yes, the image I attached is also about these flags.
> > >> >> > > >> [image: image (3).png]
> > >> >> > > >>
> > >> >> > > >> +1 for the idea.
> > >> >> > > >>
> > >> >> > > >> Best,
> > >> >> > > >> Vino
> > >> >> > > >>
> > >> >> > > >>
> > >> >> > > >> Danny Chan <danny0...@apache.org> 于2021年4月1日周四 上午10:03写道:
> > >> >> > > >>
> > >> >> > > >>> Oops, the image crushes, for "change flags", i mean:
> insert,
> > >> >> > > >>> update(before
> > >> >> > > >>> and after) and delete.
> > >> >> > > >>>
> > >> >> > > >>> The Flink engine can propagate the change flags internally
> > >> between
> > >> >> > its
> > >> >> > > >>> operators, if HUDI can send the change flags to Flink, the
> > >> >> > incremental
> > >> >> > > >>> calculation of CDC would be very natural (almost
> transparent
> > to
> > >> >> > users).
> > >> >> > > >>>
> > >> >> > > >>> Best,
> > >> >> > > >>> Danny Chan
> > >> >> > > >>>
> > >> >> > > >>> vino yang <yanghua1...@gmail.com> 于2021年3月31日周三 下午11:32写道:
> > >> >> > > >>>
> > >> >> > > >>> > Hi Danny,
> > >> >> > > >>> >
> > >> >> > > >>> > Thanks for kicking off this discussion thread.
> > >> >> > > >>> >
> > >> >> > > >>> > Yes, incremental query( or says "incremental processing")
> > has
> > >> >> > always
> > >> >> > > >>> been
> > >> >> > > >>> > an important feature of the Hudi framework. If we can
> make
> > >> this
> > >> >> > > feature
> > >> >> > > >>> > better, it will be even more exciting.
> > >> >> > > >>> >
> > >> >> > > >>> > In the data warehouse, in some complex calculations, I
> have
> > >> not
> > >> >> > > found a
> > >> >> > > >>> > good way to conveniently use some incremental change data
> > >> >> (similar
> > >> >> > to
> > >> >> > > >>> the
> > >> >> > > >>> > concept of retracement stream in Flink?) to locally
> > "correct"
> > >> >> the
> > >> >> > > >>> > aggregation result (these aggregation results may belong
> to
> > >> the
> > >> >> DWS
> > >> >> > > >>> layer).
> > >> >> > > >>> >
> > >> >> > > >>> > BTW: Yes, I do admit that some simple calculation
> scenarios
> > >> >> (single
> > >> >> > > >>> table
> > >> >> > > >>> > or an algorithm that can be very easily retracement) can
> be
> > >> >> dealt
> > >> >> > > with
> > >> >> > > >>> > based on the incremental calculation of CDC.
> > >> >> > > >>> >
> > >> >> > > >>> > Of course, the expression of incremental calculation on
> > >> various
> > >> >> > > >>> occasions
> > >> >> > > >>> > is sometimes not very clear. Maybe we will discuss it
> more
> > >> >> clearly
> > >> >> > in
> > >> >> > > >>> > specific scenarios.
> > >> >> > > >>> >
> > >> >> > > >>> > >> If HUDI can keep and propagate these change flags to
> its
> > >> >> > > consumers,
> > >> >> > > >>> we
> > >> >> > > >>> > can
> > >> >> > > >>> > use HUDI as the unified format for the pipeline.
> > >> >> > > >>> >
> > >> >> > > >>> > Regarding the "change flags" here, do you mean the flags
> > like
> > >> >> the
> > >> >> > one
> > >> >> > > >>> > shown in the figure below?
> > >> >> > > >>> >
> > >> >> > > >>> > [image: image.png]
> > >> >> > > >>> >
> > >> >> > > >>> > Best,
> > >> >> > > >>> > Vino
> > >> >> > > >>> >
> > >> >> > > >>> > Danny Chan <danny0...@apache.org> 于2021年3月31日周三
> 下午6:24写道:
> > >> >> > > >>> >
> > >> >> > > >>> >> Hi dear HUDI community ~ Here i want to fire a discuss
> > about
> > >> >> using
> > >> >> > > >>> HUDI as
> > >> >> > > >>> >> the unified storage/format for data warehouse/lake
> > >> incremental
> > >> >> > > >>> >> computation.
> > >> >> > > >>> >>
> > >> >> > > >>> >> Usually people divide data warehouse production into
> > several
> > >> >> > levels,
> > >> >> > > >>> such
> > >> >> > > >>> >> as the ODS(operation data store), DWD(data warehouse
> > >> details),
> > >> >> > > >>> DWS(data
> > >> >> > > >>> >> warehouse service), ADS(application data service).
> > >> >> > > >>> >>
> > >> >> > > >>> >>
> > >> >> > > >>> >> ODS -> DWD -> DWS -> ADS
> > >> >> > > >>> >>
> > >> >> > > >>> >> In the NEAR-REAL-TIME (or pure realtime) computation
> > cases,
> > >> a
> > >> >> big
> > >> >> > > >>> topic is
> > >> >> > > >>> >> syncing the change log(CDC pattern) from all kinds of
> > RDBMS
> > >> >> into
> > >> >> > the
> > >> >> > > >>> >> warehouse/lake, the cdc patten records and propagate the
> > >> change
> > >> >> > > flag:
> > >> >> > > >>> >> insert, update(before and after) and delete for the
> > >> consumer,
> > >> >> with
> > >> >> > > >>> these
> > >> >> > > >>> >> flags, the downstream engines can have a realtime
> > >> accumulation
> > >> >> > > >>> >> computation.
> > >> >> > > >>> >>
> > >> >> > > >>> >> Using streaming engine like Flink, we can have a totally
> > >> >> > > >>> NEAR-REAL-TIME
> > >> >> > > >>> >> computation pipeline for each of the layer.
> > >> >> > > >>> >>
> > >> >> > > >>> >> If HUDI can keep and propagate these change flags to its
> > >> >> > consumers,
> > >> >> > > >>> we can
> > >> >> > > >>> >> use HUDI as the unified format for the pipeline.
> > >> >> > > >>> >>
> > >> >> > > >>> >> I'm expecting your nice ideas here ~
> > >> >> > > >>> >>
> > >> >> > > >>> >> Best,
> > >> >> > > >>> >> Danny Chan
> > >> >> > > >>> >>
> > >> >> > > >>> >
> > >> >> > > >>>
> > >> >> > > >>
> > >> >> > >
> > >> >> >
> > >> >>
> > >> >
> > >> >
> > >> > --
> > >> > Regards,
> > >> > -Sivabalan
> > >> >
> > >>
> > >>
> > >> --
> > >> Regards,
> > >> -Sivabalan
> > >>
> > >
> >
>

Reply via email to