Yes, there is definitely intent to support stream-stream joins between Hudi tables in the future. This can be used to implement really good watermarks (Structured Streaming/Flink/Beam).
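A minimal sketch of the histogram idea discussed in this thread, assuming event times are epoch milliseconds. Records are counted per fixed, configurable interval, and the interval width is stored next to the counts so the buckets stay interpretable when read back. The class name, field names, and metadata layout are hypothetical (not Hudi APIs), and it uses plain fixed-width buckets rather than a t-digest.

```python
from collections import Counter
from dataclasses import dataclass, field


@dataclass
class EventTimeHistogram:
    """Hypothetical per-commit histogram: record counts per fixed event-time bucket.

    The bucket width is kept with the counts, so a reader does not need
    out-of-band configuration to interpret the keys.
    """
    interval_ms: int = 60_000  # 1-minute buckets by default (configurable)
    counts: Counter = field(default_factory=Counter)

    def add(self, event_time_ms: int) -> None:
        # Bucket key = start of the interval containing the event time.
        bucket_start = (event_time_ms // self.interval_ms) * self.interval_ms
        self.counts[bucket_start] += 1

    def merge(self, other: "EventTimeHistogram") -> "EventTimeHistogram":
        # Combining per-file histograms into commit-level metadata only makes
        # sense when both sides use the same bucket width.
        if other.interval_ms != self.interval_ms:
            raise ValueError("cannot merge histograms with different intervals")
        merged = EventTimeHistogram(self.interval_ms, Counter(self.counts))
        merged.counts.update(other.counts)  # Counter.update adds counts
        return merged

    def to_metadata(self) -> dict:
        # Compact, JSON-friendly form: the interval plus a bucket -> count map.
        return {
            "intervalMs": self.interval_ms,
            "counts": {str(k): v for k, v in sorted(self.counts.items())},
        }
```

With the default 1-minute width, a commit whose records span about 30 minutes yields on the order of 30 keys, which matches the storage estimate given later in the thread.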
At least for me, this is why I created the project in the first place :)
https://www.oreilly.com/content/ubers-case-for-incremental-processing-on-hadoop/

If we can keep the time interval (i.e., the 1 min) configurable and also encode it along with the histogram, we can better control the storage footprint. Maybe we could also consider using something like t-digest for the histogram?

On Fri, Feb 5, 2021 at 2:37 AM Raymond Xu wrote:
> liujinhui and Vinoth, thank you for the input! I have created
> https://issues.apache.org/jira/browse/HUDI-1587
> Yes, I think the min and max timing mitigates the late-arrival issue, which
> also should not happen as frequently as every other commit.
> I think the histogram is a cool idea and the storage may not be too bad: in
> continuous processing mode, even if the records in a commit span 30
> min, it'll just be a map of 30 keys with int values in that commit's
> metadata. Worth a shot if there is a clear requirement for stream-stream
> joins.
>
> On Tue, Feb 2, 2021 at 10:16 PM Vinoth Chandar wrote:
>
> > +1
> >
> > I was involved in a very similar design at my previous job. We could
> > actually track both min and max event times.
> > We used to call the min "latency" and the max "freshness" (i.e., the max
> > indicates that some data for these later time intervals is flowing in).
> >
> > It does not completely solve the issue liujinhui mentioned, but with these
> > two numbers we can get a sense of the arrival times
> > of records. E.g., if there were a sudden inflow of older records
> > while newer data is also coming in, only the latency would degrade;
> > freshness would still be good.
> >
> > Throwing out another, more generalized idea that can eventually help us
> > with streaming ETL (e.g., designing watermarks):
> > I wonder if we can store a histogram of the number of records received
> > for each 1-min (configurable) interval in that commit.
> > This would give a very clear picture of data arrival patterns per se, and
> > could be used to implement stream-stream joins (incremental query).
> > Also happy to punt on this for now, given it may need additional storage, etc.
> > Just a thought.
> >
> > On Tue, Feb 2, 2021 at 9:43 PM 刘金辉 wrote:
> >
> > > +1, this sounds great, but in actual business scenarios, due to some
> > > data anomalies, the event time can be inaccurate.
> > > Wouldn't this affect the monitoring of this metric?
> > >
> > > Best,
> > > liujinhui
> > >
> > > ------------------ Original message ------------------
> > > From: "dev"
> > > Sent: Wednesday, February 3, 2021, 9:55 AM
> > > To: "dev"
> > > Subject: [DISCUSS] Measure latency by storing event time in WriteStatus
> > >
> > > Hi all,
> > >
> > > It is a common requirement to measure data latency in Hudi tables. There
> > > isn't a metric reporting latency directly from HoodieMetrics. I'm proposing
> > > to measure the latency for each commit by this formula:
> > >
> > > latency = commitTime + commitDuration - earliest event time of the incoming records
> > >
> > > There are 4 major parts to make this available (thanks to Vinoth's hints):
> > >
> > > - To store the earliest event time, we need to extract the event times from
> > > Hoodie payloads. We can make it available in
> > > org.apache.hudi.common.model.DefaultHoodieRecordPayload#getMetadata()
> > >
> > > - Then org.apache.hudi.client.WriteStatus#markSuccess() can perform the
> > > comparison and store the min value
> > > in org.apache.hudi.common.model.HoodieWriteStat
> > >
> > > - org.apache.hudi.common.model.HoodieCommitMetadata can then aggregate all
> > > the min values and return a global min across all the partitions.
> > >
> > > - Lastly, in org.apache.hudi.metrics.HoodieMetrics#updateCommitMetrics we
> > > can compute the latency using the formula above.
> > >
> > > I have a draft implementation shown in the diff:
> > > https://github.com/apache/hudi/compare/master...xushiyan:measure-latency
> > >
> > > I think this metric will be commonly used, so I made those changes on
> > > default classes like DefaultHoodieRecordPayload and HoodieWriteStat. Hope
> > > to get some early feedback on the implementation. Thank you.
> > >
> > > Best,
> > > Raymond
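A minimal sketch of the min/max aggregation flow described in the proposal, plus the max-based "freshness" that Vinoth suggests tracking. It assumes commit time, commit duration, and event times are all plain epoch milliseconds (Hudi commit instants are timestamp strings, so a conversion would be needed first). `WriteStatSketch` and `commit_metrics` are hypothetical names standing in for the WriteStatus/HoodieWriteStat/HoodieCommitMetadata steps; this is not the draft implementation in the linked diff, and the freshness-lag formula is an assumed analogue of the latency formula.

```python
from dataclasses import dataclass
from typing import Dict, Iterable, Optional


@dataclass
class WriteStatSketch:
    """Hypothetical stand-in for per-file write stats (not Hudi's HoodieWriteStat)."""
    min_event_time_ms: Optional[int] = None
    max_event_time_ms: Optional[int] = None

    def mark_success(self, event_time_ms: Optional[int]) -> None:
        # Mirrors the proposed markSuccess() step: keep a running min and max.
        # Records whose event time could not be extracted are ignored.
        if event_time_ms is None:
            return
        if self.min_event_time_ms is None or event_time_ms < self.min_event_time_ms:
            self.min_event_time_ms = event_time_ms
        if self.max_event_time_ms is None or event_time_ms > self.max_event_time_ms:
            self.max_event_time_ms = event_time_ms


def commit_metrics(commit_time_ms: int, commit_duration_ms: int,
                   stats: Iterable[WriteStatSketch]) -> Optional[Dict[str, int]]:
    """Aggregate per-file min/max into commit-level latency and freshness lag."""
    stats = list(stats)  # iterated twice below, so materialize once
    mins = [s.min_event_time_ms for s in stats if s.min_event_time_ms is not None]
    maxs = [s.max_event_time_ms for s in stats if s.max_event_time_ms is not None]
    if not mins:
        return None  # no event times in this commit, nothing to report
    commit_end_ms = commit_time_ms + commit_duration_ms
    return {
        # latency = commitTime + commitDuration - earliest event time
        "latencyMs": commit_end_ms - min(mins),
        # assumed analogue for the max: how far the newest record lags the commit end
        "freshnessLagMs": commit_end_ms - max(maxs),
    }
```

If a burst of old records arrives while new data keeps flowing, only `latencyMs` grows while `freshnessLagMs` stays small, which is the latency vs. freshness distinction made in the thread.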
