Savonitar commented on code in PR #174:
URL:
https://github.com/apache/flink-connector-kafka/pull/174#discussion_r3081213073
##########
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java:
##########
@@ -300,36 +337,34 @@ public Map<String, DataType> listReadableMetadata() {
@Override
public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
- // separate connector and format metadata
- final List<String> formatMetadataKeys =
- metadataKeys.stream()
- .filter(k -> k.startsWith(VALUE_METADATA_PREFIX))
- .collect(Collectors.toList());
- final List<String> connectorMetadataKeys = new ArrayList<>(metadataKeys);
- connectorMetadataKeys.removeAll(formatMetadataKeys);
-
- // push down format metadata
- final Map<String, DataType> formatMetadata = valueDecodingFormat.listReadableMetadata();
- if (formatMetadata.size() > 0) {
- final List<String> requestedFormatMetadataKeys =
- formatMetadataKeys.stream()
- .map(k -> k.substring(VALUE_METADATA_PREFIX.length()))
- .collect(Collectors.toList());
- valueDecodingFormat.applyReadableMetadata(requestedFormatMetadataKeys);
+ this.valueFormatMetadataKeys = new ArrayList<>();
+ this.metadataKeys = new ArrayList<>();
+ for (final String key : metadataKeys) {
+ if (key.startsWith(VALUE_METADATA_PREFIX)) {
+ final String formatMetadataKey = key.substring(VALUE_METADATA_PREFIX.length());
+ this.valueFormatMetadataKeys.add(formatMetadataKey);
+ } else {
+ this.metadataKeys.add(key);
+ }
}
-
- this.metadataKeys = connectorMetadataKeys;
this.producedDataType = producedDataType;
}
@Override
- public boolean supportsMetadataProjection() {
- return false;
+ public void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy) {
+ this.watermarkStrategy = watermarkStrategy;
}
@Override
- public void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy) {
- this.watermarkStrategy = watermarkStrategy;
+ public boolean supportsNestedProjection() {
+ return (keyDecodingFormat == null
Review Comment:
I have a question.
This method checks only the user's config (e.g.
`FormatProjectionPushdownLevel.ALL`), but never checks the format's own
`ProjectableDecodingFormat.supportsNestedProjection()`.
What if we have the following scenario:
1. A format that implements `ProjectableDecodingFormat` with
`supportsNestedProjection()` returning **false** (it supports top-level
projection but **not nested** projection, e.g. avro-confluent or a custom format).
2. The user sets 'value.format-projection-pushdown-level' = 'ALL' in DDL.
3. The schema **has nested fields**.
Wouldn't this cause a runtime crash, incorrect results, or other issues?
My understanding of the call chain:
1. The schema has nested ROW fields and the query selects a nested subfield.
2. The planner calls `KafkaDynamicSource.supportsNestedProjection()`, which
returns true (based on the user config only).
3. The planner generates nested field paths (e.g., {1, 0}) and passes them to
`applyProjection()`.
4. At runtime, `Decoder.create()` sees that the format is an instance of
`ProjectableDecodingFormat` and that pushdown is enabled, so it takes the
`projectInsideDeserializer` path and calls
`format.createRuntimeDecoder(ctx, type, [[1, 0]])`.
5. The format receives a nested projection (an `int[]` path with length > 1),
even though it declared that it cannot handle one.
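To make step 5 concrete, here is a minimal, self-contained Java sketch (not code from Flink or this PR; `NestedPathDemo` and `toTopLevelIndices` are hypothetical names). It shows that a path such as `{1, 0}` is nested because its length is greater than 1, and how a format that only handles top-level projection could guard against it. A format without such a guard that simply reads `path[0]` would pick up the whole ROW at field 1 instead of its subfield, which is one way the "incorrect results" case could arise.

```java
import java.util.Arrays;

/** Hypothetical illustration only; not a Flink or Kafka connector API. */
final class NestedPathDemo {

    /** True if any projection path addresses a field below the top level. */
    static boolean containsNestedPath(int[][] projections) {
        for (int[] path : projections) {
            if (path.length > 1) {
                return true;
            }
        }
        return false;
    }

    /**
     * Guard a top-level-only format could apply before building its decoder:
     * reject nested paths instead of silently using only their first index.
     */
    static int[] toTopLevelIndices(int[][] projections) {
        if (containsNestedPath(projections)) {
            throw new IllegalArgumentException(
                    "Nested projection not supported: " + Arrays.deepToString(projections));
        }
        int[] indices = new int[projections.length];
        for (int i = 0; i < projections.length; i++) {
            indices[i] = projections[i][0];
        }
        return indices;
    }

    public static void main(String[] args) {
        int[][] topLevel = {{0}, {2}};
        int[][] nested = {{1, 0}}; // field 0 inside the ROW at top-level field 1
        System.out.println(Arrays.toString(toTopLevelIndices(topLevel))); // [0, 2]
        try {
            toTopLevelIndices(nested);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // Nested projection not supported: [[1, 0]]
        }
    }
}
```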
Should `supportsNestedProjection()` also check the format's declared
capability?
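One possible shape for such a check, as a self-contained sketch under stated assumptions: the interfaces below are simplified stand-ins named after Flink's `DecodingFormat` and `ProjectableDecodingFormat`, not the real APIs, and the `NONE`/`TOP_LEVEL` values of `FormatProjectionPushdownLevel` are assumed (the review only mentions `ALL`). Per side (key or value), it requires the user's `ALL` setting and, when the format projects inside its own deserializer, the format's own `supportsNestedProjection()`. A null format (e.g. no key format) imposes no restriction, which is assumed to match the intent of the `keyDecodingFormat == null` branch visible in the diff.

```java
/**
 * Hypothetical stand-ins for the types discussed in the review.
 * Names mirror Flink/Kafka connector types, but these are NOT the real APIs.
 */
final class NestedProjectionCheck {

    /** Assumed option values; only ALL is confirmed by the review. */
    enum FormatProjectionPushdownLevel { NONE, TOP_LEVEL, ALL }

    /** Stand-in for DecodingFormat. */
    interface DecodingFormat {}

    /** Stand-in for ProjectableDecodingFormat; nested support defaults to false here. */
    interface ProjectableDecodingFormat extends DecodingFormat {
        default boolean supportsNestedProjection() {
            return false;
        }
    }

    /** Projects top-level fields only, like the format in step 1 of the scenario. */
    static final class TopLevelOnlyFormat implements ProjectableDecodingFormat {}

    /** Declares that it can project nested fields itself. */
    static final class NestedCapableFormat implements ProjectableDecodingFormat {
        @Override
        public boolean supportsNestedProjection() {
            return true;
        }
    }

    /** Capability check for one side (key or value) of the source. */
    static boolean sideSupportsNested(DecodingFormat format, FormatProjectionPushdownLevel level) {
        if (format == null) {
            return true; // side not configured (e.g. no key format): no restriction
        }
        if (level != FormatProjectionPushdownLevel.ALL) {
            return false; // user did not opt into nested pushdown for this side
        }
        // A format that projects inside its deserializer must also declare nested
        // support; non-projectable formats keep the config-only behavior.
        return !(format instanceof ProjectableDecodingFormat)
                || ((ProjectableDecodingFormat) format).supportsNestedProjection();
    }

    /** Nested projection is advertised only if both sides allow it. */
    static boolean supportsNestedProjection(
            DecodingFormat keyFormat,
            FormatProjectionPushdownLevel keyLevel,
            DecodingFormat valueFormat,
            FormatProjectionPushdownLevel valueLevel) {
        return sideSupportsNested(keyFormat, keyLevel)
                && sideSupportsNested(valueFormat, valueLevel);
    }

    public static void main(String[] args) {
        FormatProjectionPushdownLevel all = FormatProjectionPushdownLevel.ALL;
        // The review's scenario: 'ALL' in DDL with a top-level-only value format.
        System.out.println(supportsNestedProjection(null, all, new TopLevelOnlyFormat(), all)); // false
        System.out.println(supportsNestedProjection(null, all, new NestedCapableFormat(), all)); // true
    }
}
```

With the review's scenario (`'ALL'` plus a top-level-only format) this returns false, so the planner would push down only top-level fields instead of passing `[[1, 0]]` to the format. Non-projectable formats keep the config-only behavior in this sketch; whether that is correct depends on how the connector projects them outside the deserializer, which the diff hunk does not show.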
--
This is an automated message from the Apache Git Service.