Hi everyone,
this definitely sounds like a bug to me. Computing metadata might be
very expensive and a connector might expose a long list of metadata
keys. It was therefore intended to project the metadata if possible. I'm
pretty sure that this worked before (at least when implementing
SupportsProjectionPushDown). Maybe a bug was introduced when adding the
Spec support.
Regards,
Timo
On 23.08.21 08:24, Ingo Bürk wrote:
Hi Jingsong,
thanks for your answer. Even if the source implements
SupportsProjectionPushDown, #applyProjection will never be called with
projections for metadata columns. For example, I have the following test:
@Test
def test(): Unit = {
  val tableId = TestValuesTableFactory.registerData(Seq())

  tEnv.createTemporaryTable("T", TableDescriptor.forConnector("values")
    .schema(Schema.newBuilder()
      .column("f0", DataTypes.INT())
      .columnByMetadata("m1", DataTypes.STRING())
      .columnByMetadata("m2", DataTypes.STRING())
      .build())
    .option("data-id", tableId)
    .option("bounded", "true")
    .option("readable-metadata", "m1:STRING,m2:STRING")
    .build())

  tEnv.sqlQuery("SELECT f0, m1 FROM T").execute().collect().toList
}
Regardless of whether I select only f0 or f0 + m1, #applyReadableMetadata
is always called with m1 + m2, and #applyProjection only ever sees f0. So
as far as I can tell, the source has no way of knowing which metadata
columns are actually needed (under the projection), it always has to
produce metadata for all metadata columns declared in the table's schema.
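To make the expected behavior concrete, here is a minimal, self-contained sketch of the projection the planner would ideally perform before calling #applyReadableMetadata. This uses plain Java collections, not Flink's API; the class and method names are made up for illustration:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class MetadataProjectionSketch {
    // Given all metadata keys declared in the table's schema and the column
    // names actually referenced by the query, keep only the keys that are
    // used. This is the step that currently appears to be missing.
    static List<String> projectMetadataKeys(
            List<String> declaredKeys, List<String> selectedColumns) {
        List<String> used = new ArrayList<>();
        for (String key : declaredKeys) {
            if (selectedColumns.contains(key)) {
                used.add(key);
            }
        }
        return used;
    }

    public static void main(String[] args) {
        // Mirrors the test above: the schema declares m1 and m2, but the
        // query only selects f0 and m1.
        List<String> declared = Arrays.asList("m1", "m2");
        List<String> selected = Arrays.asList("f0", "m1");
        System.out.println(projectMetadataKeys(declared, selected)); // [m1]
    }
}
```

Under this logic, the source would be asked only for m1, instead of unconditionally receiving the full declared list m1 + m2.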
In PushProjectIntoTableSourceScanRule I also haven't yet found anything
that would suggest that metadata are first projected and only then pushed
to the source. I think the correct behavior would be to call
#applyReadableMetadata only after the metadata columns have been
considered in the projection.
Best
Ingo
On Mon, Aug 23, 2021 at 5:05 AM Jingsong Li <jingsongl...@gmail.com> wrote:
Hi,
I remember that the projection only works when the source also implements
SupportsProjectionPushDown.
You can take a look at
`PushProjectIntoTableSourceScanRuleTest.testNestProjectWithMetadata`.
applyReadableMetadata will be called again in PushProjectIntoTableSourceScanRule.
But there may be a bug in
PushProjectIntoTableSourceScanRule.applyPhysicalAndMetadataPushDown:
if (!usedMetadataNames.isEmpty()) {
    sourceAbilitySpecs.add(
        new ReadingMetadataSpec(usedMetadataNames, newProducedType));
}
If there is no metadata column left, we should still apply the spec: we
should tell the source that no metadata columns remain after projection.
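Jingsong's point can be illustrated with a small, self-contained sketch. The ReadingMetadataSpec below is a hypothetical stand-in, not Flink's actual class; the sketch only shows why guarding on a non-empty list hides information from the source:

```java
import java.util.Collections;
import java.util.List;

public class EmptyMetadataSpecSketch {
    // Hypothetical stand-in for the spec: it only records which metadata
    // keys the source should read.
    static class ReadingMetadataSpec {
        final List<String> metadataKeys;
        ReadingMetadataSpec(List<String> metadataKeys) {
            this.metadataKeys = metadataKeys;
        }
    }

    // Guarded behavior: no spec is emitted when the projection leaves no
    // metadata columns, so the source never learns the list became empty
    // and keeps producing all declared metadata.
    static ReadingMetadataSpec guarded(List<String> usedMetadataNames) {
        return usedMetadataNames.isEmpty()
                ? null
                : new ReadingMetadataSpec(usedMetadataNames);
    }

    // Suggested behavior: always emit the spec, so an empty key list
    // reaches the source and it can stop producing metadata entirely.
    static ReadingMetadataSpec unconditional(List<String> usedMetadataNames) {
        return new ReadingMetadataSpec(usedMetadataNames);
    }

    public static void main(String[] args) {
        List<String> none = Collections.emptyList();
        System.out.println(guarded(none) == null);                      // true
        System.out.println(unconditional(none).metadataKeys.isEmpty()); // true
    }
}
```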
Best,
Jingsong
On Fri, Aug 20, 2021 at 7:56 PM Ingo Bürk <i...@ververica.com> wrote:
Hi everyone,
according to the SupportsReadableMetadata interface, the planner is
supposed to project required metadata columns prior to applying them:
"The planner will select required metadata columns (i.e. perform
projection push down) and will call applyReadableMetadata(List, DataType)
with a list of metadata keys."
However, from my experiments it seems that this is not true: regardless of
what columns I select from a table, #applyReadableMetadata always seems to
be called with all metadata declared in the schema of the table. Metadata
columns are also excluded from SupportsProjectionPushDown#applyProjection,
so the source cannot perform the projection either.
This is in Flink 1.13.2. Am I misreading the docs here or is this not
working as intended?
Best
Ingo
--
Best, Jingsong Lee