Re: [PR] [HUDI-6979][RFC-76] support event time based compaction strategy [hudi]

2024-01-09 Thread via GitHub


waitingF closed pull request #10266: [HUDI-6979][RFC-76] support event time 
based compaction strategy
URL: https://github.com/apache/hudi/pull/10266


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [HUDI-6979][RFC-76] support event time based compaction strategy [hudi]

2023-12-20 Thread via GitHub


danny0405 commented on code in PR #10266:
URL: https://github.com/apache/hudi/pull/10266#discussion_r1432374162


##
rfc/rfc-76/rfc-76.md:
##
@@ -0,0 +1,143 @@
+
+# RFC-[76]: [support EventTimeBasedCompactionStrategy and metric for RO table]
+
+## Proposers
+
+- @waitingF
+
+## Approvers
+ - @danny0405
+ - @voonhous
+
+## Status
+
+JIRA: [HUDI-6979](https://issues.apache.org/jira/browse/HUDI-6979)
+
+> Please keep the status updated in `rfc/README.md`.
+
+## Abstract
+
+Currently, to gain low ingestion latency, we can adopt the MergeOnRead table, 
which support appending log files and 
+compact log files into base file later. When querying the snapshot table (RT 
table) generated by MOR, 
+query side have to perform a compaction so that they can get all data, which 
is expected time-consuming causing query latency.
+At the time, hudi provide read-optimized table (RO table) for low query 
latency just like COW.
+
+Generally the data to ingest arrives roughly in event time order. For cases 
some users read the hudi table,
+they may not concern with the immediate full data, but the data before a 
specified time T (eg. data before 0 clock).
+For such cases, user want all data before a specified time, user have to query 
the RT table to get all data with expected high query latency.
+
+Based on this, we want to implement the strategy based on the event time: 
`EventTimeBasedCompactionStrategy`. 
+
+
+## Background
+
+Currently, there is no compaction strategy based on event time, the only 
`DayBasedCompactionStrategy` need the table partitioned by day in specified 
format(/mm/dd), 
+which is not very general and has too large a time granularity. There is no 
data completion/freshness guarantee or metrics for RO table.
+
+Based on this, we plan to 
+1. launch a compaction strategy based on event time: 
`EventTimeBasedCompactionStrategy`.
+2. report RO table data completion and freshness.
+
+With the data completion and freshness of RO table, we can expand use-case of 
RO table.
+That is given event time threshold T, the log files before T can be compacted, 
then resulting RO table obtains all data before T, with low query latency.
+
+Currently, we introduced new file slicing algorithm based on barrier and 
generating compaction plan based on completion time 
[[HUDI-6495][RFC-66]](https://github.com/apache/hudi/pull/7907).
+
+With the barrier file slicing, file slices are split by the barriers extracted 
from every instant time of base files 
+or the first instant of log file if no base file exists.
+
+When generating compaction plan, the scheduler will filter all log files with 
completion time before the `compactionInstant`. 
+And the compaction operation will generate a new base file with 
`compactionInstant` as its instant time;
+
+So the file slices are split naturally by the `compactionInstant` for MOR 
cases. 
+
+So we can replace the `compactionInstant` with `eventTimeInstant` computed by 
`EventTimeBasedCompactionStrategy` when generating plan. 
+In this case, file slices are split by `eventTimeInstant` instead of 
`compactionInstant`. 
+This is the base for `EventTimeBasedCompactionStrategy`.
+
+In .commit/.deltacommit metadata, we can extract the min/max event time for 
each base file or log file and take the min/max event time as data completion 
and freshness for that base file or log file. 
+This is the base for data completion and freshness of RO table.
+
+## Implementation
+
+With the ability of new file slice algorithm and compaction based on 
completion time, we can choose to
+
+### Schedule Compaction Plan
+The `EventTimeBasedCompactionStrategy` strategy is responsible to calculate 
the event time threshold for compaction plan.
+Implement a method to calculate the event time threshold:
+```java
+// return the threshold as instant time format
+public String calcEventTimeThreshold(String compactionInstant, 
HoodieWriteConfig config)
+```
+
+When generating compaction plan, we take the event time threshold as the next 
base instant time and save it to `CompactionOpertion::nextBaseInstantTime`.
+
+
+### Execute Compaction Plan
+Get `nextBaseInstantTime` from `CompactionOpertion`, write base file to the 
`nextBaseInstantTime`.
+
+Sample code in HoodieCompactor:
+```java
+String nextBaseInstantTime = compactionOpertion.getNextBaseInstantTime();
+result = executionHelper.writeFileAndGetWriteStats(compactionHandler, 
operation, nextBaseInstantTime, scanner, oldDataFileOpt);
+```
+
+Follow up:
+
+With the `EventTimeBasedCompactionStrategy` above, there can be data with 
event time after the threshold being compacted.
+For cases users want exact event time split (users don't want any data after 
threshold exist in RO table),
+we need to perform the log record filter in compaction execution based on the 
time threshold.
+ 
+To implement exact event time split, we should modify 
`HoodieMergedLogRecordScanner` adding new methods to get the remained log 
records, 
+likel

Re: [PR] [HUDI-6979][RFC-76] support event time based compaction strategy [hudi]

2023-12-19 Thread via GitHub


waitingF commented on code in PR #10266:
URL: https://github.com/apache/hudi/pull/10266#discussion_r1431236118


##
rfc/rfc-76/rfc-76.md:
##
@@ -0,0 +1,143 @@
+
+# RFC-[76]: [support EventTimeBasedCompactionStrategy and metric for RO table]
+
+## Proposers
+
+- @waitingF
+
+## Approvers
+ - @danny0405
+ - @voonhous
+
+## Status
+
+JIRA: [HUDI-6979](https://issues.apache.org/jira/browse/HUDI-6979)
+
+> Please keep the status updated in `rfc/README.md`.
+
+## Abstract
+
+Currently, to gain low ingestion latency, we can adopt the MergeOnRead table, 
which support appending log files and 
+compact log files into base file later. When querying the snapshot table (RT 
table) generated by MOR, 
+query side have to perform a compaction so that they can get all data, which 
is expected time-consuming causing query latency.
+At the time, hudi provide read-optimized table (RO table) for low query 
latency just like COW.
+
+Generally the data to ingest arrives roughly in event time order. For cases 
some users read the hudi table,
+they may not concern with the immediate full data, but the data before a 
specified time T (eg. data before 0 clock).
+For such cases, user want all data before a specified time, user have to query 
the RT table to get all data with expected high query latency.
+
+Based on this, we want to implement the strategy based on the event time: 
`EventTimeBasedCompactionStrategy`. 
+
+
+## Background
+
+Currently, there is no compaction strategy based on event time, the only 
`DayBasedCompactionStrategy` need the table partitioned by day in specified 
format(/mm/dd), 
+which is not very general and has too large a time granularity. There is no 
data completion/freshness guarantee or metrics for RO table.
+
+Based on this, we plan to 
+1. launch a compaction strategy based on event time: 
`EventTimeBasedCompactionStrategy`.
+2. report RO table data completion and freshness.
+
+With the data completion and freshness of RO table, we can expand use-case of 
RO table.
+That is given event time threshold T, the log files before T can be compacted, 
then resulting RO table obtains all data before T, with low query latency.
+
+Currently, we introduced new file slicing algorithm based on barrier and 
generating compaction plan based on completion time 
[[HUDI-6495][RFC-66]](https://github.com/apache/hudi/pull/7907).
+
+With the barrier file slicing, file slices are split by the barriers extracted 
from every instant time of base files 
+or the first instant of log file if no base file exists.
+
+When generating compaction plan, the scheduler will filter all log files with 
completion time before the `compactionInstant`. 
+And the compaction operation will generate a new base file with 
`compactionInstant` as its instant time;
+
+So the file slices are split naturally by the `compactionInstant` for MOR 
cases. 
+
+So we can replace the `compactionInstant` with `eventTimeInstant` computed by 
`EventTimeBasedCompactionStrategy` when generating plan. 
+In this case, file slices are split by `eventTimeInstant` instead of 
`compactionInstant`. 
+This is the base for `EventTimeBasedCompactionStrategy`.
+
+In .commit/.deltacommit metadata, we can extract the min/max event time for 
each base file or log file and take the min/max event time as data completion 
and freshness for that base file or log file. 
+This is the base for data completion and freshness of RO table.
+
+## Implementation
+
+With the ability of new file slice algorithm and compaction based on 
completion time, we can choose to
+
+### Schedule Compaction Plan
+The `EventTimeBasedCompactionStrategy` strategy is responsible to calculate 
the event time threshold for compaction plan.
+Implement a method to calculate the event time threshold:
+```java
+// return the threshold as instant time format
+public String calcEventTimeThreshold(String compactionInstant, 
HoodieWriteConfig config)
+```
+
+When generating compaction plan, we take the event time threshold as the next 
base instant time and save it to `CompactionOpertion::nextBaseInstantTime`.
+
+
+### Execute Compaction Plan
+Get `nextBaseInstantTime` from `CompactionOpertion`, write base file to the 
`nextBaseInstantTime`.
+
+Sample code in HoodieCompactor:
+```java
+String nextBaseInstantTime = compactionOpertion.getNextBaseInstantTime();
+result = executionHelper.writeFileAndGetWriteStats(compactionHandler, 
operation, nextBaseInstantTime, scanner, oldDataFileOpt);
+```
+
+Follow up:
+
+With the `EventTimeBasedCompactionStrategy` above, there can be data with 
event time after the threshold being compacted.
+For cases users want exact event time split (users don't want any data after 
threshold exist in RO table),
+we need to perform the log record filter in compaction execution based on the 
time threshold.

Review Comment:
   I see, will remove this part.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please lo

Re: [PR] [HUDI-6979][RFC-76] support event time based compaction strategy [hudi]

2023-12-19 Thread via GitHub


waitingF commented on code in PR #10266:
URL: https://github.com/apache/hudi/pull/10266#discussion_r1431212628


##
rfc/rfc-76/rfc-76.md:
##
@@ -0,0 +1,143 @@
+
+# RFC-[76]: [support EventTimeBasedCompactionStrategy and metric for RO table]
+
+## Proposers
+
+- @waitingF
+
+## Approvers
+ - @danny0405
+ - @voonhous
+
+## Status
+
+JIRA: [HUDI-6979](https://issues.apache.org/jira/browse/HUDI-6979)
+
+> Please keep the status updated in `rfc/README.md`.
+
+## Abstract
+
+Currently, to gain low ingestion latency, we can adopt the MergeOnRead table, 
which support appending log files and 
+compact log files into base file later. When querying the snapshot table (RT 
table) generated by MOR, 
+query side have to perform a compaction so that they can get all data, which 
is expected time-consuming causing query latency.
+At the time, hudi provide read-optimized table (RO table) for low query 
latency just like COW.
+
+Generally the data to ingest arrives roughly in event time order. For cases 
some users read the hudi table,
+they may not concern with the immediate full data, but the data before a 
specified time T (eg. data before 0 clock).
+For such cases, user want all data before a specified time, user have to query 
the RT table to get all data with expected high query latency.
+
+Based on this, we want to implement the strategy based on the event time: 
`EventTimeBasedCompactionStrategy`. 
+
+
+## Background
+
+Currently, there is no compaction strategy based on event time, the only 
`DayBasedCompactionStrategy` need the table partitioned by day in specified 
format(/mm/dd), 
+which is not very general and has too large a time granularity. There is no 
data completion/freshness guarantee or metrics for RO table.
+
+Based on this, we plan to 
+1. launch a compaction strategy based on event time: 
`EventTimeBasedCompactionStrategy`.
+2. report RO table data completion and freshness.
+
+With the data completion and freshness of RO table, we can expand use-case of 
RO table.
+That is given event time threshold T, the log files before T can be compacted, 
then resulting RO table obtains all data before T, with low query latency.
+
+Currently, we introduced new file slicing algorithm based on barrier and 
generating compaction plan based on completion time 
[[HUDI-6495][RFC-66]](https://github.com/apache/hudi/pull/7907).
+
+With the barrier file slicing, file slices are split by the barriers extracted 
from every instant time of base files 
+or the first instant of log file if no base file exists.
+
+When generating compaction plan, the scheduler will filter all log files with 
completion time before the `compactionInstant`. 
+And the compaction operation will generate a new base file with 
`compactionInstant` as its instant time;
+
+So the file slices are split naturally by the `compactionInstant` for MOR 
cases. 
+
+So we can replace the `compactionInstant` with `eventTimeInstant` computed by 
`EventTimeBasedCompactionStrategy` when generating plan. 
+In this case, file slices are split by `eventTimeInstant` instead of 
`compactionInstant`. 
+This is the base for `EventTimeBasedCompactionStrategy`.
+
+In .commit/.deltacommit metadata, we can extract the min/max event time for 
each base file or log file and take the min/max event time as data completion 
and freshness for that base file or log file. 
+This is the base for data completion and freshness of RO table.
+
+## Implementation
+
+With the ability of new file slice algorithm and compaction based on 
completion time, we can choose to
+
+### Schedule Compaction Plan
+The `EventTimeBasedCompactionStrategy` strategy is responsible to calculate 
the event time threshold for compaction plan.
+Implement a method to calculate the event time threshold:
+```java
+// return the threshold as instant time format
+public String calcEventTimeThreshold(String compactionInstant, 
HoodieWriteConfig config)
+```
+
+When generating compaction plan, we take the event time threshold as the next 
base instant time and save it to `CompactionOpertion::nextBaseInstantTime`.
+
+
+### Execute Compaction Plan
+Get `nextBaseInstantTime` from `CompactionOpertion`, write base file to the 
`nextBaseInstantTime`.
+
+Sample code in HoodieCompactor:
+```java
+String nextBaseInstantTime = compactionOpertion.getNextBaseInstantTime();
+result = executionHelper.writeFileAndGetWriteStats(compactionHandler, 
operation, nextBaseInstantTime, scanner, oldDataFileOpt);
+```
+
+Follow up:
+
+With the `EventTimeBasedCompactionStrategy` above, there can be data with 
event time after the threshold being compacted.
+For cases users want exact event time split (users don't want any data after 
threshold exist in RO table),
+we need to perform the log record filter in compaction execution based on the 
time threshold.
+ 
+To implement exact event time split, we should modify 
`HoodieMergedLogRecordScanner` adding new methods to get the remained log 
records, 
+likely

Re: [PR] [HUDI-6979][RFC-76] support event time based compaction strategy [hudi]

2023-12-19 Thread via GitHub


danny0405 commented on code in PR #10266:
URL: https://github.com/apache/hudi/pull/10266#discussion_r1431047153


##
rfc/rfc-76/rfc-76.md:
##
@@ -0,0 +1,143 @@
+
+# RFC-[76]: [support EventTimeBasedCompactionStrategy and metric for RO table]
+
+## Proposers
+
+- @waitingF
+
+## Approvers
+ - @danny0405
+ - @voonhous
+
+## Status
+
+JIRA: [HUDI-6979](https://issues.apache.org/jira/browse/HUDI-6979)
+
+> Please keep the status updated in `rfc/README.md`.
+
+## Abstract
+
+Currently, to gain low ingestion latency, we can adopt the MergeOnRead table, 
which support appending log files and 
+compact log files into base file later. When querying the snapshot table (RT 
table) generated by MOR, 
+query side have to perform a compaction so that they can get all data, which 
is expected time-consuming causing query latency.
+At the time, hudi provide read-optimized table (RO table) for low query 
latency just like COW.
+
+Generally the data to ingest arrives roughly in event time order. For cases 
some users read the hudi table,
+they may not concern with the immediate full data, but the data before a 
specified time T (eg. data before 0 clock).
+For such cases, user want all data before a specified time, user have to query 
the RT table to get all data with expected high query latency.
+
+Based on this, we want to implement the strategy based on the event time: 
`EventTimeBasedCompactionStrategy`. 
+
+
+## Background
+
+Currently, there is no compaction strategy based on event time, the only 
`DayBasedCompactionStrategy` need the table partitioned by day in specified 
format(/mm/dd), 
+which is not very general and has too large a time granularity. There is no 
data completion/freshness guarantee or metrics for RO table.
+
+Based on this, we plan to 
+1. launch a compaction strategy based on event time: 
`EventTimeBasedCompactionStrategy`.
+2. report RO table data completion and freshness.
+
+With the data completion and freshness of RO table, we can expand use-case of 
RO table.
+That is given event time threshold T, the log files before T can be compacted, 
then resulting RO table obtains all data before T, with low query latency.
+
+Currently, we introduced new file slicing algorithm based on barrier and 
generating compaction plan based on completion time 
[[HUDI-6495][RFC-66]](https://github.com/apache/hudi/pull/7907).
+
+With the barrier file slicing, file slices are split by the barriers extracted 
from every instant time of base files 
+or the first instant of log file if no base file exists.
+
+When generating compaction plan, the scheduler will filter all log files with 
completion time before the `compactionInstant`. 
+And the compaction operation will generate a new base file with 
`compactionInstant` as its instant time;
+
+So the file slices are split naturally by the `compactionInstant` for MOR 
cases. 
+
+So we can replace the `compactionInstant` with `eventTimeInstant` computed by 
`EventTimeBasedCompactionStrategy` when generating plan. 
+In this case, file slices are split by `eventTimeInstant` instead of 
`compactionInstant`. 
+This is the base for `EventTimeBasedCompactionStrategy`.
+
+In .commit/.deltacommit metadata, we can extract the min/max event time for 
each base file or log file and take the min/max event time as data completion 
and freshness for that base file or log file. 
+This is the base for data completion and freshness of RO table.
+
+## Implementation
+
+With the ability of new file slice algorithm and compaction based on 
completion time, we can choose to
+
+### Schedule Compaction Plan
+The `EventTimeBasedCompactionStrategy` strategy is responsible to calculate 
the event time threshold for compaction plan.
+Implement a method to calculate the event time threshold:
+```java
+// return the threshold as instant time format
+public String calcEventTimeThreshold(String compactionInstant, 
HoodieWriteConfig config)
+```
+
+When generating compaction plan, we take the event time threshold as the next 
base instant time and save it to `CompactionOpertion::nextBaseInstantTime`.
+
+
+### Execute Compaction Plan
+Get `nextBaseInstantTime` from `CompactionOpertion`, write base file to the 
`nextBaseInstantTime`.
+
+Sample code in HoodieCompactor:
+```java
+String nextBaseInstantTime = compactionOpertion.getNextBaseInstantTime();
+result = executionHelper.writeFileAndGetWriteStats(compactionHandler, 
operation, nextBaseInstantTime, scanner, oldDataFileOpt);
+```
+
+Follow up:
+
+With the `EventTimeBasedCompactionStrategy` above, there can be data with 
event time after the threshold being compacted.
+For cases users want exact event time split (users don't want any data after 
threshold exist in RO table),
+we need to perform the log record filter in compaction execution based on the 
time threshold.
+ 
+To implement exact event time split, we should modify 
`HoodieMergedLogRecordScanner` adding new methods to get the remained log 
records, 
+likel

Re: [PR] [HUDI-6979][RFC-76] support event time based compaction strategy [hudi]

2023-12-19 Thread via GitHub


danny0405 commented on code in PR #10266:
URL: https://github.com/apache/hudi/pull/10266#discussion_r1431045672


##
rfc/rfc-76/rfc-76.md:
##
@@ -0,0 +1,143 @@
+
+# RFC-[76]: [support EventTimeBasedCompactionStrategy and metric for RO table]
+
+## Proposers
+
+- @waitingF
+
+## Approvers
+ - @danny0405
+ - @voonhous
+
+## Status
+
+JIRA: [HUDI-6979](https://issues.apache.org/jira/browse/HUDI-6979)
+
+> Please keep the status updated in `rfc/README.md`.
+
+## Abstract
+
+Currently, to gain low ingestion latency, we can adopt the MergeOnRead table, 
which support appending log files and 
+compact log files into base file later. When querying the snapshot table (RT 
table) generated by MOR, 
+query side have to perform a compaction so that they can get all data, which 
is expected time-consuming causing query latency.
+At the time, hudi provide read-optimized table (RO table) for low query 
latency just like COW.
+
+Generally the data to ingest arrives roughly in event time order. For cases 
some users read the hudi table,
+they may not concern with the immediate full data, but the data before a 
specified time T (eg. data before 0 clock).
+For such cases, user want all data before a specified time, user have to query 
the RT table to get all data with expected high query latency.
+
+Based on this, we want to implement the strategy based on the event time: 
`EventTimeBasedCompactionStrategy`. 
+
+
+## Background
+
+Currently, there is no compaction strategy based on event time, the only 
`DayBasedCompactionStrategy` need the table partitioned by day in specified 
format(/mm/dd), 
+which is not very general and has too large a time granularity. There is no 
data completion/freshness guarantee or metrics for RO table.
+
+Based on this, we plan to 
+1. launch a compaction strategy based on event time: 
`EventTimeBasedCompactionStrategy`.
+2. report RO table data completion and freshness.
+
+With the data completion and freshness of RO table, we can expand use-case of 
RO table.
+That is given event time threshold T, the log files before T can be compacted, 
then resulting RO table obtains all data before T, with low query latency.
+
+Currently, we introduced new file slicing algorithm based on barrier and 
generating compaction plan based on completion time 
[[HUDI-6495][RFC-66]](https://github.com/apache/hudi/pull/7907).
+
+With the barrier file slicing, file slices are split by the barriers extracted 
from every instant time of base files 
+or the first instant of log file if no base file exists.
+
+When generating compaction plan, the scheduler will filter all log files with 
completion time before the `compactionInstant`. 
+And the compaction operation will generate a new base file with 
`compactionInstant` as its instant time;
+
+So the file slices are split naturally by the `compactionInstant` for MOR 
cases. 
+
+So we can replace the `compactionInstant` with `eventTimeInstant` computed by 
`EventTimeBasedCompactionStrategy` when generating plan. 
+In this case, file slices are split by `eventTimeInstant` instead of 
`compactionInstant`. 
+This is the base for `EventTimeBasedCompactionStrategy`.
+
+In .commit/.deltacommit metadata, we can extract the min/max event time for 
each base file or log file and take the min/max event time as data completion 
and freshness for that base file or log file. 
+This is the base for data completion and freshness of RO table.
+
+## Implementation
+
+With the ability of new file slice algorithm and compaction based on 
completion time, we can choose to
+
+### Schedule Compaction Plan
+The `EventTimeBasedCompactionStrategy` strategy is responsible to calculate 
the event time threshold for compaction plan.
+Implement a method to calculate the event time threshold:
+```java
+// return the threshold as instant time format
+public String calcEventTimeThreshold(String compactionInstant, 
HoodieWriteConfig config)
+```
+
+When generating compaction plan, we take the event time threshold as the next 
base instant time and save it to `CompactionOpertion::nextBaseInstantTime`.
+
+
+### Execute Compaction Plan
+Get `nextBaseInstantTime` from `CompactionOpertion`, write base file to the 
`nextBaseInstantTime`.
+
+Sample code in HoodieCompactor:
+```java
+String nextBaseInstantTime = compactionOpertion.getNextBaseInstantTime();
+result = executionHelper.writeFileAndGetWriteStats(compactionHandler, 
operation, nextBaseInstantTime, scanner, oldDataFileOpt);
+```
+
+Follow up:
+
+With the `EventTimeBasedCompactionStrategy` above, there can be data with 
event time after the threshold being compacted.
+For cases users want exact event time split (users don't want any data after 
threshold exist in RO table),
+we need to perform the log record filter in compaction execution based on the 
time threshold.

Review Comment:
   > then we can schedule the compaction at timestamp t3 + late_inteval with 
eventTimeInstant t3.
   
   The additional wait for tim

Re: [PR] [HUDI-6979][RFC-76] support event time based compaction strategy [hudi]

2023-12-18 Thread via GitHub


waitingF commented on code in PR #10266:
URL: https://github.com/apache/hudi/pull/10266#discussion_r1430991273


##
rfc/rfc-76/rfc-76.md:
##
@@ -0,0 +1,143 @@
+
+# RFC-[76]: [support EventTimeBasedCompactionStrategy and metric for RO table]
+
+## Proposers
+
+- @waitingF
+
+## Approvers
+ - @danny0405
+ - @voonhous
+
+## Status
+
+JIRA: [HUDI-6979](https://issues.apache.org/jira/browse/HUDI-6979)
+
+> Please keep the status updated in `rfc/README.md`.
+
+## Abstract
+
+Currently, to gain low ingestion latency, we can adopt the MergeOnRead table, 
which support appending log files and 
+compact log files into base file later. When querying the snapshot table (RT 
table) generated by MOR, 
+query side have to perform a compaction so that they can get all data, which 
is expected time-consuming causing query latency.
+At the time, hudi provide read-optimized table (RO table) for low query 
latency just like COW.
+
+Generally the data to ingest arrives roughly in event time order. For cases 
some users read the hudi table,
+they may not concern with the immediate full data, but the data before a 
specified time T (eg. data before 0 clock).
+For such cases, user want all data before a specified time, user have to query 
the RT table to get all data with expected high query latency.
+
+Based on this, we want to implement the strategy based on the event time: 
`EventTimeBasedCompactionStrategy`. 
+
+
+## Background
+
+Currently, there is no compaction strategy based on event time, the only 
`DayBasedCompactionStrategy` need the table partitioned by day in specified 
format(/mm/dd), 
+which is not very general and has too large a time granularity. There is no 
data completion/freshness guarantee or metrics for RO table.
+
+Based on this, we plan to 
+1. launch a compaction strategy based on event time: 
`EventTimeBasedCompactionStrategy`.
+2. report RO table data completion and freshness.
+
+With the data completion and freshness of RO table, we can expand use-case of 
RO table.
+That is given event time threshold T, the log files before T can be compacted, 
then resulting RO table obtains all data before T, with low query latency.
+
+Currently, we introduced new file slicing algorithm based on barrier and 
generating compaction plan based on completion time 
[[HUDI-6495][RFC-66]](https://github.com/apache/hudi/pull/7907).
+
+With the barrier file slicing, file slices are split by the barriers extracted 
from every instant time of base files 
+or the first instant of log file if no base file exists.
+
+When generating compaction plan, the scheduler will filter all log files with 
completion time before the `compactionInstant`. 
+And the compaction operation will generate a new base file with 
`compactionInstant` as its instant time;
+
+So the file slices are split naturally by the `compactionInstant` for MOR 
cases. 
+
+So we can replace the `compactionInstant` with `eventTimeInstant` computed by 
`EventTimeBasedCompactionStrategy` when generating plan. 
+In this case, file slices are split by `eventTimeInstant` instead of 
`compactionInstant`. 
+This is the base for `EventTimeBasedCompactionStrategy`.
+
+In .commit/.deltacommit metadata, we can extract the min/max event time for 
each base file or log file and take the min/max event time as data completion 
and freshness for that base file or log file. 
+This is the base for data completion and freshness of RO table.
+
+## Implementation
+
+With the ability of new file slice algorithm and compaction based on 
completion time, we can choose to
+
+### Schedule Compaction Plan
+The `EventTimeBasedCompactionStrategy` strategy is responsible to calculate 
the event time threshold for compaction plan.
+Implement a method to calculate the event time threshold:
+```java
+// return the threshold as instant time format
+public String calcEventTimeThreshold(String compactionInstant, 
HoodieWriteConfig config)
+```
+
+When generating compaction plan, we take the event time threshold as the next 
base instant time and save it to `CompactionOpertion::nextBaseInstantTime`.
+
+
+### Execute Compaction Plan
+Get `nextBaseInstantTime` from `CompactionOpertion`, write base file to the 
`nextBaseInstantTime`.
+
+Sample code in HoodieCompactor:
+```java
+String nextBaseInstantTime = compactionOpertion.getNextBaseInstantTime();
+result = executionHelper.writeFileAndGetWriteStats(compactionHandler, 
operation, nextBaseInstantTime, scanner, oldDataFileOpt);
+```
+
+Follow up:
+
+With the `EventTimeBasedCompactionStrategy` above, there can be data with 
event time after the threshold being compacted.
+For cases users want exact event time split (users don't want any data after 
threshold exist in RO table),
+we need to perform the log record filter in compaction execution based on the 
time threshold.
+ 
+To implement exact event time split, we should modify 
`HoodieMergedLogRecordScanner` adding new methods to get the remained log 
records, 
+likely

Re: [PR] [HUDI-6979][RFC-76] support event time based compaction strategy [hudi]

2023-12-18 Thread via GitHub


danny0405 commented on code in PR #10266:
URL: https://github.com/apache/hudi/pull/10266#discussion_r1430889062


##
rfc/rfc-76/rfc-76.md:
##
@@ -0,0 +1,143 @@
+
+# RFC-[76]: [support EventTimeBasedCompactionStrategy and metric for RO table]
+
+## Proposers
+
+- @waitingF
+
+## Approvers
+ - @danny0405
+ - @voonhous
+
+## Status
+
+JIRA: [HUDI-6979](https://issues.apache.org/jira/browse/HUDI-6979)
+
+> Please keep the status updated in `rfc/README.md`.
+
+## Abstract
+
+Currently, to gain low ingestion latency, we can adopt the MergeOnRead table, 
which support appending log files and 
+compact log files into base file later. When querying the snapshot table (RT 
table) generated by MOR, 
+query side have to perform a compaction so that they can get all data, which 
is expected time-consuming causing query latency.
+At the time, hudi provide read-optimized table (RO table) for low query 
latency just like COW.
+
+Generally the data to ingest arrives roughly in event time order. For cases 
some users read the hudi table,
+they may not concern with the immediate full data, but the data before a 
specified time T (eg. data before 0 clock).
+For such cases, user want all data before a specified time, user have to query 
the RT table to get all data with expected high query latency.
+
+Based on this, we want to implement the strategy based on the event time: 
`EventTimeBasedCompactionStrategy`. 
+
+
+## Background
+
+Currently, there is no compaction strategy based on event time, the only 
`DayBasedCompactionStrategy` need the table partitioned by day in specified 
format(/mm/dd), 
+which is not very general and has too large a time granularity. There is no 
data completion/freshness guarantee or metrics for RO table.
+
+Based on this, we plan to 
+1. launch a compaction strategy based on event time: 
`EventTimeBasedCompactionStrategy`.
+2. report RO table data completion and freshness.
+
+With the data completion and freshness of RO table, we can expand use-case of 
RO table.
+That is given event time threshold T, the log files before T can be compacted, 
then resulting RO table obtains all data before T, with low query latency.
+
+Currently, we introduced new file slicing algorithm based on barrier and 
generating compaction plan based on completion time 
[[HUDI-6495][RFC-66]](https://github.com/apache/hudi/pull/7907).
+
+With the barrier file slicing, file slices are split by the barriers extracted 
from every instant time of base files 
+or the first instant of log file if no base file exists.
+
+When generating compaction plan, the scheduler will filter all log files with 
completion time before the `compactionInstant`. 
+And the compaction operation will generate a new base file with 
`compactionInstant` as its instant time;
+
+So the file slices are split naturally by the `compactionInstant` for MOR 
cases. 
+
+So we can replace the `compactionInstant` with `eventTimeInstant` computed by 
`EventTimeBasedCompactionStrategy` when generating plan. 
+In this case, file slices are split by `eventTimeInstant` instead of 
`compactionInstant`. 
+This is the base for `EventTimeBasedCompactionStrategy`.
+
+In .commit/.deltacommit metadata, we can extract the min/max event time for 
each base file or log file and take the min/max event time as data completion 
and freshness for that base file or log file. 
+This is the base for data completion and freshness of RO table.
+
+## Implementation
+
+With the ability of new file slice algorithm and compaction based on 
completion time, we can choose to
+
+### Schedule Compaction Plan
+The `EventTimeBasedCompactionStrategy` strategy is responsible to calculate 
the event time threshold for compaction plan.
+Implement a method to calculate the event time threshold:
+```java
+// return the threshold as instant time format
+public String calcEventTimeThreshold(String compactionInstant, 
HoodieWriteConfig config)
+```
+
+When generating compaction plan, we take the event time threshold as the next 
base instant time and save it to `CompactionOpertion::nextBaseInstantTime`.
+
+
+### Execute Compaction Plan
+Get `nextBaseInstantTime` from `CompactionOpertion`, write base file to the 
`nextBaseInstantTime`.
+
+Sample code in HoodieCompactor:
+```java
+String nextBaseInstantTime = compactionOpertion.getNextBaseInstantTime();
+result = executionHelper.writeFileAndGetWriteStats(compactionHandler, 
operation, nextBaseInstantTime, scanner, oldDataFileOpt);
+```
+
+Follow up:
+
+With the `EventTimeBasedCompactionStrategy` above, there can be data with 
event time after the threshold being compacted.
+For cases users want exact event time split (users don't want any data after 
threshold exist in RO table),
+we need to perform the log record filter in compaction execution based on the 
time threshold.
+ 
+To implement exact event time split, we should modify 
`HoodieMergedLogRecordScanner` adding new methods to get the remained log 
records, 
+likel

Re: [PR] [HUDI-6979][RFC-76] support event time based compaction strategy [hudi]

2023-12-18 Thread via GitHub


danny0405 commented on code in PR #10266:
URL: https://github.com/apache/hudi/pull/10266#discussion_r1430888168


##
rfc/rfc-76/rfc-76.md:
##
@@ -0,0 +1,143 @@
+
+# RFC-[76]: [support EventTimeBasedCompactionStrategy and metric for RO table]
+
+## Proposers
+
+- @waitingF
+
+## Approvers
+ - @danny0405
+ - @voonhous
+
+## Status
+
+JIRA: [HUDI-6979](https://issues.apache.org/jira/browse/HUDI-6979)
+
+> Please keep the status updated in `rfc/README.md`.
+
+## Abstract
+
+Currently, to gain low ingestion latency, we can adopt the MergeOnRead table, 
which support appending log files and 
+compact log files into base file later. When querying the snapshot table (RT 
table) generated by MOR, 
+query side have to perform a compaction so that they can get all data, which 
is expected time-consuming causing query latency.
+At the time, hudi provide read-optimized table (RO table) for low query 
latency just like COW.
+
+Generally the data to ingest arrives roughly in event time order. For cases 
some users read the hudi table,
+they may not concern with the immediate full data, but the data before a 
specified time T (eg. data before 0 clock).
+For such cases, user want all data before a specified time, user have to query 
the RT table to get all data with expected high query latency.
+
+Based on this, we want to implement the strategy based on the event time: 
`EventTimeBasedCompactionStrategy`. 
+
+
+## Background
+
+Currently, there is no compaction strategy based on event time, the only 
`DayBasedCompactionStrategy` need the table partitioned by day in specified 
format(/mm/dd), 
+which is not very general and has too large a time granularity. There is no 
data completion/freshness guarantee or metrics for RO table.
+
+Based on this, we plan to 
+1. launch a compaction strategy based on event time: 
`EventTimeBasedCompactionStrategy`.
+2. report RO table data completion and freshness.
+
+With the data completion and freshness of RO table, we can expand use-case of 
RO table.
+That is given event time threshold T, the log files before T can be compacted, 
then resulting RO table obtains all data before T, with low query latency.
+
+Currently, we introduced new file slicing algorithm based on barrier and 
generating compaction plan based on completion time 
[[HUDI-6495][RFC-66]](https://github.com/apache/hudi/pull/7907).
+
+With the barrier file slicing, file slices are split by the barriers extracted 
from every instant time of base files 
+or the first instant of log file if no base file exists.
+
+When generating compaction plan, the scheduler will filter all log files with 
completion time before the `compactionInstant`. 
+And the compaction operation will generate a new base file with 
`compactionInstant` as its instant time;
+
+So the file slices are split naturally by the `compactionInstant` for MOR 
cases. 
+
+So we can replace the `compactionInstant` with `eventTimeInstant` computed by 
`EventTimeBasedCompactionStrategy` when generating plan. 
+In this case, file slices are split by `eventTimeInstant` instead of 
`compactionInstant`. 
+This is the base for `EventTimeBasedCompactionStrategy`.
+
+In .commit/.deltacommit metadata, we can extract the min/max event time for 
each base file or log file and take the min/max event time as data completion 
and freshness for that base file or log file. 
+This is the base for data completion and freshness of RO table.
+
+## Implementation
+
+With the ability of new file slice algorithm and compaction based on 
completion time, we can choose to
+
+### Schedule Compaction Plan
+The `EventTimeBasedCompactionStrategy` strategy is responsible to calculate 
the event time threshold for compaction plan.
+Implement a method to calculate the event time threshold:
+```java
+// return the threshold as instant time format
+public String calcEventTimeThreshold(String compactionInstant, 
HoodieWriteConfig config)
+```
+
+When generating compaction plan, we take the event time threshold as the next 
base instant time and save it to `CompactionOpertion::nextBaseInstantTime`.
+
+
+### Execute Compaction Plan
+Get `nextBaseInstantTime` from `CompactionOpertion`, write base file to the 
`nextBaseInstantTime`.
+
+Sample code in HoodieCompactor:
+```java
+String nextBaseInstantTime = compactionOpertion.getNextBaseInstantTime();
+result = executionHelper.writeFileAndGetWriteStats(compactionHandler, 
operation, nextBaseInstantTime, scanner, oldDataFileOpt);
+```
+
+Follow up:
+
+With the `EventTimeBasedCompactionStrategy` above, there can be data with 
event time after the threshold being compacted.
+For cases users want exact event time split (users don't want any data after 
threshold exist in RO table),
+we need to perform the log record filter in compaction execution based on the 
time threshold.

Review Comment:
   The compactor can ensure this if we if there is no clock skew, in our Hudi 
implementation, we will by default wait another 200ms 

Re: [PR] [HUDI-6979][RFC-76] support event time based compaction strategy [hudi]

2023-12-18 Thread via GitHub


waitingF commented on code in PR #10266:
URL: https://github.com/apache/hudi/pull/10266#discussion_r1430146341


##
rfc/rfc-76/rfc-76.md:
##
@@ -0,0 +1,143 @@
+
+# RFC-[76]: [support EventTimeBasedCompactionStrategy and metric for RO table]
+
+## Proposers
+
+- @waitingF
+
+## Approvers
+ - @danny0405
+ - @voonhous
+
+## Status
+
+JIRA: [HUDI-6979](https://issues.apache.org/jira/browse/HUDI-6979)
+
+> Please keep the status updated in `rfc/README.md`.
+
+## Abstract
+
+Currently, to gain low ingestion latency, we can adopt the MergeOnRead table, 
which support appending log files and 
+compact log files into base file later. When querying the snapshot table (RT 
table) generated by MOR, 
+query side have to perform a compaction so that they can get all data, which 
is expected time-consuming causing query latency.
+At the time, hudi provide read-optimized table (RO table) for low query 
latency just like COW.
+
+Generally the data to ingest arrives roughly in event time order. For cases 
some users read the hudi table,
+they may not concern with the immediate full data, but the data before a 
specified time T (eg. data before 0 clock).
+For such cases, user want all data before a specified time, user have to query 
the RT table to get all data with expected high query latency.
+
+Based on this, we want to implement the strategy based on the event time: 
`EventTimeBasedCompactionStrategy`. 
+
+
+## Background
+
+Currently, there is no compaction strategy based on event time, the only 
`DayBasedCompactionStrategy` need the table partitioned by day in specified 
format(/mm/dd), 
+which is not very general and has too large a time granularity. There is no 
data completion/freshness guarantee or metrics for RO table.
+
+Based on this, we plan to 
+1. launch a compaction strategy based on event time: 
`EventTimeBasedCompactionStrategy`.
+2. report RO table data completion and freshness.
+
+With the data completion and freshness of RO table, we can expand use-case of 
RO table.
+That is given event time threshold T, the log files before T can be compacted, 
then resulting RO table obtains all data before T, with low query latency.
+
+Currently, we introduced new file slicing algorithm based on barrier and 
generating compaction plan based on completion time 
[[HUDI-6495][RFC-66]](https://github.com/apache/hudi/pull/7907).
+
+With the barrier file slicing, file slices are split by the barriers extracted 
from every instant time of base files 
+or the first instant of log file if no base file exists.
+
+When generating compaction plan, the scheduler will filter all log files with 
completion time before the `compactionInstant`. 
+And the compaction operation will generate a new base file with 
`compactionInstant` as its instant time;
+
+So the file slices are split naturally by the `compactionInstant` for MOR 
cases. 
+
+So we can replace the `compactionInstant` with `eventTimeInstant` computed by 
`EventTimeBasedCompactionStrategy` when generating plan. 
+In this case, file slices are split by `eventTimeInstant` instead of 
`compactionInstant`. 
+This is the base for `EventTimeBasedCompactionStrategy`.
+
+In .commit/.deltacommit metadata, we can extract the min/max event time for 
each base file or log file and take the min/max event time as data completion 
and freshness for that base file or log file. 
+This is the base for data completion and freshness of RO table.
+
+## Implementation
+
+With the ability of new file slice algorithm and compaction based on 
completion time, we can choose to
+
+### Schedule Compaction Plan
+The `EventTimeBasedCompactionStrategy` strategy is responsible to calculate 
the event time threshold for compaction plan.
+Implement a method to calculate the event time threshold:
+```java
+// return the threshold as instant time format
+public String calcEventTimeThreshold(String compactionInstant, 
HoodieWriteConfig config)
+```
+
+When generating compaction plan, we take the event time threshold as the next 
base instant time and save it to `CompactionOpertion::nextBaseInstantTime`.
+
+
+### Execute Compaction Plan
+Get `nextBaseInstantTime` from `CompactionOpertion`, write base file to the 
`nextBaseInstantTime`.
+
+Sample code in HoodieCompactor:
+```java
+String nextBaseInstantTime = compactionOpertion.getNextBaseInstantTime();
+result = executionHelper.writeFileAndGetWriteStats(compactionHandler, 
operation, nextBaseInstantTime, scanner, oldDataFileOpt);
+```
+
+Follow up:
+
+With the `EventTimeBasedCompactionStrategy` above, there can be data with 
event time after the threshold being compacted.
+For cases users want exact event time split (users don't want any data after 
threshold exist in RO table),
+we need to perform the log record filter in compaction execution based on the 
time threshold.
+ 
+To implement exact event time split, we should modify 
`HoodieMergedLogRecordScanner` adding new methods to get the remained log 
records, 
+likely

Re: [PR] [HUDI-6979][RFC-76] support event time based compaction strategy [hudi]

2023-12-18 Thread via GitHub


waitingF commented on code in PR #10266:
URL: https://github.com/apache/hudi/pull/10266#discussion_r1430146024


##
rfc/rfc-76/rfc-76.md:
##
@@ -0,0 +1,143 @@
+
+# RFC-[76]: [support EventTimeBasedCompactionStrategy and metric for RO table]
+
+## Proposers
+
+- @waitingF
+
+## Approvers
+ - @danny0405
+ - @voonhous
+
+## Status
+
+JIRA: [HUDI-6979](https://issues.apache.org/jira/browse/HUDI-6979)
+
+> Please keep the status updated in `rfc/README.md`.
+
+## Abstract
+
+Currently, to gain low ingestion latency, we can adopt the MergeOnRead table, 
which support appending log files and 
+compact log files into base file later. When querying the snapshot table (RT 
table) generated by MOR, 
+query side have to perform a compaction so that they can get all data, which 
is expected time-consuming causing query latency.
+At the time, hudi provide read-optimized table (RO table) for low query 
latency just like COW.
+
+Generally the data to ingest arrives roughly in event time order. For cases 
some users read the hudi table,
+they may not concern with the immediate full data, but the data before a 
specified time T (eg. data before 0 clock).
+For such cases, user want all data before a specified time, user have to query 
the RT table to get all data with expected high query latency.
+
+Based on this, we want to implement the strategy based on the event time: 
`EventTimeBasedCompactionStrategy`. 
+
+
+## Background
+
+Currently, there is no compaction strategy based on event time, the only 
`DayBasedCompactionStrategy` need the table partitioned by day in specified 
format(/mm/dd), 
+which is not very general and has too large a time granularity. There is no 
data completion/freshness guarantee or metrics for RO table.
+
+Based on this, we plan to 
+1. launch a compaction strategy based on event time: 
`EventTimeBasedCompactionStrategy`.
+2. report RO table data completion and freshness.
+
+With the data completion and freshness of RO table, we can expand use-case of 
RO table.
+That is given event time threshold T, the log files before T can be compacted, 
then resulting RO table obtains all data before T, with low query latency.
+
+Currently, we introduced new file slicing algorithm based on barrier and 
generating compaction plan based on completion time 
[[HUDI-6495][RFC-66]](https://github.com/apache/hudi/pull/7907).
+
+With the barrier file slicing, file slices are split by the barriers extracted 
from every instant time of base files 
+or the first instant of log file if no base file exists.
+
+When generating compaction plan, the scheduler will filter all log files with 
completion time before the `compactionInstant`. 
+And the compaction operation will generate a new base file with 
`compactionInstant` as its instant time;
+
+So the file slices are split naturally by the `compactionInstant` for MOR 
cases. 
+
+So we can replace the `compactionInstant` with `eventTimeInstant` computed by 
`EventTimeBasedCompactionStrategy` when generating plan. 
+In this case, file slices are split by `eventTimeInstant` instead of 
`compactionInstant`. 
+This is the base for `EventTimeBasedCompactionStrategy`.
+
+In .commit/.deltacommit metadata, we can extract the min/max event time for 
each base file or log file and take the min/max event time as data completion 
and freshness for that base file or log file. 
+This is the base for data completion and freshness of RO table.
+
+## Implementation
+
+With the ability of new file slice algorithm and compaction based on 
completion time, we can choose to
+
+### Schedule Compaction Plan
+The `EventTimeBasedCompactionStrategy` strategy is responsible to calculate 
the event time threshold for compaction plan.
+Implement a method to calculate the event time threshold:
+```java
+// return the threshold as instant time format
+public String calcEventTimeThreshold(String compactionInstant, 
HoodieWriteConfig config)
+```
+
+When generating compaction plan, we take the event time threshold as the next 
base instant time and save it to `CompactionOpertion::nextBaseInstantTime`.
+
+
+### Execute Compaction Plan
+Get `nextBaseInstantTime` from `CompactionOpertion`, write base file to the 
`nextBaseInstantTime`.
+
+Sample code in HoodieCompactor:
+```java
+String nextBaseInstantTime = compactionOpertion.getNextBaseInstantTime();
+result = executionHelper.writeFileAndGetWriteStats(compactionHandler, 
operation, nextBaseInstantTime, scanner, oldDataFileOpt);
+```
+
+Follow up:
+
+With the `EventTimeBasedCompactionStrategy` above, there can be data with 
event time after the threshold being compacted.
+For cases users want exact event time split (users don't want any data after 
threshold exist in RO table),
+we need to perform the log record filter in compaction execution based on the 
time threshold.

Review Comment:
   The threshold refers to the generated `eventTimeInstant`.
   For example, there may be cases that we make daily snapshots from hudi

Re: [PR] [HUDI-6979][RFC-76] support event time based compaction strategy [hudi]

2023-12-17 Thread via GitHub


danny0405 commented on code in PR #10266:
URL: https://github.com/apache/hudi/pull/10266#discussion_r1429454087


##
rfc/rfc-76/rfc-76.md:
##
@@ -0,0 +1,143 @@
+
+# RFC-[76]: [support EventTimeBasedCompactionStrategy and metric for RO table]
+
+## Proposers
+
+- @waitingF
+
+## Approvers
+ - @danny0405
+ - @voonhous
+
+## Status
+
+JIRA: [HUDI-6979](https://issues.apache.org/jira/browse/HUDI-6979)
+
+> Please keep the status updated in `rfc/README.md`.
+
+## Abstract
+
+Currently, to gain low ingestion latency, we can adopt the MergeOnRead table, 
which support appending log files and 
+compact log files into base file later. When querying the snapshot table (RT 
table) generated by MOR, 
+query side have to perform a compaction so that they can get all data, which 
is expected time-consuming causing query latency.
+At the time, hudi provide read-optimized table (RO table) for low query 
latency just like COW.
+
+Generally the data to ingest arrives roughly in event time order. For cases 
some users read the hudi table,
+they may not concern with the immediate full data, but the data before a 
specified time T (eg. data before 0 clock).
+For such cases, user want all data before a specified time, user have to query 
the RT table to get all data with expected high query latency.
+
+Based on this, we want to implement the strategy based on the event time: 
`EventTimeBasedCompactionStrategy`. 
+
+
+## Background
+
+Currently, there is no compaction strategy based on event time, the only 
`DayBasedCompactionStrategy` need the table partitioned by day in specified 
format(/mm/dd), 
+which is not very general and has too large a time granularity. There is no 
data completion/freshness guarantee or metrics for RO table.
+
+Based on this, we plan to 
+1. launch a compaction strategy based on event time: 
`EventTimeBasedCompactionStrategy`.
+2. report RO table data completion and freshness.
+
+With the data completion and freshness of RO table, we can expand use-case of 
RO table.
+That is given event time threshold T, the log files before T can be compacted, 
then resulting RO table obtains all data before T, with low query latency.
+
+Currently, we introduced new file slicing algorithm based on barrier and 
generating compaction plan based on completion time 
[[HUDI-6495][RFC-66]](https://github.com/apache/hudi/pull/7907).
+
+With the barrier file slicing, file slices are split by the barriers extracted 
from every instant time of base files 
+or the first instant of log file if no base file exists.
+
+When generating compaction plan, the scheduler will filter all log files with 
completion time before the `compactionInstant`. 
+And the compaction operation will generate a new base file with 
`compactionInstant` as its instant time;
+
+So the file slices are split naturally by the `compactionInstant` for MOR 
cases. 
+
+So we can replace the `compactionInstant` with `eventTimeInstant` computed by 
`EventTimeBasedCompactionStrategy` when generating plan. 
+In this case, file slices are split by `eventTimeInstant` instead of 
`compactionInstant`. 
+This is the base for `EventTimeBasedCompactionStrategy`.
+
+In .commit/.deltacommit metadata, we can extract the min/max event time for 
each base file or log file and take the min/max event time as data completion 
and freshness for that base file or log file. 
+This is the base for data completion and freshness of RO table.
+
+## Implementation
+
+With the ability of new file slice algorithm and compaction based on 
completion time, we can choose to
+
+### Schedule Compaction Plan
+The `EventTimeBasedCompactionStrategy` strategy is responsible to calculate 
the event time threshold for compaction plan.
+Implement a method to calculate the event time threshold:
+```java
+// return the threshold as instant time format
+public String calcEventTimeThreshold(String compactionInstant, 
HoodieWriteConfig config)
+```
+
+When generating compaction plan, we take the event time threshold as the next 
base instant time and save it to `CompactionOpertion::nextBaseInstantTime`.
+
+
+### Execute Compaction Plan
+Get `nextBaseInstantTime` from `CompactionOpertion`, write base file to the 
`nextBaseInstantTime`.
+
+Sample code in HoodieCompactor:
+```java
+String nextBaseInstantTime = compactionOpertion.getNextBaseInstantTime();
+result = executionHelper.writeFileAndGetWriteStats(compactionHandler, 
operation, nextBaseInstantTime, scanner, oldDataFileOpt);
+```
+
+Follow up:
+
+With the `EventTimeBasedCompactionStrategy` above, there can be data with 
event time after the threshold being compacted.
+For cases users want exact event time split (users don't want any data after 
threshold exist in RO table),
+we need to perform the log record filter in compaction execution based on the 
time threshold.
+ 
+To implement exact event time split, we should modify 
`HoodieMergedLogRecordScanner` adding new methods to get the remained log 
records, 
+likel

Re: [PR] [HUDI-6979][RFC-76] support event time based compaction strategy [hudi]

2023-12-17 Thread via GitHub


danny0405 commented on code in PR #10266:
URL: https://github.com/apache/hudi/pull/10266#discussion_r1429452701


##
rfc/rfc-76/rfc-76.md:
##
@@ -0,0 +1,143 @@
+
+# RFC-[76]: [support EventTimeBasedCompactionStrategy and metric for RO table]
+
+## Proposers
+
+- @waitingF
+
+## Approvers
+ - @danny0405
+ - @voonhous
+
+## Status
+
+JIRA: [HUDI-6979](https://issues.apache.org/jira/browse/HUDI-6979)
+
+> Please keep the status updated in `rfc/README.md`.
+
+## Abstract
+
+Currently, to gain low ingestion latency, we can adopt the MergeOnRead table, 
which support appending log files and 
+compact log files into base file later. When querying the snapshot table (RT 
table) generated by MOR, 
+query side have to perform a compaction so that they can get all data, which 
is expected time-consuming causing query latency.
+At the time, hudi provide read-optimized table (RO table) for low query 
latency just like COW.
+
+Generally the data to ingest arrives roughly in event time order. For cases 
some users read the hudi table,
+they may not concern with the immediate full data, but the data before a 
specified time T (eg. data before 0 clock).
+For such cases, user want all data before a specified time, user have to query 
the RT table to get all data with expected high query latency.
+
+Based on this, we want to implement the strategy based on the event time: 
`EventTimeBasedCompactionStrategy`. 
+
+
+## Background
+
+Currently, there is no compaction strategy based on event time, the only 
`DayBasedCompactionStrategy` need the table partitioned by day in specified 
format(/mm/dd), 
+which is not very general and has too large a time granularity. There is no 
data completion/freshness guarantee or metrics for RO table.
+
+Based on this, we plan to 
+1. launch a compaction strategy based on event time: 
`EventTimeBasedCompactionStrategy`.
+2. report RO table data completion and freshness.
+
+With the data completion and freshness of RO table, we can expand use-case of 
RO table.
+That is given event time threshold T, the log files before T can be compacted, 
then resulting RO table obtains all data before T, with low query latency.
+
+Currently, we introduced new file slicing algorithm based on barrier and 
generating compaction plan based on completion time 
[[HUDI-6495][RFC-66]](https://github.com/apache/hudi/pull/7907).
+
+With the barrier file slicing, file slices are split by the barriers extracted 
from every instant time of base files 
+or the first instant of log file if no base file exists.
+
+When generating compaction plan, the scheduler will filter all log files with 
completion time before the `compactionInstant`. 
+And the compaction operation will generate a new base file with 
`compactionInstant` as its instant time;
+
+So the file slices are split naturally by the `compactionInstant` for MOR 
cases. 
+
+So we can replace the `compactionInstant` with `eventTimeInstant` computed by 
`EventTimeBasedCompactionStrategy` when generating plan. 
+In this case, file slices are split by `eventTimeInstant` instead of 
`compactionInstant`. 
+This is the base for `EventTimeBasedCompactionStrategy`.
+
+In .commit/.deltacommit metadata, we can extract the min/max event time for 
each base file or log file and take the min/max event time as data completion 
and freshness for that base file or log file. 
+This is the base for data completion and freshness of RO table.
+
+## Implementation
+
+With the ability of new file slice algorithm and compaction based on 
completion time, we can choose to
+
+### Schedule Compaction Plan
+The `EventTimeBasedCompactionStrategy` strategy is responsible to calculate 
the event time threshold for compaction plan.
+Implement a method to calculate the event time threshold:
+```java
+// return the threshold as instant time format
+public String calcEventTimeThreshold(String compactionInstant, 
HoodieWriteConfig config)
+```
+
+When generating compaction plan, we take the event time threshold as the next 
base instant time and save it to `CompactionOpertion::nextBaseInstantTime`.
+
+
+### Execute Compaction Plan
+Get `nextBaseInstantTime` from `CompactionOpertion`, write base file to the 
`nextBaseInstantTime`.
+
+Sample code in HoodieCompactor:
+```java
+String nextBaseInstantTime = compactionOpertion.getNextBaseInstantTime();
+result = executionHelper.writeFileAndGetWriteStats(compactionHandler, 
operation, nextBaseInstantTime, scanner, oldDataFileOpt);
+```
+
+Follow up:
+
+With the `EventTimeBasedCompactionStrategy` above, there can be data with 
event time after the threshold being compacted.
+For cases users want exact event time split (users don't want any data after 
threshold exist in RO table),
+we need to perform the log record filter in compaction execution based on the 
time threshold.

Review Comment:
   > users don't want any data after threshold exist in RO table
   
   What threshold are you refering to?



-- 
This is an automat

Re: [PR] [HUDI-6979][RFC-76] support event time based compaction strategy [hudi]

2023-12-17 Thread via GitHub


danny0405 commented on code in PR #10266:
URL: https://github.com/apache/hudi/pull/10266#discussion_r1429450376


##
rfc/rfc-76/rfc-76.md:
##
@@ -0,0 +1,143 @@
+
+# RFC-[76]: [support EventTimeBasedCompactionStrategy and metric for RO table]
+
+## Proposers
+
+- @waitingF
+
+## Approvers
+ - @danny0405
+ - @voonhous
+
+## Status
+
+JIRA: [HUDI-6979](https://issues.apache.org/jira/browse/HUDI-6979)
+
+> Please keep the status updated in `rfc/README.md`.
+
+## Abstract
+
+Currently, to gain low ingestion latency, we can adopt the MergeOnRead table, 
which support appending log files and 
+compact log files into base file later. When querying the snapshot table (RT 
table) generated by MOR, 
+query side have to perform a compaction so that they can get all data, which 
is expected time-consuming causing query latency.
+At the time, hudi provide read-optimized table (RO table) for low query 
latency just like COW.
+
+Generally the data to ingest arrives roughly in event time order. For cases 
some users read the hudi table,
+they may not concern with the immediate full data, but the data before a 
specified time T (eg. data before 0 clock).
+For such cases, user want all data before a specified time, user have to query 
the RT table to get all data with expected high query latency.
+
+Based on this, we want to implement the strategy based on the event time: 
`EventTimeBasedCompactionStrategy`. 
+
+
+## Background
+
+Currently, there is no compaction strategy based on event time, the only 
`DayBasedCompactionStrategy` need the table partitioned by day in specified 
format(/mm/dd), 
+which is not very general and has too large a time granularity. There is no 
data completion/freshness guarantee or metrics for RO table.
+
+Based on this, we plan to 
+1. launch a compaction strategy based on event time: 
`EventTimeBasedCompactionStrategy`.
+2. report RO table data completion and freshness.
+
+With the data completion and freshness of RO table, we can expand use-case of 
RO table.
+That is given event time threshold T, the log files before T can be compacted, 
then resulting RO table obtains all data before T, with low query latency.
+
+Currently, we introduced new file slicing algorithm based on barrier and 
generating compaction plan based on completion time 
[[HUDI-6495][RFC-66]](https://github.com/apache/hudi/pull/7907).
+
+With the barrier file slicing, file slices are split by the barriers extracted 
from every instant time of base files 
+or the first instant of log file if no base file exists.
+
+When generating compaction plan, the scheduler will filter all log files with 
completion time before the `compactionInstant`. 
+And the compaction operation will generate a new base file with 
`compactionInstant` as its instant time;
+
+So the file slices are split naturally by the `compactionInstant` for MOR 
cases. 
+
+So we can replace the `compactionInstant` with `eventTimeInstant` computed by 
`EventTimeBasedCompactionStrategy` when generating plan. 
+In this case, file slices are split by `eventTimeInstant` instead of 
`compactionInstant`. 
+This is the base for `EventTimeBasedCompactionStrategy`.

Review Comment:
   feasible, just to note that the generation of the `eventTImeInstant` must 
share the same lock with normal instant time generation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [HUDI-6979][RFC-76] support event time based compaction strategy [hudi]

2023-12-12 Thread via GitHub


waitingF commented on code in PR #10266:
URL: https://github.com/apache/hudi/pull/10266#discussion_r1423651556


##
rfc/rfc-76/rfc-76.md:
##
@@ -0,0 +1,238 @@
+
+# RFC-[74]: [support EventTimeBasedCompactionStrategy]
+
+## Proposers
+
+- @waitingF
+
+## Approvers
+ - @
+ - @
+
+## Status
+
+JIRA: [HUDI-6979](https://issues.apache.org/jira/browse/HUDI-6979)
+
+> Please keep the status updated in `rfc/README.md`.
+
+## Abstract
+
+Currently, to gain low ingestion latency, we can adopt the MergeOnRead table, 
which support appending log files and 
+compact log files into base file later. When querying the snapshot table (RT 
table) generated by MOR, 
+query side have to perform a compaction so that they can get all data, which 
is expected time-consuming causing query latency.
+At the time, hudi provide read-optimized table (RO table) for low query 
latency just like COW.
+
+But currently, there is no compaction strategy based on event time, so there 
is no data freshness guarantee for RO table.
+For cases, user want all data before a specified time, user have to query the 
RT table to get all data with expected high query latency.

Review Comment:
   sure, will do



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [HUDI-6979][RFC-76] support event time based compaction strategy [hudi]

2023-12-11 Thread via GitHub


danny0405 commented on code in PR #10266:
URL: https://github.com/apache/hudi/pull/10266#discussion_r1423554270


##
rfc/rfc-76/rfc-76.md:
##
@@ -0,0 +1,238 @@
+
+# RFC-[74]: [support EventTimeBasedCompactionStrategy]
+
+## Proposers
+
+- @waitingF
+
+## Approvers
+ - @
+ - @
+
+## Status
+
+JIRA: [HUDI-6979](https://issues.apache.org/jira/browse/HUDI-6979)
+
+> Please keep the status updated in `rfc/README.md`.
+
+## Abstract
+
+Currently, to gain low ingestion latency, we can adopt the MergeOnRead table, 
which support appending log files and 
+compact log files into base file later. When querying the snapshot table (RT 
table) generated by MOR, 
+query side have to perform a compaction so that they can get all data, which 
is expected time-consuming causing query latency.
+At the time, hudi provide read-optimized table (RO table) for low query 
latency just like COW.
+
+But currently, there is no compaction strategy based on event time, so there 
is no data freshness guarantee for RO table.
+For cases, user want all data before a specified time, user have to query the 
RT table to get all data with expected high query latency.

Review Comment:
   Can you modify the doc based on our new file slicing algorithm?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [HUDI-6979][RFC-76] support event time based compaction strategy [hudi]

2023-12-11 Thread via GitHub


waitingF commented on code in PR #10266:
URL: https://github.com/apache/hudi/pull/10266#discussion_r1423345721


##
rfc/rfc-76/rfc-76.md:
##
@@ -0,0 +1,238 @@
+
+# RFC-[74]: [support EventTimeBasedCompactionStrategy]
+
+## Proposers
+
+- @waitingF
+
+## Approvers
+ - @
+ - @
+
+## Status
+
+JIRA: [HUDI-6979](https://issues.apache.org/jira/browse/HUDI-6979)
+
+> Please keep the status updated in `rfc/README.md`.
+
+## Abstract
+
+Currently, to gain low ingestion latency, we can adopt the MergeOnRead table, 
which support appending log files and 
+compact log files into base file later. When querying the snapshot table (RT 
table) generated by MOR, 
+query side have to perform a compaction so that they can get all data, which 
is expected time-consuming causing query latency.
+At the time, hudi provide read-optimized table (RO table) for low query 
latency just like COW.
+
+But currently, there is no compaction strategy based on event time, so there 
is no data freshness guarantee for RO table.
+For cases, user want all data before a specified time, user have to query the 
RT table to get all data with expected high query latency.

Review Comment:
   > With our new file slicing under unbounded io compaction strategy, a 
compaction plan at t is designated as including all the log files complete 
before t, does that make sense to your use case? You can then query the ro 
table after the compaction completes, the ro table data freshness is at least 
up to t.
   
   I dont think so, as there will be file groups in pending compaction which 
will be skipped in scheduling compaction plan, in this case, it will break the 
rule that "the ro table data freshness is at least up to `t`", there may be 
history data in those file groups.
   We should ensure all log files before `t` being compacted, that means we 
should generate new plan if no file group in pending compaction/clustering, 
that is no pending compaction or clustering left. For this, we can introduce a 
new trigger.
   
   > The question is how to tell the reader the freshness of the ro table?
   
   Yeah, this is part of the rfc.  We can extract the freshness from log file 
during compacting



##
rfc/rfc-76/rfc-76.md:
##
@@ -0,0 +1,238 @@
+
+# RFC-[74]: [support EventTimeBasedCompactionStrategy]
+
+## Proposers
+
+- @waitingF
+
+## Approvers
+ - @
+ - @
+
+## Status
+
+JIRA: [HUDI-6979](https://issues.apache.org/jira/browse/HUDI-6979)
+
+> Please keep the status updated in `rfc/README.md`.
+
+## Abstract
+
+Currently, to gain low ingestion latency, we can adopt the MergeOnRead table, 
which support appending log files and 

Review Comment:
   yeah, looks like so



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [HUDI-6979][RFC-76] support event time based compaction strategy [hudi]

2023-12-10 Thread via GitHub


danny0405 commented on code in PR #10266:
URL: https://github.com/apache/hudi/pull/10266#discussion_r1422044423


##
rfc/rfc-76/rfc-76.md:
##
@@ -0,0 +1,238 @@
+
+# RFC-[74]: [support EventTimeBasedCompactionStrategy]
+
+## Proposers
+
+- @waitingF
+
+## Approvers
+ - @
+ - @
+
+## Status
+
+JIRA: [HUDI-6979](https://issues.apache.org/jira/browse/HUDI-6979)
+
+> Please keep the status updated in `rfc/README.md`.
+
+## Abstract
+
+Currently, to gain low ingestion latency, we can adopt the MergeOnRead table, 
which support appending log files and 
+compact log files into base file later. When querying the snapshot table (RT 
table) generated by MOR, 
+query side have to perform a compaction so that they can get all data, which 
is expected time-consuming causing query latency.
+At the time, hudi provide read-optimized table (RO table) for low query 
latency just like COW.
+
+But currently, there is no compaction strategy based on event time, so there 
is no data freshness guarantee for RO table.
+For cases, user want all data before a specified time, user have to query the 
RT table to get all data with expected high query latency.

Review Comment:
   With our new file slicing under unbounded io compaction strategy, a 
compaction plan at `t` is designated as including all the log files complete 
before `t`, does that make sense to your use case? You can then query the ro 
table after the compaction completes, the ro table data freshness is at least 
up to `t`.
   
   The question is how to tell the reader the freshness of the ro table?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [HUDI-6979][RFC-76] support event time based compaction strategy [hudi]

2023-12-10 Thread via GitHub


danny0405 commented on code in PR #10266:
URL: https://github.com/apache/hudi/pull/10266#discussion_r1422041532


##
rfc/rfc-76/rfc-76.md:
##
@@ -0,0 +1,238 @@
+
+# RFC-[74]: [support EventTimeBasedCompactionStrategy]
+
+## Proposers
+
+- @waitingF
+
+## Approvers
+ - @
+ - @
+
+## Status
+
+JIRA: [HUDI-6979](https://issues.apache.org/jira/browse/HUDI-6979)
+
+> Please keep the status updated in `rfc/README.md`.
+
+## Abstract
+
+Currently, to gain low ingestion latency, we can adopt the MergeOnRead table, 
which support appending log files and 

Review Comment:
   It looks very relevent with our new file slicing algorithm: 
https://github.com/apache/hudi/pull/7907, in our new file slicing, the 
compaction plan only include log files that committed before the compaction 
instant (start) time: 
https://github.com/apache/hudi/pull/9776/files#diff-c556da806e520ab33f861e35e9f4b97cc35b2e504ea10e9b70ff93d2ffa59ff6R136



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org