danny0405 commented on code in PR #10266:
URL: https://github.com/apache/hudi/pull/10266#discussion_r1429454087


##########
rfc/rfc-76/rfc-76.md:
##########
@@ -0,0 +1,143 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+# RFC-76: Support EventTimeBasedCompactionStrategy and metrics for the RO table
+
+## Proposers
+
+- @waitingF
+
+## Approvers
+ - @danny0405
+ - @voonhous
+
+## Status
+
+JIRA: [HUDI-6979](https://issues.apache.org/jira/browse/HUDI-6979)
+
+> Please keep the status updated in `rfc/README.md`.
+
+## Abstract
+
+Currently, to achieve low ingestion latency, we can adopt the MergeOnRead (MOR) table, which supports appending log files and compacting them into base files later.
+When querying the snapshot table (RT table) of a MOR table, the query side has to merge the log files with the base files on the fly to obtain the complete data, which is time-consuming and increases query latency.
+For this reason, Hudi provides the read-optimized table (RO table), which offers low query latency just like COW.
+
+Generally, the ingested data arrives roughly in event time order. Some users reading the Hudi table
+are not concerned with the very latest data, but only with the data before a specified time T (e.g. data before midnight).
+Today, to get all data before a specified time, such users have to query the RT table, with the expected high query latency.
+
+Based on this, we want to implement a compaction strategy based on event time: `EventTimeBasedCompactionStrategy`.
+
+
+## Background
+
+Currently, there is no compaction strategy based on event time. The closest one, `DayBasedCompactionStrategy`, requires the table to be partitioned by day in a specific format (yyyy/mm/dd),
+which is not very general and has too coarse a time granularity. There is also no data completeness/freshness guarantee or metric for the RO table.
+
+Based on this, we plan to:
+1. introduce a compaction strategy based on event time: `EventTimeBasedCompactionStrategy`;
+2. report the data completeness and freshness of the RO table.
+
+With the data completeness and freshness of the RO table available, we can expand the use cases of the RO table:
+given an event time threshold T, the log files before T can be compacted, so the resulting RO table contains all data before T while keeping low query latency.
+
+[[HUDI-6495][RFC-66]](https://github.com/apache/hudi/pull/7907) introduced a new file slicing algorithm based on barriers, together with compaction plan generation based on completion time.
+
+With the barrier-based file slicing, file slices are split by barriers, which are extracted from the instant times of the base files,
+or from the first instant of the log files if no base file exists.
+
+When generating a compaction plan, the scheduler filters in all log files whose completion time is before the `compactionInstant`,
+and the compaction operation generates a new base file with `compactionInstant` as its instant time.
+
+So for MOR tables, the file slices are naturally split by the `compactionInstant`.
+
+Therefore, we can replace the `compactionInstant` with an `eventTimeInstant` computed by `EventTimeBasedCompactionStrategy` when generating the plan.
+In this case, file slices are split by the `eventTimeInstant` instead of the `compactionInstant`.
+This is the basis for `EventTimeBasedCompactionStrategy`.
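+
+As an illustration (the variable names below are placeholders of this sketch, not a final API), the scheduling change amounts to swapping the instant the plan is keyed on:
+```java
+    // Sketch: today the new base file is written under compactionInstant,
+    // so file slices split at compactionInstant. With this proposal, the
+    // strategy computes an eventTimeInstant and the new base file is written
+    // under it instead, moving the slice boundary to eventTimeInstant.
+    String eventTimeInstant = strategy.calcEventTimeThreshold(compactionInstant, writeConfig);
+```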
+
+From the .commit/.deltacommit metadata, we can extract the min/max event time for each base file or log file, and take them as the data completeness and freshness of that file.
+This is the basis for the data completeness and freshness of the RO table.
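+
+For illustration, a minimal sketch of extracting those bounds from the commit metadata via the per-file write stats (the aggregation loop is this sketch's own, not an existing API):
+```java
+    // Sketch: aggregate the min/max event time recorded in the write stats
+    // of a .commit/.deltacommit across all partitions.
+    long minEventTime = Long.MAX_VALUE;
+    long maxEventTime = Long.MIN_VALUE;
+    for (List<HoodieWriteStat> stats : commitMetadata.getPartitionToWriteStats().values()) {
+      for (HoodieWriteStat stat : stats) {
+        if (stat.getMinEventTime() != null) {
+          minEventTime = Math.min(minEventTime, stat.getMinEventTime());
+        }
+        if (stat.getMaxEventTime() != null) {
+          maxEventTime = Math.max(maxEventTime, stat.getMaxEventTime());
+        }
+      }
+    }
+```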
+
+## Implementation
+
+With the new file slicing algorithm and completion-time based compaction plan generation in place, the implementation consists of the following parts.
+
+### Schedule Compaction Plan
+The `EventTimeBasedCompactionStrategy` is responsible for calculating the event time threshold for the compaction plan.
+It implements a method to calculate the event time threshold:
+```java
+    // return the event time threshold, formatted as a Hudi instant time
+    public String calcEventTimeThreshold(String compactionInstant, HoodieWriteConfig config)
+```
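+
+As one possible shape (a minimal sketch: the lag-based derivation and the second-granularity instant prefix are assumptions, and the config lookup is replaced by a plain parameter here):
+```java
+    import java.time.LocalDateTime;
+    import java.time.format.DateTimeFormatter;
+
+    // Sketch: derive the threshold by moving the compaction instant back by a
+    // configured event-time lag. Hudi instant times start with yyyyMMddHHmmss.
+    private static final DateTimeFormatter INSTANT_FORMAT = DateTimeFormatter.ofPattern("yyyyMMddHHmmss");
+
+    public String calcEventTimeThreshold(String compactionInstant, long lagSeconds) {
+      LocalDateTime instant = LocalDateTime.parse(compactionInstant.substring(0, 14), INSTANT_FORMAT);
+      return instant.minusSeconds(lagSeconds).format(INSTANT_FORMAT);
+    }
+```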
+
+When generating the compaction plan, we take the event time threshold as the next base instant time and save it to `CompactionOperation::nextBaseInstantTime`.
+
+
+### Execute Compaction Plan
+Get the `nextBaseInstantTime` from the `CompactionOperation` and write the base file under `nextBaseInstantTime`.
+
+Sample code in HoodieCompactor:
+```java
+    String nextBaseInstantTime = compactionOperation.getNextBaseInstantTime();
+    result = executionHelper.writeFileAndGetWriteStats(compactionHandler, operation, nextBaseInstantTime, scanner, oldDataFileOpt);
+```
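+
+One detail this sketch assumes (not settled in this RFC): plans scheduled by other strategies will not carry a `nextBaseInstantTime`, so the executor likely needs a fallback to the current behavior:
+```java
+    // Sketch: fall back to the compaction instant when the plan was not
+    // scheduled by EventTimeBasedCompactionStrategy.
+    String nextBaseInstantTime = compactionOperation.getNextBaseInstantTime() != null
+        ? compactionOperation.getNextBaseInstantTime()
+        : compactionInstant;
+```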
+
+Follow up:
+
+With the `EventTimeBasedCompactionStrategy` above, data with an event time after the threshold may still get compacted.
+For cases where users want an exact event time split (i.e. no data after the threshold may exist in the RO table),
+we need to filter the log records by the time threshold during compaction execution.
+ 
+To implement the exact event time split, we should modify `HoodieMergedLogRecordScanner` by adding new methods to get the remaining log records,
+likely named `getRemainLogsIterator` and `getRemainRecords`, corresponding to `HoodieMergedLogRecordScanner::iterator` and `HoodieMergedLogRecordScanner::getRecords`.
+We then have to rewrite those remaining log records back into a new log file.
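+
+A minimal sketch of the record split (the event time extraction helper and the threshold variable are placeholders of this sketch, not a final API):
+```java
+    // Sketch: partition the merged records by the event time threshold.
+    // Records at or before the threshold are compacted into the base file;
+    // the remaining records are rewritten into a new log file.
+    List<HoodieRecord> toCompact = new ArrayList<>();
+    List<HoodieRecord> remaining = new ArrayList<>();
+    for (HoodieRecord record : scanner.getRecords().values()) {
+      long eventTime = extractEventTime(record); // placeholder helper
+      if (eventTime <= eventTimeThreshold) {
+        toCompact.add(record);
+      } else {
+        remaining.add(record);
+      }
+    }
+```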
+
+
+### Compute Data Freshness / Completeness
+Given the definitions below:
+
+| Query based table               | Data Completeness Time                                           | Data Freshness Time       |
+|---------------------------------|------------------------------------------------------------------|---------------------------|
+| Snapshot table (RT table)       | all data before this time has been ingested into Hudi            | the latest data ingested  |
+| Read Optimized table (RO table) | all data before this time has been compacted into the base files | the latest data compacted |
+
+For the RT table, the data completeness and freshness can simply be extracted from `HoodieWriteStat::minEventTime` and `HoodieWriteStat::maxEventTime`. Today they are used to report the metrics `commitLatencyInMs` and `commitFreshnessInMs`.
+
+For the RO table, the data freshness time is the `maxEventTime` of the last compaction, which can simply be extracted from the last commit metadata.
+But for the data completeness time, we cannot simply take the `minEventTime` of the last compaction, since data after `minEventTime` may still be uncompacted. So it depends on the earliest event time of all un-compacted log files.
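+
+For illustration, a sketch of that computation (the collection of un-compacted log file stats is a placeholder; how to obtain it is exactly the open question raised in the review below):
+```java
+    // Sketch: the RO completeness time is bounded by the earliest event time
+    // of any log file that has not yet been compacted into a base file.
+    long roCompletenessTime = Long.MAX_VALUE;
+    for (HoodieWriteStat stat : uncompactedLogFileStats) { // placeholder collection
+      if (stat.getMinEventTime() != null) {
+        roCompletenessTime = Math.min(roCompletenessTime, stat.getMinEventTime());
+      }
+    }
+```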

Review Comment:
   > un-compacted log files. 
   
   A compaction is not visible until it completes, so how do the un-compacted log files play a role here?


