danny0405 commented on code in PR #10266: URL: https://github.com/apache/hudi/pull/10266#discussion_r1429454087
##########
rfc/rfc-76/rfc-76.md:
##########
@@ -0,0 +1,143 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+# RFC-[76]: [support EventTimeBasedCompactionStrategy and metric for RO table]
+
+## Proposers
+
+- @waitingF
+
+## Approvers
+ - @danny0405
+ - @voonhous
+
+## Status
+
+JIRA: [HUDI-6979](https://issues.apache.org/jira/browse/HUDI-6979)
+
+> Please keep the status updated in `rfc/README.md`.
+
+## Abstract
+
+Currently, to gain low ingestion latency, we can adopt the MergeOnRead table, which support appending log files and
+compact log files into base file later. When querying the snapshot table (RT table) generated by MOR,
+query side have to perform a compaction so that they can get all data, which is expected time-consuming causing query latency.
+At the time, hudi provide read-optimized table (RO table) for low query latency just like COW.
+
+Generally the data to ingest arrives roughly in event time order. For cases some users read the hudi table,
+they may not concern with the immediate full data, but the data before a specified time T (eg. data before 0 clock).
+For such cases, user want all data before a specified time, user have to query the RT table to get all data with expected high query latency.
+
+Based on this, we want to implement the strategy based on the event time: `EventTimeBasedCompactionStrategy`.
+
+
+## Background
+
+Currently, there is no compaction strategy based on event time, the only `DayBasedCompactionStrategy` need the table partitioned by day in specified format(yyyy/mm/dd),
+which is not very general and has too large a time granularity. There is no data completion/freshness guarantee or metrics for RO table.
+
+Based on this, we plan to
+1. launch a compaction strategy based on event time: `EventTimeBasedCompactionStrategy`.
+2. report RO table data completion and freshness.
+
+With the data completion and freshness of RO table, we can expand use-case of RO table.
+That is given event time threshold T, the log files before T can be compacted, then resulting RO table obtains all data before T, with low query latency.
+
+Currently, we introduced new file slicing algorithm based on barrier and generating compaction plan based on completion time [[HUDI-6495][RFC-66]](https://github.com/apache/hudi/pull/7907).
+
+With the barrier file slicing, file slices are split by the barriers extracted from every instant time of base files
+or the first instant of log file if no base file exists.
+
+When generating compaction plan, the scheduler will filter all log files with completion time before the `compactionInstant`.
+And the compaction operation will generate a new base file with `compactionInstant` as its instant time;
+
+So the file slices are split naturally by the `compactionInstant` for MOR cases.
+
+So we can replace the `compactionInstant` with `eventTimeInstant` computed by `EventTimeBasedCompactionStrategy` when generating plan.
+In this case, file slices are split by `eventTimeInstant` instead of `compactionInstant`.
+This is the base for `EventTimeBasedCompactionStrategy`.
+
+In .commit/.deltacommit metadata, we can extract the min/max event time for each base file or log file and take the min/max event time as data completion and freshness for that base file or log file.
+This is the base for data completion and freshness of RO table.
+
+## Implementation
+
+With the ability of new file slice algorithm and compaction based on completion time, we can choose to
+
+### Schedule Compaction Plan
+The `EventTimeBasedCompactionStrategy` strategy is responsible to calculate the event time threshold for compaction plan.
+Implement a method to calculate the event time threshold:
+```java
+ // return the threshold as instant time format
+ public String calcEventTimeThreshold(String compactionInstant, HoodieWriteConfig config)
+```
+
+When generating compaction plan, we take the event time threshold as the next base instant time and save it to `CompactionOpertion::nextBaseInstantTime`.
+
+
+### Execute Compaction Plan
+Get `nextBaseInstantTime` from `CompactionOpertion`, write base file to the `nextBaseInstantTime`.
+
+Sample code in HoodieCompactor:
+```java
+ String nextBaseInstantTime = compactionOpertion.getNextBaseInstantTime();
+ result = executionHelper.writeFileAndGetWriteStats(compactionHandler, operation, nextBaseInstantTime, scanner, oldDataFileOpt);
+```
+
+Follow up:
+
+With the `EventTimeBasedCompactionStrategy` above, there can be data with event time after the threshold being compacted.
+For cases users want exact event time split (users don't want any data after threshold exist in RO table),
+we need to perform the log record filter in compaction execution based on the time threshold.
+
+To implement exact event time split, we should modify `HoodieMergedLogRecordScanner` adding new methods to get the remained log records,
+likely naming `getRemainLogsIterator` and `getRemainRecords` corresponding to `HoodieMergedLogRecordScanner::iterator` and `HoodieMergedLogRecordScanner::getRecords`.
+And we have to rewrite those remained log records back to new log file.
+
+
+### Compute Data Freshness / Completion
+Given the definition below:
+
+| Query based table | Data Completion Time | Data Freshness Time |
+|---------------------------------|--------------------------------------------------------|--------------------------------|
+| Snapshot table (RT table) | all data before the time been ingested into hudi | the latest data been ingested |
+| Read Optimized table (RO table) | all data before the time been compacted into base file | the latest data been compacted |
+
+For RT table, the data completion and freshness can be simply extracted from `HoodieWriteStat::minEventTime` and `HoodieWriteStat::maxEventTime`. Now they are used to report metrics `commitLatencyInMs` and `commitFreshnessInMs`.
+
+For RO table, the data freshness time is the maxEventTime of last compaction, can be extracted from last commit metadata simply.
+But for the data completion time, we can not simply take the `minEventTime` of last compaction since it may be data after `minEventTime` uncompacted. So it depends on the earliest event time of all un-compacted log files.

Review Comment:
   > un-compacted log files.

   A compaction is not visible until it completes, so how do the un-compacted log files play a role here?
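
To make the question concrete, here is a minimal sketch of the RO table computation as the quoted RFC text describes it: freshness is the `maxEventTime` of the last compaction, and completion is the earliest event time across all un-compacted log files. `RoTableTimes`, `LogFileStat`, and both methods are hypothetical names used only for illustration and are not Hudi APIs. The fallback to the last compaction's `maxEventTime` when no log files are pending is an assumption, since the RFC does not say what happens in that case. The sketch also does not address whether a pending (incomplete) compaction should count.

```java
import java.util.List;

// Hypothetical model for illustration only; none of these types exist in Hudi.
final class RoTableTimes {

  /** Min/max event time (epoch millis) recorded for one un-compacted log file. */
  static final class LogFileStat {
    final String path;
    final long minEventTime;
    final long maxEventTime;

    LogFileStat(String path, long minEventTime, long maxEventTime) {
      this.path = path;
      this.minEventTime = minEventTime;
      this.maxEventTime = maxEventTime;
    }
  }

  /** Freshness of the RO table: the max event time written by the last compaction. */
  static long freshness(long lastCompactionMaxEventTime) {
    return lastCompactionMaxEventTime;
  }

  /**
   * Completion time of the RO table: the earliest event time among all
   * un-compacted log files. ASSUMPTION: when nothing is pending, fall back
   * to the last compaction's max event time.
   */
  static long completionTime(long lastCompactionMaxEventTime, List<LogFileStat> uncompacted) {
    return uncompacted.stream()
        .mapToLong(s -> s.minEventTime)
        .min()
        .orElse(lastCompactionMaxEventTime);
  }

  public static void main(String[] args) {
    List<LogFileStat> pending = List.of(
        new LogFileStat("a.log", 150L, 300L),
        new LogFileStat("b.log", 120L, 200L));
    System.out.println(completionTime(500L, pending));   // prints 120
    System.out.println(completionTime(500L, List.of())); // prints 500
  }
}
```

With two pending log files whose earliest event times are 150 and 120, the completion time is 120 even though the last compaction wrote data up to 500, which is the gap the RFC paragraph is pointing at.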