Does this also affect Flink 1.14.0? If yes, do we want to fix this issue for the upcoming release? If yes, then please make this issue a blocker or at least critical.
Cheers,
Till

On Mon, Aug 23, 2021 at 8:39 AM Ingo Bürk <i...@ververica.com> wrote:

> Thanks Timo for the confirmation. I've also raised FLINK-23911[1] for this.
>
> [1] https://issues.apache.org/jira/browse/FLINK-23911
>
> Best
> Ingo
>
> On Mon, Aug 23, 2021 at 8:34 AM Timo Walther <twal...@apache.org> wrote:
>
> > Hi everyone,
> >
> > this sounds definitely like a bug to me. Computing metadata might be
> > very expensive, and a connector might expose a long list of metadata
> > keys. It was therefore intended to project the metadata if possible. I'm
> > pretty sure that this worked before (at least when implementing
> > SupportsProjectionPushDown). Maybe a bug was introduced when adding the
> > Spec support.
> >
> > Regards,
> > Timo
> >
> > On 23.08.21 08:24, Ingo Bürk wrote:
> > > Hi Jingsong,
> > >
> > > thanks for your answer. Even if the source implements
> > > SupportsProjectionPushDown, #applyProjections will never be called with
> > > projections for metadata columns. For example, I have the following test:
> > >
> > >     @Test
> > >     def test(): Unit = {
> > >       val tableId = TestValuesTableFactory.registerData(Seq())
> > >
> > >       tEnv.createTemporaryTable("T", TableDescriptor.forConnector("values")
> > >         .schema(Schema.newBuilder()
> > >           .column("f0", DataTypes.INT())
> > >           .columnByMetadata("m1", DataTypes.STRING())
> > >           .columnByMetadata("m2", DataTypes.STRING())
> > >           .build())
> > >         .option("data-id", tableId)
> > >         .option("bounded", "true")
> > >         .option("readable-metadata", "m1:STRING,m2:STRING")
> > >         .build())
> > >
> > >       tEnv.sqlQuery("SELECT f0, m1 FROM T").execute().collect().toList
> > >     }
> > >
> > > Regardless of whether I select only f0 or f0 + m1, #applyReadableMetadata
> > > is always called with m1 + m2, and #applyProjections only ever sees f0. So
> > > as far as I can tell, the source has no way of knowing which metadata
> > > columns are actually needed (under the projection); it always has to
> > > produce metadata for all metadata columns declared in the table's schema.
> > >
> > > In PushProjectIntoTableSourceScanRule I also haven't yet found anything
> > > that would suggest that metadata are first projected and only then pushed
> > > to the source. I think the correct behavior would be to call
> > > #applyReadableMetadata only after the metadata columns have been
> > > considered in the projection.
> > >
> > > Best
> > > Ingo
> > >
> > > On Mon, Aug 23, 2021 at 5:05 AM Jingsong Li <jingsongl...@gmail.com> wrote:
> > >
> > >> Hi,
> > >>
> > >> I remember the projection only works with SupportsProjectionPushDown.
> > >>
> > >> You can take a look at
> > >> `PushProjectIntoTableSourceScanRuleTest.testNestProjectWithMetadata`.
> > >>
> > >> applyReadableMetadata will be called again in the
> > >> PushProjectIntoTableSourceScanRule.
> > >>
> > >> But there may be a bug in
> > >> PushProjectIntoTableSourceScanRule.applyPhysicalAndMetadataPushDown:
> > >>
> > >>     if (!usedMetadataNames.isEmpty()) {
> > >>         sourceAbilitySpecs.add(new ReadingMetadataSpec(usedMetadataNames,
> > >>             newProducedType));
> > >>     }
> > >>
> > >> If there is no metadata column left, we should still apply the spec: we
> > >> should tell the source that no metadata columns remain after projection.
> > >>
> > >> Best,
> > >> Jingsong
> > >>
> > >> On Fri, Aug 20, 2021 at 7:56 PM Ingo Bürk <i...@ververica.com> wrote:
> > >>>
> > >>> Hi everyone,
> > >>>
> > >>> according to the SupportsReadableMetadata interface, the planner is
> > >>> supposed to project required metadata columns prior to applying them:
> > >>>
> > >>>> The planner will select required metadata columns (i.e. perform
> > >>>> projection push down) and will call applyReadableMetadata(List,
> > >>>> DataType) with a list of metadata keys.
> > >>>
> > >>> However, from my experiments it seems that this is not true: regardless
> > >>> of what columns I select from a table, #applyReadableMetadata always
> > >>> seems to be called with all metadata declared in the schema of the
> > >>> table. Metadata columns are also excluded from
> > >>> SupportsProjectionPushDown#applyProjection, so the source cannot
> > >>> perform the projection either.
> > >>>
> > >>> This is in Flink 1.13.2. Am I misreading the docs here, or is this not
> > >>> working as intended?
> > >>>
> > >>> Best
> > >>> Ingo
> > >>
> > >> --
> > >> Best, Jingsong Lee
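[Editorial note: the expected projection behavior discussed above can be sketched without Flink. This is a minimal, self-contained illustration; `projectMetadataKeys` is a hypothetical helper, not Flink's actual API. It mirrors what the rule should do: out of all declared metadata columns, keep only those referenced by the query's projection, and push that list to the source even when it is empty.]

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class MetadataProjection {

    // Hypothetical helper: filter the declared metadata keys down to the
    // ones the query actually selects, preserving declaration order.
    static List<String> projectMetadataKeys(
            List<String> declaredMetadata, Set<String> selectedColumns) {
        return declaredMetadata.stream()
                .filter(selectedColumns::contains)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> declared = List.of("m1", "m2");

        // SELECT f0, m1 FROM T -- only m1 should reach applyReadableMetadata
        System.out.println(projectMetadataKeys(declared, Set.of("f0", "m1")));
        // prints [m1]

        // SELECT f0 FROM T -- the empty list should still be pushed down,
        // so the source knows no metadata columns remain after projection
        System.out.println(projectMetadataKeys(declared, Set.of("f0")));
        // prints []
    }
}
```

The second case is the one Jingsong points at: skipping the push-down when the list is empty leaves the source believing all declared metadata is still required.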