yeah, the InstantRange would include exact the commits that need to be read.

Best,
Danny

Jack Vanlightly <jack.vanligh...@gmail.com> 于2024年9月5日周四 01:27写道:
>
> Ok, so the InstantRange would be an exact match one, with only the instants
> of compactions?
>
> On Mon, Aug 26, 2024, 02:46 Danny Chan <danny0...@apache.org> wrote:
>
> > > So if it works as you say, then there would have to be some other
> > filtering
> > going on somewhere.
> >
> > It is set up as intention, because for mor tables, a commit that is
> > not a compaction (insert overwrite etc.) should also be read.
> >
> > > it calls getCdcInputSplits(metaClient, instantRange) which does not use
> > the
> > configured IncrementalQueryAnalyzer
> >
> > It utilities the InstantRange which got the filtered instants by
> > IncrementalQueryAnalyzer, maybe you can add a test case and fire a fix
> > for it.
> >
> > Jack Vanlightly <vanligh...@apache.org> 于2024年8月21日周三 16:32写道:
> > >
> > > Thanks Danny. I think there may be a problem, as reading the code and
> > > having written a test to check it, it seems this is not happening.
> > >
> > > The CDCExtractor itself is fed an instant range and a boolean
> > > (consumeChangesFromCompaction ) for whether to consume from compaction
> > > instants. When consumeChangesFromCompaction is True is reads from
> > COMPACT *and
> > > *the other instant types like COMMIT etc (
> > >
> > https://github.com/apache/hudi/blob/db5c2d97dc94122ebd63e6200858eabc4b119178/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java#L234
> > ).
> > > So if it works as you say, then there would have to be some other
> > filtering
> > > going on somewhere.
> > >
> > > I went to look for the code that would further filter down the instants
> > in
> > > some way. In the Flink code, the IncrementalInputSplits class uses the
> > > CDCExtractor. The interesting thing in this class is that the inputSplits
> > > method (
> > >
> > https://github.com/apache/hudi/blob/4b5c22339c7e412828b61469d8a037a6d73d2aa2/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java#L132
> > )
> > > uses the IncrementalQueryAnalyzer which is configured to read only from
> > the
> > > CDC changelog when the Flink option is enabled (
> > >
> > https://github.com/apache/hudi/blob/4b5c22339c7e412828b61469d8a037a6d73d2aa2/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java#L144C10-L144C30
> > ).
> > > However, importantly, when cdc is enabled (
> > >
> > https://github.com/apache/hudi/blob/4b5c22339c7e412828b61469d8a037a6d73d2aa2/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java#L315
> > ),
> > > it calls getCdcInputSplits(metaClient, instantRange) which does not use
> > the
> > > configured IncrementalQueryAnalyzer to source the timeline. It just uses
> > > the HoodieTableMetaClient, so there is no additional timeline filtering,
> > > only what there is within HoodieCDCExtractor.
> > >
> > > I wrote a MOR test case in TestIncrementalSplits and it does indeed
> > return
> > > inference based splits (LOG_FILE) even when told to only read from CDC
> > > change log. This looks like a bug to me.
> > >
> > > Thanks
> > > Jack
> > >
> > >
> > > Thanks
> > > Jack
> > >
> > > On Tue, Aug 20, 2024 at 6:29 AM Danny Chan <danny0...@apache.org> wrote:
> > >
> > > > yeah, you are right, for mor table, when the cdc log is enabled, which
> > > > are generated during compaction, there are two choices for the reader:
> > > >
> > > > 1. read the changes from the change log, which got a hight TTL delay
> > > > because these logs are only generated during compaction;
> > > > 2. or it can infers the changes by itself, which got much shorter TTL
> > > > delay but more resource cost on the reader side.
> > > >
> > > > 1 and 2 are mutual exclusion, you can only enable either one of them.
> > > >
> > > > The reason we add some tricky logic in HoodieCDCExtractor is that when
> > > > we choose 1, the timeline needs to be filtered to only include
> > > > compaction commits;
> > > > while when 2 is enabled, the timeline needs to be filtered to exlucde
> > > > compaction commits.
> > > >
> > > > Best,
> > > > Danny
> > > >
> > > > Jack Vanlightly <vanligh...@apache.org> 于2024年8月19日周一 18:25写道:
> > > > >
> > > > > Hi all,
> > > > >
> > > > > I'm trying to understand the CDC read process that Flink uses in
> > Hudi.
> > > > > According to the Flink option, READ_CDC_FROM_CHANGELOG (
> > > > >
> > > >
> > https://github.com/apache/hudi/blob/db5c2d97dc94122ebd63e6200858eabc4b119178/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java#L365
> > > > ),
> > > > > when True, Flink should only read from CDC files, when false, it must
> > > > infer
> > > > > the deltas from the base and log files. But reading the code, the
> > > > > HoodieCDCExtractor creates file splits for both CDC files and for the
> > > > > inference cases (
> > > > >
> > > >
> > https://github.com/apache/hudi/blob/db5c2d97dc94122ebd63e6200858eabc4b119178/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java#L234
> > > > )
> > > > > when READ_CDC_FROM_CHANGELOG =True.
> > > > >
> > > > > This is confusing as it seems you would do one or the other but not
> > both.
> > > > > Why go through all the work of inferring deltas from base and log
> > files
> > > > > when perhaps a couple of commits ahead there is a compact instant
> > that
> > > > has
> > > > > it all precomputed?
> > > > >
> > > > > Thanks
> > > > > Jack
> > > >
> >

Reply via email to