Does this also affect Flink 1.14.0? If yes, do we want to fix this issue
for the upcoming release? If yes, then please make this issue a blocker or
at least critical.

Cheers,
Till

On Mon, Aug 23, 2021 at 8:39 AM Ingo Bürk <i...@ververica.com> wrote:

> Thanks Timo for the confirmation. I've also raised FLINK-23911[1] for this.
>
> [1] https://issues.apache.org/jira/browse/FLINK-23911
>
>
> Best
> Ingo
>
> On Mon, Aug 23, 2021 at 8:34 AM Timo Walther <twal...@apache.org> wrote:
>
> > Hi everyone,
> >
> > this definitely sounds like a bug to me. Computing metadata might be
> > very expensive and a connector might expose a long list of metadata
> > keys. It was therefore intended to project the metadata if possible. I'm
> > pretty sure that this worked before (at least when implementing
> > SupportsProjectionPushDown). Maybe a bug was introduced when adding the
> > Spec support.
> >
> > Regards,
> > Timo
> >
> >
> > On 23.08.21 08:24, Ingo Bürk wrote:
> > > Hi Jingsong,
> > >
> > > thanks for your answer. Even if the source implements
> > > > SupportsProjectionPushDown, #applyProjection will never be called with
> > > > projections for metadata columns. For example, I have the following test:
> > >
> > > @Test
> > > def test(): Unit = {
> > >    val tableId = TestValuesTableFactory.registerData(Seq())
> > >
> > >    tEnv.createTemporaryTable("T",
> TableDescriptor.forConnector("values")
> > >      .schema(Schema.newBuilder()
> > >        .column("f0", DataTypes.INT())
> > >        .columnByMetadata("m1", DataTypes.STRING())
> > >        .columnByMetadata("m2", DataTypes.STRING())
> > >        .build())
> > >      .option("data-id", tableId)
> > >      .option("bounded", "true")
> > >      .option("readable-metadata", "m1:STRING,m2:STRING")
> > >      .build())
> > >
> > >    tEnv.sqlQuery("SELECT f0, m1 FROM T").execute().collect().toList
> > > }
> > >
> > > > Regardless of whether I select only f0 or f0 + m1, #applyReadableMetadata
> > > > is always called with m1 + m2, and #applyProjection only ever sees f0. So
> > > > as far as I can tell, the source has no way of knowing which metadata
> > > > columns are actually needed (under the projection); it always has to
> > > > produce metadata for all metadata columns declared in the table's schema.
> > >
> > > > In PushProjectIntoTableSourceScanRule I also haven't yet found anything
> > > > that would suggest that metadata are first projected and only then pushed
> > > > to the source. I think the correct behavior should be to call
> > > > #applyReadableMetadata only after the metadata columns have been
> > > > considered in the projection.
> > >
> > >
> > > Best
> > > Ingo
> > >
> > >
> > > > On Mon, Aug 23, 2021 at 5:05 AM Jingsong Li <jingsongl...@gmail.com> wrote:
> > >
> > >> Hi,
> > >>
> > >> I remember the projection only works with SupportsProjectionPushDown.
> > >>
> > >> You can take a look at
> > >> `PushProjectIntoTableSourceScanRuleTest.testNestProjectWithMetadata`.
> > >>
> > >> applyReadableMetadata will be called again in
> > >> PushProjectIntoTableSourceScanRule.
> > >>
> > >> But there may be a bug in
> > >> PushProjectIntoTableSourceScanRule.applyPhysicalAndMetadataPushDown:
> > >>
> > >> if (!usedMetadataNames.isEmpty()) {
> > >>      sourceAbilitySpecs.add(new ReadingMetadataSpec(usedMetadataNames, newProducedType));
> > >> }
> > >>
> > >> Even if there is no metadata column left, we should apply again: we
> > >> should tell the source that there is no metadata column left after
> > >> projection.
> > >>
> > >> Best,
> > >> Jingsong
> > >>
> > >> On Fri, Aug 20, 2021 at 7:56 PM Ingo Bürk <i...@ververica.com> wrote:
> > >>>
> > >>> Hi everyone,
> > >>>
> > >>> according to the SupportsReadableMetadata interface, the planner is
> > >>> supposed to project required metadata columns prior to applying them:
> > >>>
> > >>>> The planner will select required metadata columns (i.e. perform
> > >>>> projection push down) and will call applyReadableMetadata(List, DataType)
> > >>>> with a list of metadata keys.
> > >>>
> > >>> However, from my experiments it seems that this is not true: regardless of
> > >>> what columns I select from a table, #applyReadableMetadata always seems to
> > >>> be called with all metadata declared in the schema of the table. Metadata
> > >>> columns are also excluded from SupportsProjectionPushDown#applyProjection,
> > >>> so the source cannot perform the projection either.
> > >>>
> > >>> This is in Flink 1.13.2. Am I misreading the docs here or is this not
> > >>> working as intended?
> > >>>
> > >>>
> > >>> Best
> > >>> Ingo
> > >>
> > >>
> > >>
> > >> --
> > >> Best, Jingsong Lee
> > >>
> > >
> >
> >
>
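
The behavior the thread expects can be sketched without any Flink
dependency. The following is only an illustration under stated assumptions,
not the planner's actual code: the class MetadataProjectionSketch, the
RecordingSource stand-in, and the helpers usedMetadataKeys and pushDown are
all hypothetical names, the DataType argument of applyReadableMetadata is
left out, and metadata column names are assumed to equal their metadata keys
(as in the test quoted above). It shows the two points raised in the
discussion: metadata keys are projected before being handed to the source,
and the source is notified even when no metadata column survives the
projection.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, Flink-free sketch of "project metadata first, then apply".
final class MetadataProjectionSketch {

    /** Hypothetical stand-in for a source implementing SupportsReadableMetadata. */
    static final class RecordingSource {
        // Stays null until applyReadableMetadata has been called at least once.
        List<String> appliedMetadataKeys;

        // Simplified signature: the real interface method also takes a DataType.
        void applyReadableMetadata(List<String> metadataKeys) {
            appliedMetadataKeys = new ArrayList<>(metadataKeys);
        }
    }

    /** Keeps only the declared metadata keys that the query selects, in declaration order. */
    static List<String> usedMetadataKeys(List<String> declaredKeys, List<String> selectedColumns) {
        Set<String> selected = new LinkedHashSet<>(selectedColumns);
        List<String> used = new ArrayList<>();
        for (String key : declaredKeys) {
            if (selected.contains(key)) {
                used.add(key);
            }
        }
        return used;
    }

    /** Projects the metadata keys and always notifies the source, even with an empty list. */
    static void pushDown(RecordingSource source, List<String> declaredKeys, List<String> selectedColumns) {
        // No isEmpty() guard here: an empty list tells the source that no
        // metadata column is left after projection.
        source.applyReadableMetadata(usedMetadataKeys(declaredKeys, selectedColumns));
    }

    public static void main(String[] args) {
        List<String> declared = List.of("m1", "m2");
        RecordingSource source = new RecordingSource();

        pushDown(source, declared, List.of("f0", "m1"));
        System.out.println("SELECT f0, m1 -> " + source.appliedMetadataKeys);

        pushDown(source, declared, List.of("f0"));
        System.out.println("SELECT f0 -> " + source.appliedMetadataKeys);
    }
}
```

With the table from the quoted test (metadata columns m1 and m2), the sketch
passes only m1 to the source for "SELECT f0, m1" and an empty list for
"SELECT f0", instead of always passing m1 and m2.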
