> So if it works as you say, then there would have to be some other filtering
going on somewhere.

It is set up as intention, because for mor tables, a commit that is
not a compaction (insert overwrite etc.) should also be read.

> it calls getCdcInputSplits(metaClient, instantRange) which does not use the
configured IncrementalQueryAnalyzer

It utilities the InstantRange which got the filtered instants by
IncrementalQueryAnalyzer, maybe you can add a test case and fire a fix
for it.

Jack Vanlightly <vanligh...@apache.org> 于2024年8月21日周三 16:32写道:
>
> Thanks Danny. I think there may be a problem, as reading the code and
> having written a test to check it, it seems this is not happening.
>
> The CDCExtractor itself is fed an instant range and a boolean
> (consumeChangesFromCompaction ) for whether to consume from compaction
> instants. When consumeChangesFromCompaction is True is reads from COMPACT *and
> *the other instant types like COMMIT etc (
> https://github.com/apache/hudi/blob/db5c2d97dc94122ebd63e6200858eabc4b119178/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java#L234).
> So if it works as you say, then there would have to be some other filtering
> going on somewhere.
>
> I went to look for the code that would further filter down the instants in
> some way. In the Flink code, the IncrementalInputSplits class uses the
> CDCExtractor. The interesting thing in this class is that the inputSplits
> method (
> https://github.com/apache/hudi/blob/4b5c22339c7e412828b61469d8a037a6d73d2aa2/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java#L132)
> uses the IncrementalQueryAnalyzer which is configured to read only from the
> CDC changelog when the Flink option is enabled (
> https://github.com/apache/hudi/blob/4b5c22339c7e412828b61469d8a037a6d73d2aa2/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java#L144C10-L144C30).
> However, importantly, when cdc is enabled (
> https://github.com/apache/hudi/blob/4b5c22339c7e412828b61469d8a037a6d73d2aa2/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java#L315),
> it calls getCdcInputSplits(metaClient, instantRange) which does not use the
> configured IncrementalQueryAnalyzer to source the timeline. It just uses
> the HoodieTableMetaClient, so there is no additional timeline filtering,
> only what there is within HoodieCDCExtractor.
>
> I wrote a MOR test case in TestIncrementalSplits and it does indeed return
> inference based splits (LOG_FILE) even when told to only read from CDC
> change log. This looks like a bug to me.
>
> Thanks
> Jack
>
>
> Thanks
> Jack
>
> On Tue, Aug 20, 2024 at 6:29 AM Danny Chan <danny0...@apache.org> wrote:
>
> > yeah, you are right, for mor table, when the cdc log is enabled, which
> > are generated during compaction, there are two choices for the reader:
> >
> > 1. read the changes from the change log, which got a hight TTL delay
> > because these logs are only generated during compaction;
> > 2. or it can infers the changes by itself, which got much shorter TTL
> > delay but more resource cost on the reader side.
> >
> > 1 and 2 are mutual exclusion, you can only enable either one of them.
> >
> > The reason we add some tricky logic in HoodieCDCExtractor is that when
> > we choose 1, the timeline needs to be filtered to only include
> > compaction commits;
> > while when 2 is enabled, the timeline needs to be filtered to exlucde
> > compaction commits.
> >
> > Best,
> > Danny
> >
> > Jack Vanlightly <vanligh...@apache.org> 于2024年8月19日周一 18:25写道:
> > >
> > > Hi all,
> > >
> > > I'm trying to understand the CDC read process that Flink uses in Hudi.
> > > According to the Flink option, READ_CDC_FROM_CHANGELOG (
> > >
> > https://github.com/apache/hudi/blob/db5c2d97dc94122ebd63e6200858eabc4b119178/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java#L365
> > ),
> > > when True, Flink should only read from CDC files, when false, it must
> > infer
> > > the deltas from the base and log files. But reading the code, the
> > > HoodieCDCExtractor creates file splits for both CDC files and for the
> > > inference cases (
> > >
> > https://github.com/apache/hudi/blob/db5c2d97dc94122ebd63e6200858eabc4b119178/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java#L234
> > )
> > > when READ_CDC_FROM_CHANGELOG =True.
> > >
> > > This is confusing as it seems you would do one or the other but not both.
> > > Why go through all the work of inferring deltas from base and log files
> > > when perhaps a couple of commits ahead there is a compact instant that
> > has
> > > it all precomputed?
> > >
> > > Thanks
> > > Jack
> >

Reply via email to