Yes, there is definitely intent to support stream-stream joins between Hudi
tables in the future.
The histogram can also be used to implement really good watermarks (Structured
Streaming/Flink/Beam).

At least for me, this is why I created the project in the first place :)
https://www.oreilly.com/content/ubers-case-for-incremental-processing-on-hadoop/

If we keep the time interval (i.e., the 1 min) configurable and also
encode it along with the histogram,
we can control the storage footprint better. Maybe also consider using
something like t-digest for the histogram?
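
A minimal sketch of the histogram idea, assuming event times in epoch seconds.
The class name, the method names, and the "intervalSecs"/"counts" metadata keys
are all hypothetical; none of them are existing Hudi APIs. It only shows how
storing the interval next to the counts lets a reader interpret the buckets:

```python
from collections import Counter
from dataclasses import dataclass, field


@dataclass
class EventTimeHistogram:
    """Hypothetical per-commit count of records per event-time bucket."""

    interval_secs: int = 60  # bucket width; 1 min by default, configurable
    counts: Counter = field(default_factory=Counter)

    def add(self, event_time_secs: int) -> None:
        # Round the event time down to the start of its bucket.
        bucket = event_time_secs - event_time_secs % self.interval_secs
        self.counts[bucket] += 1

    def to_metadata(self) -> dict:
        # Encode the interval with the counts so the buckets stay
        # interpretable even if the configured interval changes later.
        return {
            "intervalSecs": self.interval_secs,
            "counts": {str(b): n for b, n in sorted(self.counts.items())},
        }


fine = EventTimeHistogram(interval_secs=60)
coarse = EventTimeHistogram(interval_secs=300)
for t in [0, 30, 59, 60, 125]:
    fine.add(t)
    coarse.add(t)

print(fine.to_metadata())    # three 1-min buckets
print(coarse.to_metadata())  # one 5-min bucket: fewer keys to store
```

A wider interval trades resolution for fewer keys per commit, which is the
storage knob mentioned above.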



On Fri, Feb 5, 2021 at 2:37 AM Raymond Xu <[email protected]>
wrote:

> liujinhui and vinoth, Thank you for the input! Have created
> https://issues.apache.org/jira/browse/HUDI-1587
> Yes, I think the min and max timing mitigates the late arrival issue, which
> also should not happen that frequently (e.g., not on every other commit).
> I think the histogram is a cool idea and the storage may not be too bad; in
> a continuous processing mode, even if the records in a commit span over 30
> min, it'll just be a map of 30 keys with int values for that commit
> metadata. Worth a shot if there is a clear requirement on stream-stream
> joins.
>
> On Tue, Feb 2, 2021 at 10:16 PM Vinoth Chandar <[email protected]> wrote:
>
> > +1
> >
> > I was involved in a very similar design at my previous job. We could
> > actually track both min and max event times.
> > We used to call the min "latency" and the max "freshness" (i.e., it
> > indicates that some data for these later time intervals is flowing in).
> >
> > It does not completely solve the issue liujinhui mentioned, but with these
> > two numbers we can get a sense of what the arrival times
> > of records are. For example, if there was a sudden inflow of older records
> > while newer data is also coming in, only the latency would degrade;
> > freshness will still be good.
> >
> > Throwing out another, more generalized idea that can actually help us
> > eventually with streaming ETL (designing watermarks, for example).
> > I wonder if we can actually store a histogram of the number of records
> > received for each 1 min (configurable) interval in that commit.
> > This will give a very clear picture of data arrival patterns, and
> > can be used to implement stream-stream joins (incremental query).
> > Happy to also punt on this for now, given it may need additional storage etc.
> > Just a thought.
> >
> > On Tue, Feb 2, 2021 at 9:43 PM 刘金辉 <[email protected]> wrote:
> >
> > > +1, it feels great, but in actual business scenarios the event time can
> > > be inaccurate due to some data abnormalities.
> > > This situation seems to affect the monitoring of this indicator?
> > >
> > >
> > > Best,
> > > liujinhui
> > >
> > >
> > >
> > >
> > > ------------------ Original Message ------------------
> > > From: "dev" <[email protected]>;
> > > Sent: Wednesday, February 3, 2021, 9:55 AM
> > > To: "dev" <[email protected]>;
> > >
> > > Subject: [DISCUSS] Measure latency by storing event time in WriteStatus
> > >
> > >
> > >
> > > Hi all,
> > >
> > > It is a common requirement to measure data latency in Hudi tables. There
> > > isn't a metric reporting latency directly from HoodieMetrics. I'm proposing
> > > to measure the latency for each commit by this formula:
> > >
> > > latency = commitTime + commitDuration - earliest event time of the incoming records
> > >
> > > There are 4 major parts to make this available (thanks to Vinoth's hints):
> > >
> > > - To store the earliest event time, we need to extract the event times from
> > > Hoodie payloads. We can make it available in
> > > org.apache.hudi.common.model.DefaultHoodieRecordPayload#getMetadata()
> > >
> > > - Then org.apache.hudi.client.WriteStatus#markSuccess() can perform the
> > > comparison and store the min value
> > > in org.apache.hudi.common.model.HoodieWriteStat
> > >
> > > - org.apache.hudi.common.model.HoodieCommitMetadata can then aggregate all
> > > the min values and return a global min of all the partitions.
> > >
> > > - Lastly, in org.apache.hudi.metrics.HoodieMetrics#updateCommitMetrics we
> > > can compute the latency using the formula above.
> > >
> > > I have a draft implementation shown in the diff
> > > https://github.com/apache/hudi/compare/master...xushiyan:measure-latency
> > >
> > > I think this metric will be commonly used so I made those changes on
> > > default classes like DefaultHoodieRecordPayload and HoodieWriteStat. Hope
> > > to get some early feedback on the implementation. Thank you.
> > >
> > > Best,
> > > Raymond
> >
>
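
For reference, the latency formula from the original proposal in this thread
can be sketched as below. This is only an illustration under assumptions:
times are epoch milliseconds, each writer reports its own minimum event time
(or None if it saw no event times), and the function name is hypothetical
rather than anything in Hudi or in the linked draft.

```python
from typing import Iterable, Optional


def commit_latency_ms(
    commit_time_ms: int,
    commit_duration_ms: int,
    min_event_times_ms: Iterable[Optional[int]],
) -> Optional[int]:
    """latency = commitTime + commitDuration - earliest event time."""
    # Aggregate the per-writer minimums into one global minimum,
    # skipping writers that reported no event time.
    known = [t for t in min_event_times_ms if t is not None]
    if not known:
        return None  # no event times in this commit, so no latency to report
    return commit_time_ms + commit_duration_ms - min(known)


# Commit started at t=1_000_000 ms and took 5_000 ms; the oldest record
# across all writers has event time 940_000 ms.
latency = commit_latency_ms(1_000_000, 5_000, [990_000, None, 940_000])
print(latency)  # 65000
```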
