Hi Danny,

Thanks, I will review this ASAP. It is already in the "review in progress"
column.

Thanks
Vinoth

On Thu, Apr 22, 2021 at 12:49 AM Danny Chan <danny0...@apache.org> wrote:

> > Should we throw together a PoC/test code for an example Flink pipeline
> > that will use hudi cdc flags + stateful operators?
>
> I have updated the PR https://github.com/apache/hudi/pull/2854,
>
> see the test case HoodieDataSourceITCase#testStreamReadWithDeletes.
>
> A data source:
>
> change_flag, uuid, name, age, ts, partition
>
> I, id1, Danny, 23, 1970-01-01T00:00:01, par1
> I, id2, Stephen, 33, 1970-01-01T00:00:02, par1
> I, id3, Julian, 53, 1970-01-01T00:00:03, par2
> I, id4, Fabian, 31, 1970-01-01T00:00:04, par2
> I, id5, Sophia, 18, 1970-01-01T00:00:05, par3
> I, id6, Emma, 20, 1970-01-01T00:00:06, par3
> I, id7, Bob, 44, 1970-01-01T00:00:07, par4
> I, id8, Han, 56, 1970-01-01T00:00:08, par4
> U, id1, Danny, 24, 1970-01-01T00:00:01, par1
> U, id2, Stephen, 34, 1970-01-01T00:00:02, par1
> I, id3, Julian, 53, 1970-01-01T00:00:03, par2
> D, id5, Sophia, 18, 1970-01-01T00:00:05, par3
> D, id9, Jane, 19, 1970-01-01T00:00:06, par3
>
> the streaming query "select name, sum(age) from t1 group by name" returns:
>
> change_flag, name, age_sum
> I, Danny, 24
> I, Stephen, 34
>
> The result is the same as a batch snapshot query.
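>
> If you want to try it outside the IT case, here is a minimal sketch with
> the Flink Table API. It is only a sketch: the table path is a placeholder
> and the connector options are assumptions that may not match the PR
> exactly.
>
>   import org.apache.flink.table.api.EnvironmentSettings;
>   import org.apache.flink.table.api.TableEnvironment;
>
>   public class HudiStreamReadExample {
>     public static void main(String[] args) {
>       // Table environment in streaming mode.
>       TableEnvironment tEnv = TableEnvironment.create(
>           EnvironmentSettings.newInstance().inStreamingMode().build());
>
>       // Hudi MOR table with streaming read enabled; the path is a placeholder.
>       tEnv.executeSql(
>           "CREATE TABLE t1 ("
>           + "  uuid VARCHAR(20), name VARCHAR(10), age INT,"
>           + "  ts TIMESTAMP(3), `partition` VARCHAR(20)"
>           + ") PARTITIONED BY (`partition`) WITH ("
>           + "  'connector' = 'hudi',"
>           + "  'path' = '/tmp/hudi/t1',"
>           + "  'table.type' = 'MERGE_ON_READ',"
>           + "  'read.streaming.enabled' = 'true'"
>           + ")");
>
>       // The aggregation from the test case; with the change flags the
>       // streaming result converges to the batch snapshot result.
>       tEnv.executeSql("SELECT name, SUM(age) FROM t1 GROUP BY name").print();
>     }
>   }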
>
> Best,
> Danny Chan
>
> Vinoth Chandar <vin...@apache.org> wrote on Wednesday, April 21, 2021 at 1:32 PM:
>
> > Keeping compatibility is a must, i.e. users should be able to upgrade to
> > the new release with the _hoodie_cdc_flag meta column,
> > and be able to query new data (with this new meta col) alongside old data
> > (without this new meta col).
> > In fact, they should be able to downgrade back to previous versions (say
> > there is some other snag they hit), and go back to not writing this new
> > meta column.
> > If this is too hard, then a config to control it is not a bad idea, at
> > least for an initial release?
> >
> > Thanks for clarifying the use-case! Makes total sense to me and I look
> > forward to getting this going.
> > Should we throw together a PoC/test code for an example Flink pipeline
> > that will use hudi cdc flags + stateful operators?
> > It'll help us iron out gaps iteratively and finalize requirements, instead
> > of a more top-down, waterfall-like model?
> >
> > On Tue, Apr 20, 2021 at 8:25 PM Danny Chan <danny0...@apache.org> wrote:
> >
> > > > Is it providing the ability to author continuous queries on
> > > > Hudi source tables end-end,
> > > > given Flink can use the flags to generate retract/upsert streams
> > >
> > > Yes, that's the key point: with these flags plus Flink stateful
> > > operators, we can have a real-time incremental ETL pipeline.
> > >
> > > For example, a global aggregation that consumes a cdc stream can
> > > accumulate/retract continuously and send the changes downstream.
> > > The ETL pipeline with a cdc stream generates the same result as a
> > > batch snapshot of the same sql query.
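> > >
> > > As a minimal sketch of the consumer side (assuming t1 is registered
> > > as a Hudi source with streaming read enabled; the job name is
> > > arbitrary), the retract stream makes the acc/retract behavior visible:
> > >
> > >   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> > >   import org.apache.flink.table.api.Table;
> > >   import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> > >   import org.apache.flink.types.Row;
> > >
> > >   public class CdcAggregationExample {
> > >     public static void main(String[] args) throws Exception {
> > >       StreamExecutionEnvironment env =
> > >           StreamExecutionEnvironment.getExecutionEnvironment();
> > >       StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
> > >
> > >       // Global aggregation over the cdc stream.
> > >       Table agg = tEnv.sqlQuery("SELECT name, SUM(age) FROM t1 GROUP BY name");
> > >
> > >       // Each element is (flag, row): false retracts the old aggregate,
> > >       // true accumulates the new one.
> > >       tEnv.toRetractStream(agg, Row.class).print();
> > >
> > >       env.execute("cdc-aggregation");
> > >     }
> > >   }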
> > >
> > > If compatibility can be kept with/without the new metadata columns, I
> > > think there is no need to add a config option, which brings in
> > > unnecessary overhead. If we can not ensure backward compatibility for
> > > the new column, then we should add such a config option and disable it
> > > by default.
> > >
> > > Best,
> > > Danny Chan
> > >
> > >
> > > Vinoth Chandar <vin...@apache.org> wrote on Wednesday, April 21, 2021 at 6:30 AM:
> > >
> > > > Hi Danny,
> > > >
> > > > Read up on the Flink docs as well.
> > > >
> > > > If we don't actually publish data to the meta column, I think the
> > > > overhead is pretty low w.r.t. avro/parquet. Both are very good at
> > > > encoding nulls.
> > > > But I feel it's worth adding a HoodieWriteConfig to control this, and
> > > > since the addition of meta columns mostly happens in the handles,
> > > > it may not be as bad? Happy to suggest more concrete ideas on the PR.
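> > > >
> > > > As a rough illustration of the shape such a knob could take (the key
> > > > name below is hypothetical, not an existing config):
> > > >
> > > >   import java.util.Properties;
> > > >
> > > >   // Hypothetical knob; the key name is an assumption, not an
> > > >   // existing HoodieWriteConfig property.
> > > >   public class CdcFlagOption {
> > > >     public static final String CDC_FLAG_ENABLED =
> > > >         "hoodie.write.cdc.flag.enabled";
> > > >
> > > >     // The write handles would consult this and only append the cdc
> > > >     // meta column when enabled, so existing tables keep their
> > > >     // current schema by default.
> > > >     public static boolean isCdcFlagEnabled(Properties props) {
> > > >       return Boolean.parseBoolean(
> > > >           props.getProperty(CDC_FLAG_ENABLED, "false"));
> > > >     }
> > > >   }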
> > > >
> > > > We still need to test backwards compatibility from different engines
> > > > quite early though and make sure there are no surprises.
> > > > Hive, Parquet, Spark, and Presto all have their own rules on evolution
> > > > as well.
> > > > So we need to think through if/how seamlessly this can be turned on
> > > > for existing tables.
> > > >
> > > > As for testing the new column: given Flink is what will be able to
> > > > consume the flags, can we write a quick unit test using dynamic
> > > > tables?
> > > > I am also curious to understand how the flags help the end user
> > > > ultimately.
> > > > Reading the Flink docs, I understand the concepts (coming from a Kafka
> > > > Streams world, most of it seems familiar), but what exact problem does
> > > > the flag solve that exists today? Is it providing the ability to
> > > > author continuous queries on Hudi source tables end-end,
> > > > given Flink can use the flags to generate retract/upsert streams?
> > > >
> > > > For hard deletes, we still need to do some core work to make them
> > > > available in the incremental query. So there's more to be done here
> > > > for cracking this end-end streaming/continuous ETL vision?
> > > >
> > > > Very exciting stuff!
> > > >
> > > > Thanks
> > > > Vinoth
> > > >
> > > >
> > > > On Tue, Apr 20, 2021 at 2:23 AM Danny Chan <danny0...@apache.org> wrote:
> > > >
> > > > > Hi, I have created a PR here:
> > > > > https://github.com/apache/hudi/pull/2854/files
> > > > >
> > > > > In the PR I make these changes:
> > > > > 1. Add a metadata column "_hoodie_cdc_operation". I did not add a
> > > > > config option because I can not find a good way to keep the code
> > > > > clean: a metadata column is very primitive, and a config option
> > > > > would introduce too many changes.
> > > > > 2. Modify the write handles to add the column: the append handle
> > > > > adds the operation, while the create handle and merge handle merge
> > > > > the changes.
> > > > > 3. The flag is only useful for streaming read, so I also merge the
> > > > > flags for the Flink batch reader; the Flink streaming reader emits
> > > > > each record with the right cdc operation (see the sketch below).
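> > > > >
> > > > > For illustration, a streaming read can then surface the operation
> > > > > per record like below (just a sketch, assuming a streaming
> > > > > TableEnvironment with t1 registered as a Hudi source; the column
> > > > > only carries a meaningful value when written by the Flink writer):
> > > > >
> > > > >   tEnv.executeSql(
> > > > >       "SELECT `_hoodie_cdc_operation`, uuid, name, age FROM t1").print();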
> > > > >
> > > > > I did not change any Spark code because I'm not familiar with it,
> > > > > and Spark actually can not handle these flags in its operators. So
> > > > > by default, the column "_hoodie_cdc_operation" only gets a value
> > > > > from the Flink writer.
> > > > >
> > > > > We may also need some unit tests for the new column in the hoodie
> > > > > core, but I don't know how to write them. Could you give some help?
> > > > >
> > > > > Best,
> > > > > Danny Chan
> > > > >
> > > > > Danny Chan <danny0...@apache.org> wrote on Monday, April 19, 2021 at 4:42 PM:
> > > > >
> > > > > > Thanks @Sivabalan ~
> > > > > >
> > > > > > I agree that parquet and log files should keep their metadata
> > > > > > columns in sync, to avoid confusion
> > > > > > and special handling in some use cases like compaction.
> > > > > >
> > > > > > I also agree that adding a metadata column is easier to use for
> > > > > > SQL connectors.
> > > > > >
> > > > > > We can add a metadata column named "_hoodie_change_flag" and a
> > > > > > config option that disables this metadata column by default.
> > > > > > What do you think?
> > > > > >
> > > > > > Best,
> > > > > > Danny Chan
> > > > > >
> > > > > >
> > > > > >
> > > > > >> Sivabalan <n.siv...@gmail.com> wrote on Saturday, April 17, 2021 at 9:10 AM:
> > > > > >
> > > > > >> Regarding changes: if we plan to add this only to log files, at
> > > > > >> a minimum compaction needs to be fixed to omit this column.
> > > > > >>
> > > > > >> On Fri, Apr 16, 2021 at 9:07 PM Sivabalan <n.siv...@gmail.com> wrote:
> > > > > >>
> > > > > >> > Just got a chance to read about dynamic tables. Sounds
> > > > > >> > interesting.
> > > > > >> >
> > > > > >> > Some thoughts on your questions:
> > > > > >> > - Yes, just MOR makes sense.
> > > > > >> > - But adding this new meta column only to avro logs might incur
> > > > > >> > some non-trivial changes, since as of today the schemas of the
> > > > > >> > avro and base files are in sync. If this new col is going to
> > > > > >> > store just 2 bits (insert/update/delete),
> > > > > >> > we might as well add it to base files as well and keep it
> > > > > >> > simple. But we can make it configurable so that only those
> > > > > >> > interested can enable this new column on a hudi dataset.
> > > > > >> > - Wondering: even today we can achieve this by using a
> > > > > >> > transformer to set the right values for this new column (see
> > > > > >> > the sketch below). I mean, users need to add this col in their
> > > > > >> > schema when defining the hudi dataset, and if the incoming data
> > > > > >> > has the right values for this col (using deltastreamer's
> > > > > >> > transformer or by explicit means), we don't even need to add
> > > > > >> > this as a meta column. Just saying that we can achieve this
> > > > > >> > even today. But if we are looking to integrate w/ SQL DML, then
> > > > > >> > adding this support would be elegant.
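> > > > > >> >
> > > > > >> > A rough sketch of that transformer approach (the "op" field
> > > > > >> > and the _change_flag column name are assumptions here; the
> > > > > >> > incoming rows are assumed to carry the upstream CDC op code):
> > > > > >> >
> > > > > >> >   import org.apache.hudi.common.config.TypedProperties;
> > > > > >> >   import org.apache.hudi.utilities.transform.Transformer;
> > > > > >> >   import org.apache.spark.api.java.JavaSparkContext;
> > > > > >> >   import org.apache.spark.sql.Dataset;
> > > > > >> >   import org.apache.spark.sql.Row;
> > > > > >> >   import org.apache.spark.sql.SparkSession;
> > > > > >> >   import static org.apache.spark.sql.functions.col;
> > > > > >> >
> > > > > >> >   // Copies the upstream CDC op code into a user-defined
> > > > > >> >   // _change_flag column, so the hudi schema just has to
> > > > > >> >   // declare the column and no meta column is needed.
> > > > > >> >   public class ChangeFlagTransformer implements Transformer {
> > > > > >> >     @Override
> > > > > >> >     public Dataset<Row> apply(JavaSparkContext jsc,
> > > > > >> >         SparkSession sparkSession, Dataset<Row> rowDataset,
> > > > > >> >         TypedProperties properties) {
> > > > > >> >       return rowDataset.withColumn("_change_flag", col("op"));
> > > > > >> >     }
> > > > > >> >   }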
> > > > > >> >
> > > > > >> >
> > > > > >> >
> > > > > >> > On Thu, Apr 15, 2021 at 11:33 PM Danny Chan <danny0...@apache.org> wrote:
> > > > > >> >
> > > > > >> >> Thanks Vinoth ~
> > > > > >> >>
> > > > > >> >> Here is a document about the notion of "Flink Dynamic
> > > > > >> >> Tables" [1]: every operator that has accumulated state can
> > > > > >> >> handle retractions (UPDATE_BEFORE or DELETE) and then apply
> > > > > >> >> new changes (INSERT or UPDATE_AFTER), so that each operator
> > > > > >> >> can consume the CDC-format messages in a streaming way.
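> > > > > >> >>
> > > > > >> >> In Flink these flags map onto RowKind; below is a sketch of
> > > > > >> >> the mapping a Hudi source could do (the single-letter
> > > > > >> >> encodings are assumptions, in the style of the cdc examples
> > > > > >> >> in this thread):
> > > > > >> >>
> > > > > >> >>   import org.apache.flink.types.RowKind;
> > > > > >> >>
> > > > > >> >>   public class ChangeFlags {
> > > > > >> >>     public static RowKind toRowKind(String flag) {
> > > > > >> >>       switch (flag) {
> > > > > >> >>         case "I":  return RowKind.INSERT;
> > > > > >> >>         case "-U": return RowKind.UPDATE_BEFORE;
> > > > > >> >>         case "+U": return RowKind.UPDATE_AFTER;
> > > > > >> >>         case "D":  return RowKind.DELETE;
> > > > > >> >>         default:
> > > > > >> >>           throw new IllegalArgumentException("Unknown flag: " + flag);
> > > > > >> >>       }
> > > > > >> >>     }
> > > > > >> >>   }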
> > > > > >> >>
> > > > > >> >> > Another aspect to think about is how the new flag can be
> > > > > >> >> > added to existing tables and if the schema evolution would
> > > > > >> >> > be fine.
> > > > > >> >>
> > > > > >> >> That is also my concern, but it's not that bad, because adding
> > > > > >> >> a new column is still compatible with the old schema in Avro.
> > > > > >> >>
> > > > > >> >> [1]
> > > > > >> >> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/dynamic_tables.html
> > > > > >> >>
> > > > > >> >> Best,
> > > > > >> >> Danny Chan
> > > > > >> >>
> > > > > >> >> Vinoth Chandar <vin...@apache.org> wrote on Friday, April 16, 2021 at 9:44 AM:
> > > > > >> >>
> > > > > >> >> > Hi,
> > > > > >> >> >
> > > > > >> >> > Is the intent of the flag to convey whether an insert,
> > > > > >> >> > delete or update changed the record? If so, I would imagine
> > > > > >> >> > that we do this even for cow tables, since those also
> > > > > >> >> > support a logical notion of a change stream using the
> > > > > >> >> > commit_time meta field.
> > > > > >> >> >
> > > > > >> >> > You may be right, but I am trying to understand the use
> > > > > >> >> > case for this. Any links/Flink docs I can read?
> > > > > >> >> >
> > > > > >> >> > Another aspect to think about is how the new flag can be
> > > > > >> >> > added to existing tables and if the schema evolution would
> > > > > >> >> > be fine.
> > > > > >> >> >
> > > > > >> >> > Thanks
> > > > > >> >> > Vinoth
> > > > > >> >> >
> > > > > >> >> > On Thu, Apr 8, 2021 at 2:13 AM Danny Chan <danny0...@apache.org> wrote:
> > > > > >> >> >
> > > > > >> >> > > I tried a POC for Flink locally and it works well. In
> > > > > >> >> > > the PR I add a new metadata column named
> > > > > >> >> > > "_hoodie_change_flag", but actually I found that only the
> > > > > >> >> > > log format needs this flag, and Spark may have no ability
> > > > > >> >> > > to handle the flag for incremental processing yet.
> > > > > >> >> > >
> > > > > >> >> > > So should I add the "_hoodie_change_flag" metadata
> > > > > >> >> > > column, or is there any better solution for this?
> > > > > >> >> > >
> > > > > >> >> > > Best,
> > > > > >> >> > > Danny Chan
> > > > > >> >> > >
> > > > > >> >> > > Danny Chan <danny0...@apache.org> wrote on Friday, April 2, 2021 at 11:08 AM:
> > > > > >> >> > >
> > > > > >> >> > > > Cool, thanks. Then the remaining questions are:
> > > > > >> >> > > >
> > > > > >> >> > > > - Where do we record these changes? Should we add a
> > > > > >> >> > > > builtin meta field such as _change_flag_, like the
> > > > > >> >> > > > other system columns, e.g. _hoodie_commit_time?
> > > > > >> >> > > > - What kind of table should keep these flags? In my
> > > > > >> >> > > > thoughts, we should only add these flags for the
> > > > > >> >> > > > "MERGE_ON_READ" table, and only for AVRO logs.
> > > > > >> >> > > > - We should add a config to switch the flags in the
> > > > > >> >> > > > system meta fields on/off.
> > > > > >> >> > > >
> > > > > >> >> > > > What do you think?
> > > > > >> >> > > >
> > > > > >> >> > > > Best,
> > > > > >> >> > > > Danny Chan
> > > > > >> >> > > >
> > > > > >> >> > > > vino yang <yanghua1...@gmail.com> wrote on Thursday, April 1, 2021 at 10:58 AM:
> > > > > >> >> > > >
> > > > > >> >> > > >> >> Oops, the image is broken; by "change flags", I
> > > > > >> >> > > >> >> mean: insert, update (before and after) and delete.
> > > > > >> >> > > >>
> > > > > >> >> > > >> Yes, the image I attached is also about these flags.
> > > > > >> >> > > >> [image: image (3).png]
> > > > > >> >> > > >>
> > > > > >> >> > > >> +1 for the idea.
> > > > > >> >> > > >>
> > > > > >> >> > > >> Best,
> > > > > >> >> > > >> Vino
> > > > > >> >> > > >>
> > > > > >> >> > > >>
> > > > > >> >> > > >> Danny Chan <danny0...@apache.org> wrote on Thursday, April 1, 2021 at 10:03 AM:
> > > > > >> >> > > >>
> > > > > >> >> > > >>> Oops, the image is broken; by "change flags", I mean:
> > > > > >> >> > > >>> insert, update (before and after) and delete.
> > > > > >> >> > > >>>
> > > > > >> >> > > >>> The Flink engine can propagate the change flags
> > > > > >> >> > > >>> internally between its operators. If HUDI can send
> > > > > >> >> > > >>> the change flags to Flink, the incremental
> > > > > >> >> > > >>> calculation of CDC would be very natural (almost
> > > > > >> >> > > >>> transparent to users).
> > > > > >> >> > > >>>
> > > > > >> >> > > >>> Best,
> > > > > >> >> > > >>> Danny Chan
> > > > > >> >> > > >>>
> > > > > >> >> > > >>> vino yang <yanghua1...@gmail.com> wrote on Wednesday, March 31, 2021 at 11:32 PM:
> > > > > >> >> > > >>>
> > > > > >> >> > > >>> > Hi Danny,
> > > > > >> >> > > >>> >
> > > > > >> >> > > >>> > Thanks for kicking off this discussion thread.
> > > > > >> >> > > >>> >
> > > > > >> >> > > >>> > Yes, incremental query (or "incremental
> > > > > >> >> > > >>> > processing") has always been an important feature
> > > > > >> >> > > >>> > of the Hudi framework. If we can make this feature
> > > > > >> >> > > >>> > better, it will be even more exciting.
> > > > > >> >> > > >>> >
> > > > > >> >> > > >>> > In the data warehouse, for some complex
> > > > > >> >> > > >>> > calculations, I have not found a good way to
> > > > > >> >> > > >>> > conveniently use incremental change data (similar
> > > > > >> >> > > >>> > to the concept of a retraction stream in Flink?)
> > > > > >> >> > > >>> > to locally "correct" the aggregation result (these
> > > > > >> >> > > >>> > aggregation results may belong to the DWS layer).
> > > > > >> >> > > >>> >
> > > > > >> >> > > >>> > BTW: Yes, I do admit that some simple calculation
> > > > > >> >> > > >>> > scenarios (a single table, or an algorithm that
> > > > > >> >> > > >>> > can very easily be retracted) can be dealt with
> > > > > >> >> > > >>> > based on the incremental calculation of CDC.
> > > > > >> >> > > >>> >
> > > > > >> >> > > >>> > Of course, what incremental calculation means in
> > > > > >> >> > > >>> > various settings is sometimes not very clear.
> > > > > >> >> > > >>> > Maybe we will discuss it more concretely in
> > > > > >> >> > > >>> > specific scenarios.
> > > > > >> >> > > >>> >
> > > > > >> >> > > >>> > >> If HUDI can keep and propagate these change
> > > > > >> >> > > >>> > >> flags to its consumers, we can use HUDI as the
> > > > > >> >> > > >>> > >> unified format for the pipeline.
> > > > > >> >> > > >>> >
> > > > > >> >> > > >>> > Regarding the "change flags" here, do you mean
> > > > > >> >> > > >>> > flags like the ones shown in the figure below?
> > > > > >> >> > > >>> >
> > > > > >> >> > > >>> > [image: image.png]
> > > > > >> >> > > >>> >
> > > > > >> >> > > >>> > Best,
> > > > > >> >> > > >>> > Vino
> > > > > >> >> > > >>> >
> > > > > >> >> > > >>> > Danny Chan <danny0...@apache.org> wrote on Wednesday, March 31, 2021 at 6:24 PM:
> > > > > >> >> > > >>> >
> > > > > >> >> > > >>> >> Hi dear HUDI community ~ Here I want to start a
> > > > > >> >> > > >>> >> discussion about using HUDI as the unified
> > > > > >> >> > > >>> >> storage/format for data warehouse/lake
> > > > > >> >> > > >>> >> incremental computation.
> > > > > >> >> > > >>> >>
> > > > > >> >> > > >>> >> Usually people divide data warehouse production
> > > > > >> >> > > >>> >> into several levels, such as the ODS (operation
> > > > > >> >> > > >>> >> data store), DWD (data warehouse details), DWS
> > > > > >> >> > > >>> >> (data warehouse service), ADS (application data
> > > > > >> >> > > >>> >> service).
> > > > > >> >> > > >>> >>
> > > > > >> >> > > >>> >>
> > > > > >> >> > > >>> >> ODS -> DWD -> DWS -> ADS
> > > > > >> >> > > >>> >>
> > > > > >> >> > > >>> >> In the NEAR-REAL-TIME (or pure real-time)
> > > > > >> >> > > >>> >> computation cases, a big topic is syncing the
> > > > > >> >> > > >>> >> change log (CDC pattern) from all kinds of RDBMS
> > > > > >> >> > > >>> >> into the warehouse/lake. The cdc pattern records
> > > > > >> >> > > >>> >> and propagates the change flags (insert, update
> > > > > >> >> > > >>> >> before/after, and delete) to the consumer; with
> > > > > >> >> > > >>> >> these flags, the downstream engines can do
> > > > > >> >> > > >>> >> real-time accumulative computation.
> > > > > >> >> > > >>> >>
> > > > > >> >> > > >>> >> Using a streaming engine like Flink, we can have
> > > > > >> >> > > >>> >> a totally NEAR-REAL-TIME computation pipeline for
> > > > > >> >> > > >>> >> each of the layers.
> > > > > >> >> > > >>> >>
> > > > > >> >> > > >>> >> If HUDI can keep and propagate these change
> > > > > >> >> > > >>> >> flags to its consumers, we can use HUDI as the
> > > > > >> >> > > >>> >> unified format for the pipeline.
> > > > > >> >> > > >>> >>
> > > > > >> >> > > >>> >> I'm expecting your nice ideas here ~
> > > > > >> >> > > >>> >>
> > > > > >> >> > > >>> >> Best,
> > > > > >> >> > > >>> >> Danny Chan
> > > > > >> >> > > >>> >>
> > > > > >> >> > > >>> >
> > > > > >> >> > > >>>
> > > > > >> >> > > >>
> > > > > >> >> > >
> > > > > >> >> >
> > > > > >> >>
> > > > > >> >
> > > > > >> >
> > > > > >> > --
> > > > > >> > Regards,
> > > > > >> > -Sivabalan
> > > > > >> >
> > > > > >>
> > > > > >>
> > > > > >> --
> > > > > >> Regards,
> > > > > >> -Sivabalan
> > > > > >>
> > > > > >
> > > > >
> > > >
> > >
> >
>
