Thanks Danny. I think there may be a problem: having read the code and written a test to check it, it seems this is not happening.
The CDCExtractor itself is fed an instant range and a boolean (consumeChangesFromCompaction) for whether to consume from compaction instants. When consumeChangesFromCompaction is true, it reads from COMPACT *and* the other instant types like COMMIT etc. (https://github.com/apache/hudi/blob/db5c2d97dc94122ebd63e6200858eabc4b119178/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java#L234). So if it works as you say, there would have to be some other filtering going on somewhere.

I went looking for the code that would further filter down the instants in some way. In the Flink code, the IncrementalInputSplits class uses the CDCExtractor. The interesting thing in this class is that the inputSplits method (https://github.com/apache/hudi/blob/4b5c22339c7e412828b61469d8a037a6d73d2aa2/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java#L132) uses the IncrementalQueryAnalyzer, which is configured to read only from the CDC changelog when the Flink option is enabled (https://github.com/apache/hudi/blob/4b5c22339c7e412828b61469d8a037a6d73d2aa2/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java#L144C10-L144C30). However, importantly, when cdc is enabled (https://github.com/apache/hudi/blob/4b5c22339c7e412828b61469d8a037a6d73d2aa2/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java#L315), it calls getCdcInputSplits(metaClient, instantRange), which does not use the configured IncrementalQueryAnalyzer to source the timeline. It just uses the HoodieTableMetaClient, so there is no additional timeline filtering beyond what is inside HoodieCDCExtractor.

I wrote a MOR test case in TestIncrementalSplits and it does indeed return inference-based splits (LOG_FILE) even when told to read only from the CDC change log. This looks like a bug to me. Two small sketches below illustrate what I mean.
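To make that concrete, here is a condensed paraphrase of the instant filtering as I read it in HoodieCDCExtractor. The class and enum below are simplified stand-ins I made up for illustration, not the real Hudi types:

    import java.util.List;

    public class CdcExtractorFilterSketch {

        // Stand-in for the instant actions on the timeline; not the real
        // HoodieInstant type.
        enum Action { COMMIT, DELTA_COMMIT, COMPACT }

        // Condensed paraphrase of the filter around HoodieCDCExtractor
        // L234 as I read it: a COMPACT instant is accepted only when
        // consumeChangesFromCompaction is true, while the ordinary write
        // instants are accepted unconditionally.
        static boolean accepted(Action action, boolean consumeChangesFromCompaction) {
            if (action == Action.COMPACT) {
                return consumeChangesFromCompaction;
            }
            return true;
        }

        public static void main(String[] args) {
            List<Action> timeline = List.of(Action.COMMIT, Action.DELTA_COMMIT, Action.COMPACT);
            // With the flag set to true, all three instants survive the
            // filter, so splits get built from the cdc logs AND from the
            // instants that require inference (the LOG_FILE splits my
            // test saw).
            timeline.forEach(a -> System.out.println(a + " accepted: " + accepted(a, true)));
        }
    }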
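For contrast, this is the kind of mutually exclusive timeline filtering I expected to find somewhere on the Flink read path, based on your points 1 and 2 below. Again, a hypothetical sketch, not existing code:

    public class ExpectedExclusiveFilterSketch {

        // Hypothetical predicate: keep an instant depending on which of
        // the two mutually exclusive modes is enabled.
        static boolean keepInstant(String action, boolean readCdcFromChangelog) {
            boolean isCompaction = action.equals("compaction");
            // Mode 1 (read precomputed changes from the cdc change log):
            // keep only compaction commits.
            // Mode 2 (infer the changes): exclude compaction commits.
            return readCdcFromChangelog ? isCompaction : !isCompaction;
        }

        public static void main(String[] args) {
            for (String action : new String[] {"commit", "deltacommit", "compaction"}) {
                System.out.println(action
                    + " -> mode 1 keeps: " + keepInstant(action, true)
                    + ", mode 2 keeps: " + keepInstant(action, false));
            }
        }
    }

As far as I can tell, getCdcInputSplits applies neither side of such a predicate; only the extractor filter above runs.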
Thanks
Jack

On Tue, Aug 20, 2024 at 6:29 AM Danny Chan <danny0...@apache.org> wrote:
> yeah, you are right, for a MOR table, when the cdc logs are enabled
> (they are generated during compaction), there are two choices for the
> reader:
>
> 1. read the changes from the change log, which has a high TTL delay
> because these logs are only generated during compaction;
> 2. or it can infer the changes by itself, which has a much shorter TTL
> delay but more resource cost on the reader side.
>
> 1 and 2 are mutually exclusive; you can only enable one of them.
>
> The reason we add some tricky logic in HoodieCDCExtractor is that when
> we choose 1, the timeline needs to be filtered to only include
> compaction commits, while when 2 is enabled, the timeline needs to be
> filtered to exclude compaction commits.
>
> Best,
> Danny
>
> Jack Vanlightly <vanligh...@apache.org> wrote on Mon, Aug 19, 2024 at 18:25:
> >
> > Hi all,
> >
> > I'm trying to understand the CDC read process that Flink uses in Hudi.
> > According to the Flink option, READ_CDC_FROM_CHANGELOG
> > (https://github.com/apache/hudi/blob/db5c2d97dc94122ebd63e6200858eabc4b119178/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java#L365),
> > when true, Flink should read only from CDC files; when false, it must
> > infer the deltas from the base and log files. But reading the code,
> > the HoodieCDCExtractor creates file splits for both CDC files and the
> > inference cases
> > (https://github.com/apache/hudi/blob/db5c2d97dc94122ebd63e6200858eabc4b119178/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java#L234)
> > when READ_CDC_FROM_CHANGELOG = true.
> >
> > This is confusing as it seems you would do one or the other but not
> > both. Why go through all the work of inferring deltas from base and
> > log files when perhaps a couple of commits ahead there is a compact
> > instant that has it all precomputed?
> >
> > Thanks
> > Jack