fqshopify commented on code in PR #174:
URL: 
https://github.com/apache/flink-connector-kafka/pull/174#discussion_r3124628907


##########
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java:
##########
@@ -300,36 +337,34 @@ public Map<String, DataType> listReadableMetadata() {
 
     @Override
     public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
-        // separate connector and format metadata
-        final List<String> formatMetadataKeys =
-                metadataKeys.stream()
-                        .filter(k -> k.startsWith(VALUE_METADATA_PREFIX))
-                        .collect(Collectors.toList());
-        final List<String> connectorMetadataKeys = new ArrayList<>(metadataKeys);
-        connectorMetadataKeys.removeAll(formatMetadataKeys);
-
-        // push down format metadata
-        final Map<String, DataType> formatMetadata = valueDecodingFormat.listReadableMetadata();
-        if (formatMetadata.size() > 0) {
-            final List<String> requestedFormatMetadataKeys =
-                    formatMetadataKeys.stream()
-                            .map(k -> k.substring(VALUE_METADATA_PREFIX.length()))
-                            .collect(Collectors.toList());
-            valueDecodingFormat.applyReadableMetadata(requestedFormatMetadataKeys);
+        this.valueFormatMetadataKeys = new ArrayList<>();
+        this.metadataKeys = new ArrayList<>();
+        for (final String key : metadataKeys) {
+            if (key.startsWith(VALUE_METADATA_PREFIX)) {
+                final String formatMetadataKey = key.substring(VALUE_METADATA_PREFIX.length());
+                this.valueFormatMetadataKeys.add(formatMetadataKey);
+            } else {
+                this.metadataKeys.add(key);
+            }
         }
-
-        this.metadataKeys = connectorMetadataKeys;
         this.producedDataType = producedDataType;
     }
 
     @Override
-    public boolean supportsMetadataProjection() {
-        return false;
+    public void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy) {
+        this.watermarkStrategy = watermarkStrategy;
     }
 
     @Override
-    public void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy) {
-        this.watermarkStrategy = watermarkStrategy;
+    public boolean supportsNestedProjection() {
+        return (keyDecodingFormat == null

Review Comment:
   Haven't forgotten about this, still thinking it through!
   
   Debating between two options:
   
   - **Option 1**: Keep the per-format level config, but have 
`supportsNestedProjection()` also check the format's own 
`ProjectableDecodingFormat.supportsNestedProjection()` to make sure they don't 
conflict
   
   - **Option 2** (which you floated 
[here](https://github.com/apache/flink-connector-kafka/pull/174#discussion_r2746744072)
 previously): replace the per-format enum with a single 
`'projection-pushdown.enabled' = 'true'` flag, and let the format internally 
decide which projection level (top-level or nested) it can use. That gives a 
simpler API and still lets users turn off projection pushdown if the format has 
projection pushdown bugs. I'm trying to remember whether there was a specific 
reason I didn't do this originally, or whether I simply overlooked the 
`ProjectableDecodingFormat.supportsNestedProjection` API.
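
To make the two options easier to compare, here is a rough sketch of how each decision could look. It is not the PR code: `FormatCapability` is a stand-in for the one `ProjectableDecodingFormat` method that matters here, and `ConfiguredLevel`, `nestedAllowedOption1`, and `nestedAllowedOption2` are hypothetical names chosen only for illustration.

```java
// Sketch only: all names below are hypothetical and not taken from the PR.
final class ProjectionPushdownSketch {

    /** Stand-in for ProjectableDecodingFormat#supportsNestedProjection(). */
    interface FormatCapability {
        boolean supportsNestedProjection();
    }

    /** Hypothetical per-format config values, as Option 1 would keep them. */
    enum ConfiguredLevel {
        NONE,
        TOP_LEVEL,
        NESTED
    }

    /**
     * Option 1: keep the per-format level, but only report nested support when
     * the configured level and the format's own capability agree.
     */
    static boolean nestedAllowedOption1(ConfiguredLevel configured, FormatCapability format) {
        return configured == ConfiguredLevel.NESTED && format.supportsNestedProjection();
    }

    /**
     * Option 2: a single on/off flag; the format's own capability decides
     * whether pushdown is nested or top-level only.
     */
    static boolean nestedAllowedOption2(boolean pushdownEnabled, FormatCapability format) {
        return pushdownEnabled && format.supportsNestedProjection();
    }

    public static void main(String[] args) {
        FormatCapability flatOnly = () -> false; // format that cannot project nested fields
        FormatCapability nested = () -> true;    // format that can

        System.out.println(nestedAllowedOption1(ConfiguredLevel.NESTED, flatOnly));
        System.out.println(nestedAllowedOption1(ConfiguredLevel.NESTED, nested));
        System.out.println(nestedAllowedOption2(true, nested));
        System.out.println(nestedAllowedOption2(false, nested));
    }
}
```

In this sketch the only difference between the two options is what the user configures: under Option 1 a misconfigured `NESTED` level on a flat-only format quietly falls back, while under Option 2 there is no level to misconfigure in the first place.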


