Re: [PR] [HUDI-6979][RFC-76] support event time based compaction strategy [hudi]
waitingF closed pull request #10266: [HUDI-6979][RFC-76] support event time based compaction strategy URL: https://github.com/apache/hudi/pull/10266
Re: [PR] [HUDI-6979][RFC-76] support event time based compaction strategy [hudi]
danny0405 commented on code in PR #10266: URL: https://github.com/apache/hudi/pull/10266#discussion_r1432374162

## rfc/rfc-76/rfc-76.md:

# RFC-76: Support EventTimeBasedCompactionStrategy and metrics for the RO table

## Proposers

- @waitingF

## Approvers

- @danny0405
- @voonhous

## Status

JIRA: [HUDI-6979](https://issues.apache.org/jira/browse/HUDI-6979)

> Please keep the status updated in `rfc/README.md`.

## Abstract

Currently, to achieve low ingestion latency, we can adopt the MergeOnRead (MOR) table, which supports appending log files and compacting them into base files later. When querying the snapshot table (RT table) of a MOR table, the query side has to merge log files with base files on the fly to see the full data, which is time-consuming and increases query latency. For low query latency, Hudi provides the read-optimized table (RO table), which reads only base files, much like a COW table.

Generally, ingested data arrives roughly in event-time order. Some users reading a Hudi table do not need the very latest data, only the data before a specified time T (e.g. all data before midnight). Today, such users have to query the RT table to get all data before T, with the correspondingly high query latency.

Based on this, we want to implement a strategy based on event time: `EventTimeBasedCompactionStrategy`.

## Background

Currently, there is no compaction strategy based on event time. The closest, `DayBasedCompactionStrategy`, requires the table to be partitioned by day in a specific format (yyyy/MM/dd), which is not very general and has too coarse a time granularity. There is also no data-completeness/freshness guarantee or metric for the RO table.

Based on this, we plan to
1. introduce a compaction strategy based on event time: `EventTimeBasedCompactionStrategy`;
2. report RO table data completeness and freshness.

With the data completeness and freshness of the RO table, we can expand its use cases: given an event-time threshold T, the log files before T can be compacted, and the resulting RO table then contains all data before T, with low query latency.

We previously introduced a new file-slicing algorithm based on barriers, and compaction-plan generation based on completion time [[HUDI-6495][RFC-66]](https://github.com/apache/hudi/pull/7907).

With barrier-based file slicing, file slices are split by barriers extracted from the instant time of each base file, or from the first instant of the log files if no base file exists.

When generating a compaction plan, the scheduler selects all log files whose completion time is before the `compactionInstant`, and the compaction operation generates a new base file with `compactionInstant` as its instant time. So for MOR tables, file slices are naturally split by the `compactionInstant`.

We can therefore replace the `compactionInstant` with an `eventTimeInstant` computed by `EventTimeBasedCompactionStrategy` when generating the plan. In this case, file slices are split by `eventTimeInstant` instead of `compactionInstant`. This is the basis for `EventTimeBasedCompactionStrategy`.

From the .commit/.deltacommit metadata, we can extract the min/max event time for each base file or log file and take it as the data completeness and freshness of that file. This is the basis for the data completeness and freshness of the RO table.

## Implementation

Building on the new file-slicing algorithm and completion-time-based compaction, the changes are as follows.

### Schedule Compaction Plan

The `EventTimeBasedCompactionStrategy` is responsible for calculating the event-time threshold for the compaction plan. Implement a method to calculate it:

```java
// Returns the threshold in instant-time format.
public String calcEventTimeThreshold(String compactionInstant, HoodieWriteConfig config)
```

When generating the compaction plan, we take the event-time threshold as the next base instant time and save it to `CompactionOperation::nextBaseInstantTime`.

### Execute Compaction Plan

Get `nextBaseInstantTime` from the `CompactionOperation` and write the base file at `nextBaseInstantTime`.

Sample code in HoodieCompactor:

```java
String nextBaseInstantTime = compactionOperation.getNextBaseInstantTime();
result = executionHelper.writeFileAndGetWriteStats(compactionHandler, operation, nextBaseInstantTime, scanner, oldDataFileOpt);
```

Follow-up:

With the `EventTimeBasedCompactionStrategy` above, data with an event time after the threshold may still be compacted. For cases where users want an exact event-time split (no data after the threshold present in the RO table), we need to filter log records during compaction execution based on the time threshold.

To implement the exact event-time split, we should modify `HoodieMergedLogRecordScanner`, adding new methods to get the remaining log records, likely
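The `calcEventTimeThreshold` hook proposed above can be sketched as follows. This is a minimal standalone illustration, not the actual implementation: it assumes the threshold is simply the compaction instant shifted back by a configured event-time lag, and the hypothetical `lagSeconds` parameter stands in for a value that would be read from `HoodieWriteConfig`.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Standalone sketch of an event-time threshold calculation.
// Assumption: the threshold is the compaction instant minus a configured
// event-time lag; `lagSeconds` replaces a HoodieWriteConfig lookup here.
public class EventTimeThreshold {
  // Hudi instant times follow the yyyyMMddHHmmss pattern (second precision).
  private static final DateTimeFormatter INSTANT_FMT =
      DateTimeFormatter.ofPattern("yyyyMMddHHmmss");

  // Returns the event-time threshold in instant-time format.
  public static String calcEventTimeThreshold(String compactionInstant, long lagSeconds) {
    LocalDateTime instant = LocalDateTime.parse(compactionInstant, INSTANT_FMT);
    return instant.minusSeconds(lagSeconds).format(INSTANT_FMT);
  }

  public static void main(String[] args) {
    // A compaction scheduled at 2023-12-01 10:00:00 with a 1-hour lag
    // produces the threshold 2023-12-01 09:00:00.
    System.out.println(calcEventTimeThreshold("20231201100000", 3600));
  }
}
```

With such a lag-based threshold, log files whose event times fall entirely before the threshold are eligible for compaction, and late records inside the lag window are left in the logs for a later plan.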
Re: [PR] [HUDI-6979][RFC-76] support event time based compaction strategy [hudi]
waitingF commented on code in PR #10266: URL: https://github.com/apache/hudi/pull/10266#discussion_r1431236118

## rfc/rfc-76/rfc-76.md:

For cases where users want an exact event-time split (no data after the threshold present in the RO table), we need to filter log records during compaction execution based on the time threshold.

Review Comment: I see, will remove this part.
Re: [PR] [HUDI-6979][RFC-76] support event time based compaction strategy [hudi]
waitingF commented on code in PR #10266: URL: https://github.com/apache/hudi/pull/10266#discussion_r1431212628

## rfc/rfc-76/rfc-76.md:
Re: [PR] [HUDI-6979][RFC-76] support event time based compaction strategy [hudi]
danny0405 commented on code in PR #10266: URL: https://github.com/apache/hudi/pull/10266#discussion_r1431047153

## rfc/rfc-76/rfc-76.md:
Re: [PR] [HUDI-6979][RFC-76] support event time based compaction strategy [hudi]
danny0405 commented on code in PR #10266: URL: https://github.com/apache/hudi/pull/10266#discussion_r1431045672

## rfc/rfc-76/rfc-76.md:

For cases where users want an exact event-time split (no data after the threshold present in the RO table), we need to filter log records during compaction execution based on the time threshold.

Review Comment: > then we can schedule the compaction at timestamp t3 + late_inteval with eventTimeInstant t3.

The additional wait for tim
Re: [PR] [HUDI-6979][RFC-76] support event time based compaction strategy [hudi]
waitingF commented on code in PR #10266: URL: https://github.com/apache/hudi/pull/10266#discussion_r1430991273

## rfc/rfc-76/rfc-76.md:
Re: [PR] [HUDI-6979][RFC-76] support event time based compaction strategy [hudi]
danny0405 commented on code in PR #10266: URL: https://github.com/apache/hudi/pull/10266#discussion_r1430889062

## rfc/rfc-76/rfc-76.md:

# RFC-76: Support `EventTimeBasedCompactionStrategy` and metrics for the RO table

## Proposers

- @waitingF

## Approvers

- @danny0405
- @voonhous

## Status

JIRA: [HUDI-6979](https://issues.apache.org/jira/browse/HUDI-6979)

> Please keep the status updated in `rfc/README.md`.

## Abstract

Currently, to achieve low ingestion latency, we can adopt the MergeOnRead (MOR) table, which supports appending log files and compacting them into base files later. When querying the snapshot table (RT table) of a MOR table, the query side has to merge the log files with the base files on the fly to see all the data, which is time-consuming and increases query latency. For low query latency, Hudi provides the read-optimized table (RO table), which reads like a COW table.

Generally, the ingested data arrives roughly in event time order. Some users reading a Hudi table do not need the latest complete data, only the data before a specified time T (e.g. data before midnight). Today such users have to query the RT table to get all data before the specified time, with the expected high query latency.

Based on this, we want to implement a strategy based on event time: `EventTimeBasedCompactionStrategy`.

## Background

Currently there is no compaction strategy based on event time. The closest, `DayBasedCompactionStrategy`, requires the table to be partitioned by day in a specific format (yyyy/mm/dd), which is not very general and has too coarse a time granularity. There is also no data completeness/freshness guarantee or metric for the RO table.

Based on this, we plan to:
1. introduce a compaction strategy based on event time: `EventTimeBasedCompactionStrategy`;
2. report RO table data completeness and freshness.

With the data completeness and freshness of the RO table, we can expand its use cases.
That is, given an event time threshold T, the log files before T can be compacted, and the resulting RO table then contains all data before T, with low query latency.

We recently introduced a new file slicing algorithm based on barriers, with compaction plans generated based on completion time ([[HUDI-6495][RFC-66]](https://github.com/apache/hudi/pull/7907)).

With barrier-based file slicing, file slices are split by barriers extracted from the instant time of each base file, or from the first instant of the log files if no base file exists.

When generating a compaction plan, the scheduler selects all log files whose completion time is before the `compactionInstant`, and the compaction operation generates a new base file with `compactionInstant` as its instant time. So for MOR, file slices are naturally split by the `compactionInstant`.

We can therefore replace the `compactionInstant` with an `eventTimeInstant` computed by `EventTimeBasedCompactionStrategy` when generating the plan. File slices are then split by `eventTimeInstant` instead of `compactionInstant`. This is the basis for `EventTimeBasedCompactionStrategy`.

From the .commit/.deltacommit metadata, we can extract the min/max event time of each base file or log file and take it as the data completeness and freshness of that file. This is the basis for the data completeness and freshness metrics of the RO table.

## Implementation

With the new file slicing algorithm and completion-time-based compaction in place:

### Schedule Compaction Plan

The `EventTimeBasedCompactionStrategy` is responsible for calculating the event time threshold for the compaction plan.
Implement a method to calculate the event time threshold:

```java
// returns the threshold in instant time format
public String calcEventTimeThreshold(String compactionInstant, HoodieWriteConfig config)
```

When generating the compaction plan, we take the event time threshold as the next base instant time and save it to `CompactionOperation::nextBaseInstantTime`.

### Execute Compaction Plan

Get `nextBaseInstantTime` from the `CompactionOperation` and write the new base file at `nextBaseInstantTime`.

Sample code in `HoodieCompactor`:

```java
String nextBaseInstantTime = compactionOperation.getNextBaseInstantTime();
result = executionHelper.writeFileAndGetWriteStats(compactionHandler, operation, nextBaseInstantTime, scanner, oldDataFileOpt);
```

Follow up:

With the `EventTimeBasedCompactionStrategy` above, data with an event time after the threshold may still be compacted. For users who want an exact event time split (no data after the threshold in the RO table), we need to filter log records by the time threshold during compaction execution.

To implement the exact event time split, we should modify `HoodieMergedLogRecordScanner`, adding new methods to get the remaining log records.
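As an illustration, here is a minimal sketch of how `calcEventTimeThreshold` could derive the threshold. It assumes Hudi's `yyyyMMddHHmmss` instant-time pattern, and substitutes a hypothetical `lagMinutes` parameter for the value that would actually be read from `HoodieWriteConfig`:

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class EventTimeThresholdSketch {

  // Hudi instant times follow the yyyyMMddHHmmss pattern (seconds granularity).
  private static final DateTimeFormatter INSTANT_FORMAT =
      DateTimeFormatter.ofPattern("yyyyMMddHHmmss");

  // Sketch of calcEventTimeThreshold: push the compaction instant back by a
  // configured lag, so only event times at least lagMinutes old are compacted.
  // lagMinutes is a hypothetical stand-in for a HoodieWriteConfig entry.
  public static String calcEventTimeThreshold(String compactionInstant, long lagMinutes) {
    LocalDateTime instant = LocalDateTime.parse(compactionInstant, INSTANT_FORMAT);
    return instant.minusMinutes(lagMinutes).format(INSTANT_FORMAT);
  }

  public static void main(String[] args) {
    // Compaction scheduled at 2023-12-01 00:30:00 with a 30-minute lag:
    System.out.println(calcEventTimeThreshold("20231201003000", 30)); // prints 20231201000000
  }
}
```

The returned string can then be stored as `nextBaseInstantTime` in the compaction operation, as described above.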
danny0405 commented on code in PR #10266: URL: https://github.com/apache/hudi/pull/10266#discussion_r1430888168

## rfc/rfc-76/rfc-76.md:

> we need to perform the log record filter in compaction execution based on the time threshold.

Review Comment: The compactor can ensure this if there is no clock skew; in our Hudi implementation, we by default wait another 200 ms.
waitingF commented on code in PR #10266: URL: https://github.com/apache/hudi/pull/10266#discussion_r1430146341

## rfc/rfc-76/rfc-76.md:
waitingF commented on code in PR #10266: URL: https://github.com/apache/hudi/pull/10266#discussion_r1430146024

## rfc/rfc-76/rfc-76.md:

> we need to perform the log record filter in compaction execution based on the time threshold.

Review Comment: The threshold refers to the generated `eventTimeInstant`. For example, there may be cases where we make daily snapshots from Hudi
danny0405 commented on code in PR #10266: URL: https://github.com/apache/hudi/pull/10266#discussion_r1429454087

## rfc/rfc-76/rfc-76.md:
danny0405 commented on code in PR #10266: URL: https://github.com/apache/hudi/pull/10266#discussion_r1429452701

## rfc/rfc-76/rfc-76.md:

Review Comment:

> users don't want any data after threshold exist in RO table

What threshold are you referring to?
danny0405 commented on code in PR #10266: URL: https://github.com/apache/hudi/pull/10266#discussion_r1429450376

## rfc/rfc-76/rfc-76.md:

> This is the base for `EventTimeBasedCompactionStrategy`.

Review Comment: Feasible, just to note that the generation of the `eventTimeInstant` must share the same lock with normal instant time generation.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
waitingF commented on code in PR #10266: URL: https://github.com/apache/hudi/pull/10266#discussion_r1423651556

## rfc/rfc-76/rfc-76.md:

> But currently, there is no compaction strategy based on event time, so there is no data freshness guarantee for RO table.
> For cases, user want all data before a specified time, user have to query the RT table to get all data with expected high query latency.

Review Comment: sure, will do
Re: [PR] [HUDI-6979][RFC-76] support event time based compaction strategy [hudi]
danny0405 commented on code in PR #10266: URL: https://github.com/apache/hudi/pull/10266#discussion_r1423554270

## rfc/rfc-76/rfc-76.md:

   Review Comment:
   Can you modify the doc based on our new file slicing algorithm?
Re: [PR] [HUDI-6979][RFC-76] support event time based compaction strategy [hudi]
waitingF commented on code in PR #10266: URL: https://github.com/apache/hudi/pull/10266#discussion_r1423345721

## rfc/rfc-76/rfc-76.md:

   Review Comment:
   > With our new file slicing under the unbounded IO compaction strategy, a compaction plan at t is designated as including all the log files completed before t, does that make sense to your use case? You can then query the ro table after the compaction completes, the ro table data freshness is at least up to t.

   I don't think so: there will be file groups under pending compaction that are skipped when scheduling the compaction plan, and in that case the rule that "the RO table data freshness is at least up to `t`" is broken, since there may be historical data in those file groups.

   We should ensure that all log files before `t` are compacted, which means we should only generate a new plan when no file group is left under pending compaction or clustering. For this, we can introduce a new trigger.

   > The question is how to tell the reader the freshness of the ro table?

   Yeah, this is part of the RFC. We can extract the freshness from the log files during compaction.

## rfc/rfc-76/rfc-76.md:

   Review Comment:
   Yeah, looks like so.
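The "new trigger" proposed in the comment above can be sketched roughly as follows; the function and argument names are hypothetical, not Hudi APIs. The point is only the scheduling condition: an event-time-based plan is generated solely when no file group remains under a pending compaction or clustering, so no group can be skipped and hold back history data:

```python
# Illustrative sketch of the proposed scheduling trigger; names are invented
# for this example and do not correspond to real Hudi classes or methods.
def can_schedule_event_time_plan(pending_compaction_fgs, pending_clustering_fgs):
    """Only schedule a new event-time-based plan when no file group would be
    skipped, so the guarantee "RO table holds all data before t" cannot be
    broken by history data sitting in skipped file groups."""
    return not pending_compaction_fgs and not pending_clustering_fgs


assert can_schedule_event_time_plan([], [])            # nothing pending: go
assert not can_schedule_event_time_plan(["fg-1"], [])  # pending compaction: wait
assert not can_schedule_event_time_plan([], ["fg-2"])  # pending clustering: wait
```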
Re: [PR] [HUDI-6979][RFC-76] support event time based compaction strategy [hudi]
danny0405 commented on code in PR #10266: URL: https://github.com/apache/hudi/pull/10266#discussion_r1422044423

## rfc/rfc-76/rfc-76.md:

   Review Comment:
   With our new file slicing under the unbounded IO compaction strategy, a compaction plan at `t` is designated as including all the log files completed before `t`; does that make sense for your use case? You can then query the RO table after the compaction completes, and the RO table data freshness is at least up to `t`. The question is how to tell the reader the freshness of the RO table?
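One possible answer to the freshness question raised above, sketched with illustrative names rather than Hudi APIs: each file group's RO view is fresh up to the instant of its latest base file, so a conservative table-level freshness to report is the minimum of those instants across file groups:

```python
# Sketch of RO-table freshness reporting; the data shape and function name
# are assumptions for illustration, not part of Hudi.
def ro_table_freshness(latest_base_instant_by_fg):
    """Given a map of file-group id -> instant of its newest base file,
    the RO table as a whole is only guaranteed fresh up to the slowest
    (least recently compacted) file group."""
    return min(latest_base_instant_by_fg.values())


# fg-2 was compacted last at instant 250, so the table-level freshness is 250
# even though the other groups are fresher.
assert ro_table_freshness({"fg-1": 300, "fg-2": 250, "fg-3": 400}) == 250
```

Reporting the minimum is the conservative choice: a reader filtering on "data before the reported freshness" is then guaranteed complete results from every file group.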
Re: [PR] [HUDI-6979][RFC-76] support event time based compaction strategy [hudi]
danny0405 commented on code in PR #10266: URL: https://github.com/apache/hudi/pull/10266#discussion_r1422041532

## rfc/rfc-76/rfc-76.md:

   Review Comment:
   It looks very relevant to our new file slicing algorithm: https://github.com/apache/hudi/pull/7907. In the new file slicing, the compaction plan only includes log files that were committed before the compaction instant (start) time: https://github.com/apache/hudi/pull/9776/files#diff-c556da806e520ab33f861e35e9f4b97cc35b2e504ea10e9b70ff93d2ffa59ff6R136