> So, you are trying to avoid reading the data again from an incremental
> query? If so, I don't know how we can achieve this in Hudi.
> Let's say we
>
> a) read the 20 mins of data from Kafka or DFS, into a Spark Dataframe,
> b) issue an upsert into a hudi table (at this point the dataframe is lost)
>
> if you want to do more processing, after step b, then we need to just cache
> the input data frame, right?

IMO, let's first align on this question:

Why can't we reuse the result of the public write APIs, for example:

  public JavaRDD<WriteStatus> upsert(JavaRDD<HoodieRecord<T>> records,
      final String instantTime);

so that we can add more processing nodes to the current DAG of the Spark
job within one write cycle?

We do not need to cache the input data frame, because we can apply
transformations before committing.
We are trying to reuse the written data.
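
To make the idea concrete, here is a minimal sketch in plain Java. It uses
java.util collections instead of Spark RDDs so that it runs on its own. The
names Rec, Status, upsert and sumByPartition are hypothetical stand-ins
invented for illustration; they are not Hudi's HoodieRecord, WriteStatus or
write client. The only point is that further processing is chained on the
value returned by the write call, instead of re-reading the table.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class WriteThenProcessSketch {

    // Hypothetical stand-in for an input record (not Hudi's HoodieRecord).
    static final class Rec {
        final String key;
        final String partition;
        final long value;

        Rec(String key, String partition, long value) {
            this.key = key;
            this.partition = partition;
            this.value = value;
        }
    }

    // Hypothetical stand-in for a per-record write result (not Hudi's WriteStatus).
    static final class Status {
        final Rec rec;
        final boolean success;

        Status(Rec rec, boolean success) {
            this.rec = rec;
            this.success = success;
        }
    }

    // Stand-in for upsert(records, instantTime): one status per input record.
    // For illustration only, a record with an empty key counts as a failed write.
    static List<Status> upsert(List<Rec> records, String instantTime) {
        return records.stream()
                .map(r -> new Status(r, !r.key.isEmpty()))
                .collect(Collectors.toList());
    }

    // Extra processing chained on the write result instead of re-reading the table:
    // sum the values of successfully written records per partition.
    static Map<String, Long> sumByPartition(List<Status> statuses) {
        return statuses.stream()
                .filter((Status s) -> s.success)
                .collect(Collectors.groupingBy((Status s) -> s.rec.partition,
                        Collectors.summingLong((Status s) -> s.rec.value)));
    }

    public static void main(String[] args) {
        List<Rec> batch = Arrays.asList(
                new Rec("k1", "2020/09/15", 10),
                new Rec("k2", "2020/09/15", 5),
                new Rec("", "2020/09/16", 7));
        List<Status> statuses = upsert(batch, "20200915053200");
        System.out.println(sumByPartition(statuses)); // prints {2020/09/15=15}
    }
}
```

In a real Spark job the same shape would presumably apply to the
JavaRDD<WriteStatus> returned by upsert, but whether the write status carries
enough of the written data for this is exactly the question to align on.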


On Tue, Sep 15, 2020 at 5:32 AM Vinoth Chandar <[email protected]> wrote:

> > I mean we can do more things in one pipeline based on the capabilities
> > of the computing frameworks.
> > But the key difference, we can avoid re-reading these data from the
> > persistent storage again.
>
> So, you are trying to avoid reading the data again from an incremental
> query? If so, I don't know how we can achieve this in Hudi.
> Let's say we
>
> a) read the 20 mins of data from Kafka or DFS, into a Spark Dataframe,
> b) issue an upsert into a hudi table (at this point the dataframe is lost)
>
> if you want to do more processing, after step b, then we need to just cache
> the input data frame, right?
>
> Please don't get me wrong. I want us to brainstorm more on the incremental
> pipeline scenarios, but I am not seeing how someone would benefit from the
> proposal compared with what they are using today. Maybe it would help if we
> structured this discussion along the lines of
>
> 1) What is the problem?
> 2) How are users solving this today?
> 3) How do we want to solve this tomorrow? Why?
>
> without even going into implementation or framework design?
>
>
>
>
> On Wed, Sep 9, 2020 at 7:52 PM vino yang <[email protected]> wrote:
>
> > > Given how much effort it takes for Beam to solve this, IMHO we cannot
> > > do justice to such an undertaking. We should just use Beam :)
> >
> > My original idea is that we only support a very limited processing
> > fashion.
> > Agree with you, the decision is very important. It
> > Besides Beam, the transport lib[1] contributed by LinkedIn also provides
> > an abstraction layer beyond computing frameworks.
> > But, yes, Beam is a better choice.
> >
> > > Sorry, I still cannot understand the uniqueness of the requirement
> > > here. Windowing/aggregation again just sounds like another spark job
> > > to me, which is crunching the ingested data.
> > > It can incrementally query the source hudi tables already.
> >
> > I mean we can do more things in one pipeline based on the capabilities of
> > the computing frameworks.
> > The "window" concept here does not have the same semantics as in the
> > computing frameworks.
> > It's a more generalized and coarse-grained concept, or simply put:
> >
> > *It's a batch of the sync interval of the DeltaStreamer in the continuous
> > mode.*
> >
> > For example, a 20-minute batch of data or a half-hour batch of data. We
> > need some capability to do aggregation on such a "window" after the data
> > lands with ACID semantics.
> > Yes, with a scheduler engine like Apache Airflow, we can read this data
> > from storage and then process it. But the key difference is that we can
> > avoid re-reading the data from persistent storage.
> >
> > [1] https://github.com/linkedin/transport
> >
> > Best,
> > Vino
> >
> > On Thu, Sep 10, 2020 at 5:29 AM Vinoth Chandar <[email protected]> wrote:
> >
> > > Apologies. Long weekend, and lost track of this :/
> > >
> > > > We must touch the field that Beam trying to solve.
> > > Given how much effort it takes for Beam to solve this, IMHO we cannot do
> > > justice to such an undertaking. We should just use Beam :)
> > >
> > > > I just want to introduce a utility class (or a utility module/library
> > > > if we try to define some engine-independent transformation class)
> > > > However, there are some scenarios that are not about table processing.
> > > > e.g. metrics calculating, quickly aggregate and calculate windows
> > >
> > > Sorry, I still cannot understand the uniqueness of the requirement here.
> > > Windowing/aggregation again just sounds like another spark job to me,
> > > which is crunching the ingested data.
> > > It can incrementally query the source hudi tables already.
> > >
> > > My 2c is that we have lot higher value in integrating well with Apache
> > > Airflow/DolphinScheduler, to trigger dependent jobs. By and large, that's
> > > how people write batch jobs today.
> > > We have already built these commit callback mechanisms for e.g.
> > >
> > >
> > >
> > >
> > > On Tue, Sep 1, 2020 at 8:23 PM vino yang <[email protected]> wrote:
> > >
> > > > Hi vc,
> > > >
> > > > Thanks for your feedback.
> > > >
> > > > > Today, if I am a Spark developer, I can write a little program to do
> > > > > a Hudi upsert and then trigger some other transformation
> > > > > conditionally based on whether upsert/insert happened, right?
> > > >
> > > > Yes, technically it can be implemented.
> > > >
> > > > > and I could do that without losing any of the existing
> > > > > transformation methods I know in Spark.
> > > >
> > > > Yes
> > > >
> > > > > I am not quite clear on how much value this library adds on top and
> > > > > in fact, bit concerned that we set ourselves up for solving
> > > > > engine-independent problems that Apache Beam for e.g has already
> > > > > solved.
> > > >
> > > > Introducing these APIs can bring a more fluent API usage experience,
> > > > without the chained program you mentioned above. We can directly
> > > > define the logic of how to process the committed data after it lands
> > > > on the fs.
> > > >
> > > > Yes, if Hudi's goal is to be an engine-independent data lake library
> > > > and we want to introduce some transformation abilities, we must touch
> > > > the field that Beam is trying to solve.
> > > >
> > > > > I also have doubts on whether coupling the incremental processing
> > > > > after commit into a single process itself is desirable.
> > > >
> > > > Actually, I just want to introduce a utility class (or a utility
> > > > module/library if we try to define some engine-independent
> > > > transformation class) that gives users another choice to build a data
> > > > processing pipeline, not only a data ingestion library.
> > > >
> > > > The current functions stay the same, e.g. reading and writing are
> > > > completely decoupled.
> > > >
> > > > And I am wondering whether it's a good idea to call it an incremental
> > > > processing proposal. Since it only tries to process the most recent
> > > > commit, it can only provide limited functionality compared with the
> > > > currently provided incremental view.
> > > >
> > > > Maybe it would be better to define it as a pipeline proposal?
> > > > Linking ingestion and processing?
> > > >
> > > > > Typical scenarios I have seen, job A ingests data into table A, job
> > > > > B incrementally queries table A and kicks another ETL to build
> > > > > table B. Job A and B are typically different and written by
> > > > > different developers.
> > > > > If you could help me understand the use-case, that would be awesome.
> > > >
> > > > Yes, the general scenarios are like what you said, especially in data
> > > > warehouses.
> > > > However, there are some scenarios that are not about table processing,
> > > > e.g. calculating metrics, quickly aggregating and calculating windows
> > > > (if we fix the commit cycle, in a sense it is a fixed window based on
> > > > processing-time semantics) and so on.
> > > >
> > > > After the data has landed, it can be processed quickly without
> > > > re-reading. That is a key advantage.
> > > >
> > > > > All that said, there are pains around "triggering" job B (downstream
> > > > > computations incrementally) and we could solve that by for e.g
> > > > > supporting an Apache Airflow operator that can trigger workflows
> > > > > when commits arrive on its upstream tables. What I am trying to say
> > > > > is - there is definitely gaps we would like to improve upon to make
> > > > > incremental processing mainstream, not sure if the proposed
> > > > > APIs are the highest on that list.
> > > >
> > > > Yes, Airflow can solve the triggering problem. We are using another
> > > > scheduler framework, Apache DolphinScheduler[1], which can also
> > > > trigger Hudi's incremental processing.
> > > >
> > > > The key difference is performance and effective triggering, right?
> > > > This proposal tries to add more transformations into the same Spark
> > > > DAG as the original data ingestion.
> > > >
> > > > So, in short, this proposal tries to bring:
> > > >
> > > >    - performance: better performance when processing after data
> > > >    ingestion;
> > > >    - focus and fluency: inline ingestion and processing logic in some
> > > >    scenarios;
> > > >    - boundary: at a high level, expose more of the computing engine's
> > > >    abilities directly. We already depend on the computing engine,
> > > >    right? Why not release more?
> > > >
> > > > Of course, as you said, we will face some problems, such as providing
> > > > an abstraction over multiple computing engines.
> > > >
> > > > Best,
> > > > Vino
> > > >
> > > > [1]: https://dolphinscheduler.apache.org/
> > > >
> > > >
> > > >
> > > > On Wed, Sep 2, 2020 at 12:13 AM Vinoth Chandar <[email protected]> wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > While I agree on bringing more of these capabilities to Hudi
> > > > > natively, I have few questions/concerns on the specific approach.
> > > > >
> > > > > > And these calculation functions should be engine independent.
> > > > > > Therefore, I plan to introduce some new APIs that allow users to
> > > > > > directly define
> > > > >
> > > > > Today, if I am a Spark developer, I can write a little program to do
> > > > > a Hudi upsert and then trigger some other transformation
> > > > > conditionally based on whether upsert/insert happened, right?
> > > > > and I could do that without losing any of the existing
> > > > > transformation methods I know in Spark. I am not quite clear on how
> > > > > much value this library adds on top and in fact, bit concerned
> > > > > that we set ourselves up for solving engine-independent problems
> > > > > that Apache Beam for e.g has already solved.
> > > > >
> > > > > I also have doubts on whether coupling the incremental processing
> > > > > after commit into a single process itself is desirable. Typical
> > > > > scenarios I have seen, job A ingests data into table A, job B
> > > > > incrementally queries table A and kicks another ETL to build table
> > > > > B. Job A and B are typically different and written by different
> > > > > developers.
> > > > > If you could help me understand the use-case, that would be awesome.
> > > > >
> > > > > All that said, there are pains around "triggering" job B (downstream
> > > > > computations incrementally) and we could solve that by for e.g
> > > > > supporting an Apache Airflow operator that can trigger workflows
> > > > > when commits arrive on its upstream tables. What I am trying to say
> > > > > is - there is definitely gaps we would like to improve upon to make
> > > > > incremental processing mainstream, not sure if the proposed
> > > > > APIs are the highest on that list.
> > > > >
> > > > > Apologies if I am missing something. Please help me understand if so.
> > > > >
> > > > > Thanks
> > > > > Vinoth
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > On Tue, Sep 1, 2020 at 4:26 AM vino yang <[email protected]> wrote:
> > > > >
> > > > > > Hi,
> > > > > >
> > > > > > Does anyone have ideas or disagreements?
> > > > > >
> > > > > > I think the introduction of these APIs will greatly enhance Hudi's
> > > > > > data processing capabilities and eliminate the performance
> > > > > > overhead of reading data for processing after writing.
> > > > > >
> > > > > > Best,
> > > > > > Vino
> > > > > >
> > > > > > On Mon, Aug 31, 2020 at 3:44 PM wangxianghu <[email protected]> wrote:
> > > > > >
> > > > > > > +1
> > > > > > > This will give hudi more capabilities besides data ingestion and
> > > > > > > writing, and make hudi-based data processing more timely!
> > > > > > > Best,
> > > > > > > wangxianghu
> > > > > > >
> > > > > > > From: Abhishek Modi
> > > > > > > Sent: August 31, 2020, 15:01
> > > > > > > To: [email protected]
> > > > > > > Subject: Re: [DISCUSS] Introduce incremental processing API in Hudi
> > > > > > >
> > > > > > > +1
> > > > > > >
> > > > > > > This sounds really interesting! I like that this implicitly
> > > > > > > gives Hudi the ability to do transformations on ingested data :)
> > > > > > >
> > > > > > > On Sun, Aug 30, 2020 at 10:59 PM vino yang <[email protected]> wrote:
> > > > > > >
> > > > > > > > Hi everyone,
> > > > > > > >
> > > > > > > >
> > > > > > > > For a long time, in the field of big data, people have hoped
> > > > > > > > that the tools they use can make fuller use of the processing
> > > > > > > > and analysis capabilities of big data. At present, from the
> > > > > > > > perspective of API, Hudi mostly provides APIs related to data
> > > > > > > > ingestion, and relies on various big data query engines on the
> > > > > > > > query side to release capabilities, but does not provide a
> > > > > > > > more convenient API for data processing after transactional
> > > > > > > > writing.
> > > > > > > >
> > > > > > > > Currently, if a user wants to process the incremental data of
> > > > > > > > a commit that has just completed, they need to go through
> > > > > > > > three steps:
> > > > > > > >
> > > > > > > >
> > > > > > > >    1. Write data to a hudi table;
> > > > > > > >    2. Query or check completion of the commit;
> > > > > > > >    3. After the data is committed, find the data through an
> > > > > > > >    incremental query, and then process it;
> > > > > > > >
> > > > > > > >
> > > > > > > > If you want a shortcut here, you can use Hudi's recently added
> > > > > > > > write commit callback function to simplify it into two steps:
> > > > > > > >
> > > > > > > >
> > > > > > > >    1. Write data to a hudi table;
> > > > > > > >    2. Use the write commit callback function to trigger an
> > > > > > > >    incremental query to find the data, and then process it;
> > > > > > > >
> > > > > > > >
> > > > > > > > However, splitting into two steps is still cumbersome for
> > > > > > > > scenarios that want to perform more timely and efficient data
> > > > > > > > analysis on the data ingestion pipeline. Therefore, I propose
> > > > > > > > to merge the entire process into one step and provide a set of
> > > > > > > > incremental (or, say, pipelined) processing APIs based on
> > > > > > > > this:
> > > > > > > >
> > > > > > > > Write the data to a hudi table and, after obtaining the data
> > > > > > > > through JavaRDD<WriteStatus>, directly apply a user-defined
> > > > > > > > function (UDF) to process it. The processing behavior can be
> > > > > > > > described by these two kinds of steps:
> > > > > > > >
> > > > > > > >
> > > > > > > >    1. Conventional conversion such as Map/Filter/Reduce;
> > > > > > > >    2. Aggregation calculation based on fixed time window;
> > > > > > > >
> > > > > > > >
> > > > > > > > And these calculation functions should be engine independent.
> > > > > > > > Therefore, I plan to introduce some new APIs that allow users
> > > > > > > > to directly define incremental processing capabilities after
> > > > > > > > each write operation.
> > > > > > > >
> > > > > > > > The preliminary idea is that we can introduce a tool class,
> > > > > > > > for example named IncrementalProcessingBuilder or
> > > > > > > > PipelineBuilder, which can be used like this:
> > > > > > > >
> > > > > > > > IncrementalProcessingBuilder builder =
> > > > > > > >     new IncrementalProcessingBuilder();
> > > > > > > >
> > > > > > > > builder.source() //source table
> > > > > > > >
> > > > > > > > .transform()
> > > > > > > >
> > > > > > > > .sink()          //derived table
> > > > > > > >
> > > > > > > > .build();
> > > > > > > >
> > > > > > > > IncrementalProcessingBuilder#mapAfterInsert(
> > > > > > > >     JavaRDD<HoodieRecord<T>> records, HudiMapFunction mapFunction);
> > > > > > > >
> > > > > > > > IncrementalProcessingBuilder#mapAfterUpsert(
> > > > > > > >     JavaRDD<HoodieRecord<T>> records, HudiMapFunction mapFunction);
> > > > > > > >
> > > > > > > > IncrementalProcessingBuilder#filterAfterInsert(
> > > > > > > >     JavaRDD<HoodieRecord<T>> records, HudiFilterFunction mapFunction);
> > > > > > > >
> > > > > > > > //window function
> > > > > > > >
> > > > > > > > IncrementalProcessingBuilder#aggregateAfterInsert(
> > > > > > > >     JavaRDD<HoodieRecord<T>> records, HudiAggregateFunction aggFunction);
> > > > > > > >
> > > > > > > > It is suitable for scenarios where the commit interval
> > > > > > > > (window) is moderate and ingestion latency is not a major
> > > > > > > > concern.
> > > > > > > >
> > > > > > > >
> > > > > > > > What do you think? Looking forward to your thoughts and
> > > > > > > > opinions.
> > > > > > > >
> > > > > > > >
> > > > > > > > Best,
> > > > > > > >
> > > > > > > > Vino
> > > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
