Savonitar commented on code in PR #174:
URL: 
https://github.com/apache/flink-connector-kafka/pull/174#discussion_r3081213073


##########
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java:
##########
@@ -300,36 +337,34 @@ public Map<String, DataType> listReadableMetadata() {
 
     @Override
     public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
-        // separate connector and format metadata
-        final List<String> formatMetadataKeys =
-                metadataKeys.stream()
-                        .filter(k -> k.startsWith(VALUE_METADATA_PREFIX))
-                        .collect(Collectors.toList());
-        final List<String> connectorMetadataKeys = new ArrayList<>(metadataKeys);
-        connectorMetadataKeys.removeAll(formatMetadataKeys);
-
-        // push down format metadata
-        final Map<String, DataType> formatMetadata = valueDecodingFormat.listReadableMetadata();
-        if (formatMetadata.size() > 0) {
-            final List<String> requestedFormatMetadataKeys =
-                    formatMetadataKeys.stream()
-                            .map(k -> k.substring(VALUE_METADATA_PREFIX.length()))
-                            .collect(Collectors.toList());
-            valueDecodingFormat.applyReadableMetadata(requestedFormatMetadataKeys);
+        this.valueFormatMetadataKeys = new ArrayList<>();
+        this.metadataKeys = new ArrayList<>();
+        for (final String key : metadataKeys) {
+            if (key.startsWith(VALUE_METADATA_PREFIX)) {
+                final String formatMetadataKey = key.substring(VALUE_METADATA_PREFIX.length());
+                this.valueFormatMetadataKeys.add(formatMetadataKey);
+            } else {
+                this.metadataKeys.add(key);
+            }
         }
-
-        this.metadataKeys = connectorMetadataKeys;
         this.producedDataType = producedDataType;
     }
 
     @Override
-    public boolean supportsMetadataProjection() {
-        return false;
+    public void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy) {
+        this.watermarkStrategy = watermarkStrategy;
     }
 
     @Override
-    public void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy) {
-        this.watermarkStrategy = watermarkStrategy;
+    public boolean supportsNestedProjection() {
+        return (keyDecodingFormat == null

Review Comment:
   I have a question. 
   This method checks only the user's config (e.g. 
`FormatProjectionPushdownLevel.ALL`), but never checks the format's own 
`ProjectableDecodingFormat.supportsNestedProjection()`. 
   
   What if we have the following scenario:
   1. A format implements `ProjectableDecodingFormat` with
`supportsNestedProjection()` returning **false** (it supports top-level
projection but **not nested** projection, e.g. avro-confluent or a custom format).
   2. The user sets `'value.format-projection-pushdown-level' = 'ALL'` in the DDL.
   3. The schema **has nested fields**.
   
   Wouldn't this cause a runtime crash, incorrect results, or other issues?
   My understanding of the chain:
   1. The schema has nested ROW fields and the query selects a nested subfield.
   2. The planner calls `KafkaDynamicSource.supportsNestedProjection()`, which
returns true (based on the user config only).
   3. The planner generates nested field paths (e.g. `{1, 0}`) and passes them to
`applyProjection()`.
   4. At runtime, `Decoder.create()` sees that the format is an instance of
`ProjectableDecodingFormat` and that pushdown is enabled, so it takes the
`projectInsideDeserializer` path and calls
`format.createRuntimeDecoder(ctx, type, [[1, 0]])`.
   5. The format receives a nested projection (an `int[]` path with length > 1),
even though it declared that it cannot handle one.
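
   To make step 5 concrete: each projected field is one `int[]` path, and any
path longer than one element reaches into a nested ROW. A minimal sketch of
detecting that case, assuming a hypothetical helper `ProjectionPaths` (my own
name for illustration, not something in the connector or in Flink):

```java
import java.util.Arrays;

// Hypothetical helper for illustration only; not part of the connector.
final class ProjectionPaths {
    private ProjectionPaths() {}

    /** Returns true if any projection path descends below the top level. */
    static boolean containsNestedPath(int[][] projectedFields) {
        // A top-level projection is a single index, e.g. {1}.
        // A nested projection has more than one index, e.g. {1, 0}.
        return Arrays.stream(projectedFields).anyMatch(path -> path.length > 1);
    }
}
```

   With this, `{{1, 0}}` counts as nested while `{{0}, {2}}` does not, so a
format that only supports top-level projection could at least fail fast.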
   
   Should `supportsNestedProjection()` also check the format's declared 
capability? 
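
   Roughly what I have in mind, as a sketch only. All types below are stand-ins
I made up so the example compiles on its own (`SketchFormat`,
`SketchProjectableFormat`, `SketchPushdownLevel` and its `NONE`/`TOP_LEVEL`
values are hypothetical); the real code would use the connector's decoding
formats and `FormatProjectionPushdownLevel`. Treating an absent format as
"no constraint" is also my assumption:

```java
// Stand-in types for illustration only; not the actual Flink/connector classes.
interface SketchFormat {}

interface SketchProjectableFormat extends SketchFormat {
    boolean supportsNestedProjection();
}

// Only ALL is taken from the discussion; the other values are hypothetical.
enum SketchPushdownLevel { NONE, TOP_LEVEL, ALL }

final class NestedProjectionSupport {
    private NestedProjectionSupport() {}

    /** Nested pushdown needs both the user's opt-in and the format's declared capability. */
    static boolean formatAllowsNested(SketchFormat format, SketchPushdownLevel level) {
        if (format == null) {
            // Assumption: an absent (e.g. key) format imposes no constraint.
            return true;
        }
        return level == SketchPushdownLevel.ALL
                && format instanceof SketchProjectableFormat
                && ((SketchProjectableFormat) format).supportsNestedProjection();
    }

    /** The source supports nested projection only if every configured format does. */
    static boolean supportsNestedProjection(
            SketchFormat keyFormat,
            SketchPushdownLevel keyLevel,
            SketchFormat valueFormat,
            SketchPushdownLevel valueLevel) {
        return formatAllowsNested(keyFormat, keyLevel)
                && formatAllowsNested(valueFormat, valueLevel);
    }
}
```

   With a check like this, the scenario above (config `ALL`, format returning
false) would make the source report no nested-projection support, so the
planner should not hand nested paths to the format in the first place.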



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
