yeah, the InstantRange would include exactly the commits that need to be read.

Best,
Danny
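For readers following along, here is a minimal sketch of what an "exact match" instant range means in this exchange: a filter that admits only a pre-selected set of instants, rather than everything between a start and end time. The class and method names below are stand-ins invented for illustration; they are not the actual Hudi InstantRange or IncrementalQueryAnalyzer API.

    import java.util.LinkedHashSet;
    import java.util.List;
    import java.util.Set;
    import java.util.function.Predicate;

    // Conceptual sketch only; the real classes (InstantRange,
    // IncrementalQueryAnalyzer) live in Hudi and look different.
    final class ExactMatchInstantFilter {
        private final Set<String> allowedInstantTimes;

        private ExactMatchInstantFilter(Set<String> allowedInstantTimes) {
            this.allowedInstantTimes = allowedInstantTimes;
        }

        // An exact-match range admits only the instants it was built with,
        // rather than everything between a start and end time.
        boolean isInRange(String instantTime) {
            return allowedInstantTimes.contains(instantTime);
        }

        // Roughly what the upstream filtering is expected to do: keep only
        // the instants that actually need to be read, e.g. compaction
        // commits plus non-compaction operations such as insert overwrite.
        static ExactMatchInstantFilter fromTimeline(List<String> completedInstants,
                                                    Predicate<String> shouldRead) {
            Set<String> selected = new LinkedHashSet<>();
            for (String instant : completedInstants) {
                if (shouldRead.test(instant)) {
                    selected.add(instant);
                }
            }
            return new ExactMatchInstantFilter(selected);
        }
    }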
On Thu, Sep 5, 2024 at 01:27, Jack Vanlightly <jack.vanligh...@gmail.com> wrote:
>
> Ok, so the InstantRange would be an exact match one, with only the
> instants of compactions?
>
> On Mon, Aug 26, 2024, 02:46 Danny Chan <danny0...@apache.org> wrote:
> >
> > > So if it works as you say, then there would have to be some other
> > > filtering going on somewhere.
> >
> > It is set up intentionally, because for MOR tables a commit that is
> > not a compaction (insert overwrite etc.) should also be read.
> >
> > > it calls getCdcInputSplits(metaClient, instantRange) which does not
> > > use the configured IncrementalQueryAnalyzer
> >
> > It uses the InstantRange, which holds the instants already filtered by
> > the IncrementalQueryAnalyzer. Maybe you can add a test case and file a
> > fix for it.
> >
> > On Wed, Aug 21, 2024 at 16:32, Jack Vanlightly <vanligh...@apache.org> wrote:
> > >
> > > Thanks Danny. I think there may be a problem; having read the code and
> > > written a test to check it, it seems this is not happening.
> > >
> > > The CDCExtractor itself is fed an instant range and a boolean
> > > (consumeChangesFromCompaction) for whether to consume from compaction
> > > instants. When consumeChangesFromCompaction is true it reads from
> > > COMPACT *and* the other instant types like COMMIT etc. (
> > > https://github.com/apache/hudi/blob/db5c2d97dc94122ebd63e6200858eabc4b119178/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java#L234
> > > ). So if it works as you say, then there would have to be some other
> > > filtering going on somewhere.
> > >
> > > I went to look for the code that would further filter down the instants
> > > in some way. In the Flink code, the IncrementalInputSplits class uses
> > > the CDCExtractor. The interesting thing in this class is that the
> > > inputSplits method (
> > > https://github.com/apache/hudi/blob/4b5c22339c7e412828b61469d8a037a6d73d2aa2/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java#L132
> > > ) uses the IncrementalQueryAnalyzer, which is configured to read only
> > > from the CDC changelog when the Flink option is enabled (
> > > https://github.com/apache/hudi/blob/4b5c22339c7e412828b61469d8a037a6d73d2aa2/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java#L144C10-L144C30
> > > ). However, importantly, when CDC is enabled (
> > > https://github.com/apache/hudi/blob/4b5c22339c7e412828b61469d8a037a6d73d2aa2/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java#L315
> > > ), it calls getCdcInputSplits(metaClient, instantRange), which does not
> > > use the configured IncrementalQueryAnalyzer to source the timeline. It
> > > just uses the HoodieTableMetaClient, so there is no additional timeline
> > > filtering, only what there is within HoodieCDCExtractor.
> > >
> > > I wrote a MOR test case in TestIncrementalSplits and it does indeed
> > > return inference-based splits (LOG_FILE) even when told to only read
> > > from the CDC changelog. This looks like a bug to me.
> > >
> > > Thanks
> > > Jack
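To make Jack's observation about consumeChangesFromCompaction concrete: as he describes it, enabling the flag adds COMPACT instants on top of the other instant types rather than restricting reads to compactions alone, so a "changelog only" read depends on the instant range already having been filtered upstream. The following is a plain-Java restatement of that behaviour; the enum and method are invented for this sketch and are not taken from HoodieCDCExtractor.

    import java.util.EnumSet;
    import java.util.Set;

    // Restates the described behaviour only; not Hudi code.
    final class CdcInstantSelection {
        enum Action { COMMIT, DELTA_COMMIT, REPLACE_COMMIT, COMPACT }

        static Set<Action> actionsProducingSplits(boolean consumeChangesFromCompaction) {
            // Non-compaction write actions always produce change splits.
            EnumSet<Action> actions =
                EnumSet.of(Action.COMMIT, Action.DELTA_COMMIT, Action.REPLACE_COMMIT);
            if (consumeChangesFromCompaction) {
                // COMPACT is added on top of the others; it does not replace them.
                actions.add(Action.COMPACT);
            }
            return actions;
        }
    }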
> > > On Tue, Aug 20, 2024 at 6:29 AM Danny Chan <danny0...@apache.org> wrote:
> > > >
> > > > yeah, you are right. For a MOR table, when the CDC log is enabled
> > > > (these logs are generated during compaction), there are two choices
> > > > for the reader:
> > > >
> > > > 1. read the changes from the change log, which has a high freshness
> > > > delay because these logs are only generated during compaction;
> > > > 2. or infer the changes itself, which has a much shorter delay but
> > > > more resource cost on the reader side.
> > > >
> > > > 1 and 2 are mutually exclusive; you can only enable one of them.
> > > >
> > > > The reason we add some tricky logic in HoodieCDCExtractor is that
> > > > when we choose 1, the timeline needs to be filtered to only include
> > > > compaction commits, while when 2 is enabled, the timeline needs to be
> > > > filtered to exclude compaction commits.
> > > >
> > > > Best,
> > > > Danny
> > > >
> > > > On Mon, Aug 19, 2024 at 18:25, Jack Vanlightly <vanligh...@apache.org> wrote:
> > > > >
> > > > > Hi all,
> > > > >
> > > > > I'm trying to understand the CDC read process that Flink uses in
> > > > > Hudi. According to the Flink option READ_CDC_FROM_CHANGELOG (
> > > > > https://github.com/apache/hudi/blob/db5c2d97dc94122ebd63e6200858eabc4b119178/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java#L365
> > > > > ), when true, Flink should only read from CDC files; when false, it
> > > > > must infer the deltas from the base and log files. But reading the
> > > > > code, the HoodieCDCExtractor creates file splits for both CDC files
> > > > > and for the inference cases (
> > > > > https://github.com/apache/hudi/blob/db5c2d97dc94122ebd63e6200858eabc4b119178/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java#L234
> > > > > ) when READ_CDC_FROM_CHANGELOG = true.
> > > > >
> > > > > This is confusing, as it seems you would do one or the other but not
> > > > > both. Why go through all the work of inferring deltas from base and
> > > > > log files when perhaps a couple of commits ahead there is a compact
> > > > > instant that has it all precomputed?
> > > > >
> > > > > Thanks
> > > > > Jack
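For context, the two mutually exclusive read modes Danny describes earlier in the thread can be summed up as two different timeline filters: changelog reads keep only compaction instants, inference reads exclude them. The sketch below is a standalone illustration of that idea; the class, enum and method names are invented here and do not correspond to Hudi classes.

    import java.util.List;
    import java.util.function.Predicate;
    import java.util.stream.Collectors;

    // Illustrative only: the two mutually exclusive MOR CDC read modes,
    // expressed as the timeline filtering each one implies.
    final class CdcReadModes {
        enum Mode { READ_FROM_CHANGELOG, INFER_FROM_BASE_AND_LOG_FILES }

        static List<String> instantsToRead(Mode mode,
                                           List<String> completedInstants,
                                           Predicate<String> isCompactionInstant) {
            switch (mode) {
                case READ_FROM_CHANGELOG:
                    // Mode 1: CDC blocks are written during compaction, so only
                    // compaction instants carry precomputed changes
                    // (higher delay, cheaper reads).
                    return completedInstants.stream()
                            .filter(isCompactionInstant)
                            .collect(Collectors.toList());
                case INFER_FROM_BASE_AND_LOG_FILES:
                    // Mode 2: changes are reconstructed from base and log files,
                    // so compaction instants are excluded
                    // (lower delay, more work on the reader side).
                    return completedInstants.stream()
                            .filter(isCompactionInstant.negate())
                            .collect(Collectors.toList());
                default:
                    throw new IllegalArgumentException("Unsupported mode: " + mode);
            }
        }
    }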