[ https://issues.apache.org/jira/browse/HUDI-8704?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Y Ethan Guo updated HUDI-8704:
------------------------------
Parent Issue: HUDI-8709 (was: HUDI-8708)
> Instant time to completion time migration for Flink streaming reader
> --------------------------------------------------------------------
>
> Key: HUDI-8704
> URL: https://issues.apache.org/jira/browse/HUDI-8704
> Project: Apache Hudi
> Issue Type: Sub-task
> Components: flink-sql
> Reporter: Danny Chen
> Assignee: voon
> Priority: Blocker
> Labels: pull-request-available
> Fix For: 1.0.2
>
> Original Estimate: 12h
> Remaining Estimate: 12h
>
> Since the 1.0 release, Hudi has provided a new component,
> {{IncrementalQueryAnalyzer}}, for completion-time-based incremental queries:
> the user specifies the start/end completion time of the commits to subscribe
> to in an incremental run.
> IncrementalInputSplits#inputSplits uses this component to work out the
> commits to read and the filters to apply.
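To illustrate what subscribing by completion time means, here is a minimal sketch of a completion-time range filter over a timeline. It is not the IncrementalQueryAnalyzer API: the InstantEntry record, the CompletionTimeRangeFilter class, and the exclusive-start/inclusive-end bounds are hypothetical, and timestamps are assumed to be fixed-width strings (such as yyyyMMddHHmmssSSS) so that string comparison matches time order.

```java
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical timeline entry: when a commit started and when it completed. */
record InstantEntry(String instantTime, String completionTime) {}

/** Hypothetical helper, not a Hudi class: selects commits by completion time. */
final class CompletionTimeRangeFilter {
  private CompletionTimeRangeFilter() {}

  /**
   * Returns the commits whose completion time is in (startExclusive, endInclusive],
   * ordered by completion time. A null bound means "unbounded" on that side.
   * Assumes fixed-width timestamp strings, so compareTo follows time order.
   */
  static List<InstantEntry> select(
      List<InstantEntry> timeline, String startExclusive, String endInclusive) {
    return timeline.stream()
        .filter(i -> startExclusive == null || i.completionTime().compareTo(startExclusive) > 0)
        .filter(i -> endInclusive == null || i.completionTime().compareTo(endInclusive) <= 0)
        .sorted(Comparator.comparing(InstantEntry::completionTime))
        .collect(Collectors.toList());
  }
}
```

For example, with commits (instant 001, completed 005), (002, 003) and (004, 006), select(timeline, "003", "006") returns the commits started at 001 and 004, in that order; the commit started at 002 is excluded because it completed exactly at the start bound.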
> This ticket keeps compatibility for legacy workloads. A legacy workload
> persisted the latest consumed instant time (the start time) in the Flink
> state backend of the StreamReadMonitoringFunction operator and passes it to
> IncrementalInputSplits#inputSplits for the current run. Because the new
> incremental query is driven by completion time, this instant time must be
> translated into a completion time to avoid consuming duplicates.
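The duplicate problem can be seen in a small worked example, sketched under the assumption that the completion-time start bound is exclusive. Commits 100, 110 and 120 complete at 105, 115 and 125, and a legacy job has already consumed up to issuedInstant = 110. Using 110 directly as a completion-time bound re-selects commit 110 (completed at 115); using its completion time 115 selects only commit 120. The Commit record and DuplicateDemo class are hypothetical names for illustration.

```java
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical commit on the timeline: start (instant) time and completion time. */
record Commit(String instantTime, String completionTime) {}

/** Hypothetical helper showing why a raw instant time is the wrong start bound. */
final class DuplicateDemo {
  private DuplicateDemo() {}

  /** Instant times of the commits that completed strictly after the given bound. */
  static List<String> completedAfter(List<Commit> timeline, String bound) {
    return timeline.stream()
        .filter(c -> c.completionTime().compareTo(bound) > 0)
        .map(Commit::instantTime)
        .collect(Collectors.toList());
  }
}
```

With the timeline above, completedAfter(timeline, "110") yields [110, 120], re-reading the already consumed commit 110, while completedAfter(timeline, "115") yields only [120].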
> In 0.x, StreamReadMonitoringFunction stores one timestamp string,
> issuedInstant: the max consumed instant time.
> In 1.x, StreamReadMonitoringFunction stores two timestamp strings:
> issuedInstant, the max consumed instant time, and issuedOffset, the max
> consumed completion time.
> The migration logic is therefore:
> in StreamReadMonitoringFunction, if the issuedOffset restored from the state
> backend is null, the state comes from a legacy (0.x) workload, so we
> translate issuedInstant into the completion time of that instant (by querying
> the active timeline) and pass it to IncrementalInputSplits#inputSplits.
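The migration step can be sketched as follows. ReaderState, LegacyStateMigration, and the Map standing in for the active timeline are hypothetical names for illustration, not Hudi or Flink classes. The ticket also does not say what happens when the legacy instant is no longer in the active timeline (for example, after archival), so this sketch simply returns an empty Optional in that case.

```java
import java.util.Map;
import java.util.Optional;

/**
 * Hypothetical snapshot of the restored reader state. 0.x jobs only persisted
 * issuedInstant, so issuedOffset is null when restoring from such a checkpoint.
 */
record ReaderState(String issuedInstant, String issuedOffset) {
  boolean isLegacy() {
    return issuedOffset == null;
  }
}

/** Hypothetical helper, not a Hudi class: resolves the completion-time start offset. */
final class LegacyStateMigration {
  private LegacyStateMigration() {}

  /**
   * @param state          restored operator state
   * @param activeTimeline hypothetical instant-time -> completion-time lookup,
   *                       standing in for a query against the active timeline
   * @return the completion-time offset to pass to the split planner, or empty if
   *         there is no legacy instant or it cannot be found (unspecified case)
   */
  static Optional<String> resolveStartOffset(ReaderState state, Map<String, String> activeTimeline) {
    if (!state.isLegacy()) {
      // 1.x state already carries the max consumed completion time.
      return Optional.of(state.issuedOffset());
    }
    if (state.issuedInstant() == null) {
      // Nothing has been consumed yet, so there is nothing to translate.
      return Optional.empty();
    }
    // 0.x state: translate the max consumed instant time into its completion time.
    return Optional.ofNullable(activeTimeline.get(state.issuedInstant()));
  }
}
```

Returning an Optional keeps the unresolved case explicit, so the caller decides how to handle it instead of silently falling back to the raw instant time, which would re-read already consumed commits.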
--
This message was sent by Atlassian Jira
(v8.20.10#820010)