fqshopify commented on code in PR #174:
URL: 
https://github.com/apache/flink-connector-kafka/pull/174#discussion_r3467334214


##########
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java:
##########
@@ -300,36 +337,34 @@ public Map<String, DataType> listReadableMetadata() {
 
     @Override
     public void applyReadableMetadata(List<String> metadataKeys, DataType 
producedDataType) {
-        // separate connector and format metadata
-        final List<String> formatMetadataKeys =
-                metadataKeys.stream()
-                        .filter(k -> k.startsWith(VALUE_METADATA_PREFIX))
-                        .collect(Collectors.toList());
-        final List<String> connectorMetadataKeys = new 
ArrayList<>(metadataKeys);
-        connectorMetadataKeys.removeAll(formatMetadataKeys);
-
-        // push down format metadata
-        final Map<String, DataType> formatMetadata = 
valueDecodingFormat.listReadableMetadata();
-        if (formatMetadata.size() > 0) {
-            final List<String> requestedFormatMetadataKeys =
-                    formatMetadataKeys.stream()
-                            .map(k -> 
k.substring(VALUE_METADATA_PREFIX.length()))
-                            .collect(Collectors.toList());
-            
valueDecodingFormat.applyReadableMetadata(requestedFormatMetadataKeys);
+        this.valueFormatMetadataKeys = new ArrayList<>();
+        this.metadataKeys = new ArrayList<>();
+        for (final String key : metadataKeys) {
+            if (key.startsWith(VALUE_METADATA_PREFIX)) {
+                final String formatMetadataKey = 
key.substring(VALUE_METADATA_PREFIX.length());
+                this.valueFormatMetadataKeys.add(formatMetadataKey);
+            } else {
+                this.metadataKeys.add(key);
+            }
         }
-
-        this.metadataKeys = connectorMetadataKeys;
         this.producedDataType = producedDataType;
     }
 
     @Override
-    public boolean supportsMetadataProjection() {
-        return false;
+    public void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy) {
+        this.watermarkStrategy = watermarkStrategy;
     }
 
     @Override
-    public void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy) {
-        this.watermarkStrategy = watermarkStrategy;
+    public boolean supportsNestedProjection() {
+        return (keyDecodingFormat == null

Review Comment:
   @Savonitar Alright, I went with Option 2 (see updated PR description + last 
two commits). Summary of how it resolves your original concern:
   - The config is now a simple boolean per key/value: 
`{key,value}.format.projection-pushdown.enabled` (default `false`). The old 
`NONE/TOP_LEVEL/ALL` enum is gone, the user no longer determines this. 
   - Instead, when pushdown is enabled, the connector decides the actual level 
internally based whether the format is instance of `ProjectableDecodingFormat` 
and if `ProjectableDecodingFormat.supportsNestedProjection()`.
   - For the "top-level only but the schema has nested fields" case you raised, 
there's a new path (`projectTopLevelInsideDeserializerThenNestedAfter`): it 
pushes only the top-level fields into the format and extracts the nested 
sub-fields from the deserialized row afterward. So no nested path is ever 
handed to a format that can't take one.
   
   Overall, I think this is a win-win; it's both simpler for the user and also 
more correct. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to