> So if it works as you say, then there would have to be some other filtering going on somewhere.
It is set up intentionally, because for MOR tables, a commit that is not a
compaction (insert overwrite etc.) should also be read.

> it calls getCdcInputSplits(metaClient, instantRange) which does not use
> the configured IncrementalQueryAnalyzer

It utilizes the InstantRange, which got the filtered instants from the
IncrementalQueryAnalyzer; maybe you can add a test case and fire a fix for it.

Jack Vanlightly <vanligh...@apache.org> wrote on Wed, Aug 21, 2024 at 16:32:
>
> Thanks Danny. I think there may be a problem, as reading the code and
> having written a test to check it, it seems this is not happening.
>
> The CDCExtractor itself is fed an instant range and a boolean
> (consumeChangesFromCompaction) for whether to consume from compaction
> instants. When consumeChangesFromCompaction is true it reads from COMPACT
> *and* the other instant types like COMMIT etc. (
> https://github.com/apache/hudi/blob/db5c2d97dc94122ebd63e6200858eabc4b119178/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java#L234
> ). So if it works as you say, then there would have to be some other
> filtering going on somewhere.
>
> I went to look for the code that would further filter down the instants
> in some way. In the Flink code, the IncrementalInputSplits class uses the
> CDCExtractor. The interesting thing in this class is that the inputSplits
> method (
> https://github.com/apache/hudi/blob/4b5c22339c7e412828b61469d8a037a6d73d2aa2/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java#L132
> ) uses the IncrementalQueryAnalyzer, which is configured to read only from
> the CDC changelog when the Flink option is enabled (
> https://github.com/apache/hudi/blob/4b5c22339c7e412828b61469d8a037a6d73d2aa2/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java#L144C10-L144C30
> ). However, importantly, when cdc is enabled (
> https://github.com/apache/hudi/blob/4b5c22339c7e412828b61469d8a037a6d73d2aa2/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java#L315
> ), it calls getCdcInputSplits(metaClient, instantRange), which does not use
> the configured IncrementalQueryAnalyzer to source the timeline. It just
> uses the HoodieTableMetaClient, so there is no additional timeline
> filtering, only what there is within HoodieCDCExtractor.
>
> I wrote a MOR test case in TestIncrementalSplits and it does indeed return
> inference-based splits (LOG_FILE) even when told to only read from the CDC
> changelog. This looks like a bug to me.
>
> Thanks
> Jack
>
> On Tue, Aug 20, 2024 at 6:29 AM Danny Chan <danny0...@apache.org> wrote:
>
> > Yeah, you are right. For a MOR table, when the cdc log is enabled (these
> > logs are generated during compaction), there are two choices for the
> > reader:
> >
> > 1. read the changes from the change log, which has a high TTL delay
> > because these logs are only generated during compaction;
> > 2. or it can infer the changes by itself, which has a much shorter TTL
> > delay but more resource cost on the reader side.
> >
> > 1 and 2 are mutually exclusive, you can only enable one of them.
> >
> > The reason we add some tricky logic in HoodieCDCExtractor is that when
> > we choose 1, the timeline needs to be filtered to only include
> > compaction commits, while when 2 is enabled, the timeline needs to be
> > filtered to exclude compaction commits.
> >
> > Best,
> > Danny
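
To make the two mutually exclusive modes described above concrete, here is a
minimal sketch of the timeline filtering Danny outlines. The class, record and
action string below are illustrative placeholders only, not the actual Hudi
types:

    import java.util.List;
    import java.util.stream.Collectors;

    // Hypothetical sketch of the mutually exclusive CDC read modes; the names
    // here are placeholders, not the real Hudi timeline classes.
    public class CdcTimelineFilterSketch {

      // Simplified stand-in for a timeline instant: timestamp plus action.
      record TimelineInstant(String timestamp, String action) {}

      static List<TimelineInstant> filterForCdcRead(List<TimelineInstant> timeline,
                                                    boolean readFromChangelog) {
        if (readFromChangelog) {
          // Choice 1: consume the precomputed CDC change log, which is only
          // written during compaction, so keep only compaction commits.
          return timeline.stream()
              .filter(i -> i.action().equals("compaction"))
              .collect(Collectors.toList());
        } else {
          // Choice 2: infer changes from base/log files, so compaction commits
          // (which rewrite data without logically changing it) are excluded to
          // avoid emitting duplicate changes.
          return timeline.stream()
              .filter(i -> !i.action().equals("compaction"))
              .collect(Collectors.toList());
        }
      }
    }

If neither filter is applied on a given read path, splits for both modes come
back together, which is the symptom Jack reports from his test.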
> >
> > Jack Vanlightly <vanligh...@apache.org> wrote on Mon, Aug 19, 2024 at 18:25:
> > >
> > > Hi all,
> > >
> > > I'm trying to understand the CDC read process that Flink uses in Hudi.
> > > According to the Flink option READ_CDC_FROM_CHANGELOG (
> > > https://github.com/apache/hudi/blob/db5c2d97dc94122ebd63e6200858eabc4b119178/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java#L365
> > > ), when true, Flink should only read from CDC files; when false, it
> > > must infer the deltas from the base and log files. But reading the
> > > code, the HoodieCDCExtractor creates file splits both for CDC files
> > > and for the inference cases (
> > > https://github.com/apache/hudi/blob/db5c2d97dc94122ebd63e6200858eabc4b119178/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java#L234
> > > ) when READ_CDC_FROM_CHANGELOG = true.
> > >
> > > This is confusing as it seems you would do one or the other but not
> > > both. Why go through all the work of inferring deltas from base and
> > > log files when perhaps a couple of commits ahead there is a compact
> > > instant that has it all precomputed?
> > >
> > > Thanks
> > > Jack
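
For what it is worth, the expectation Jack's MOR test checks could be captured
with an assertion along these lines. The split-case names and helper are
hypothetical placeholders for illustration, not the actual classes used in
TestIncrementalSplits or the Flink source:

    import java.util.List;

    // Hypothetical sketch of the check described in the thread; the enum and
    // record are placeholders, not the real Hudi/Flink split types.
    public class CdcSplitCheckSketch {

      enum SplitCase { CDC_LOG_FILE, BASE_FILE_INSERT, LOG_FILE /* inference-based */ }

      record Split(SplitCase splitCase, String fileId) {}

      // With READ_CDC_FROM_CHANGELOG=true the expectation is that every split
      // is backed by a CDC change-log file and none fall back to inference.
      static void assertChangelogOnly(List<Split> splits) {
        for (Split s : splits) {
          if (s.splitCase() == SplitCase.LOG_FILE) {
            throw new AssertionError(
                "Got an inference-based split for file " + s.fileId()
                    + " although only the CDC changelog should be read");
          }
        }
      }
    }

Run against splits produced with the option enabled, a failure here would show
the inference-based splits leaking through, as observed in the test above.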