fqshopify commented on code in PR #174:
URL: 
https://github.com/apache/flink-connector-kafka/pull/174#discussion_r2755949790


##########
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java:
##########
@@ -300,38 +337,42 @@ public Map<String, DataType> listReadableMetadata() {
 
     @Override
     public void applyReadableMetadata(List<String> metadataKeys, DataType 
producedDataType) {
-        // separate connector and format metadata
-        final List<String> formatMetadataKeys =
-                metadataKeys.stream()
-                        .filter(k -> k.startsWith(VALUE_METADATA_PREFIX))
-                        .collect(Collectors.toList());
-        final List<String> connectorMetadataKeys = new 
ArrayList<>(metadataKeys);
-        connectorMetadataKeys.removeAll(formatMetadataKeys);
-
-        // push down format metadata
-        final Map<String, DataType> formatMetadata = 
valueDecodingFormat.listReadableMetadata();
-        if (formatMetadata.size() > 0) {
-            final List<String> requestedFormatMetadataKeys =
-                    formatMetadataKeys.stream()
-                            .map(k -> 
k.substring(VALUE_METADATA_PREFIX.length()))
-                            .collect(Collectors.toList());
-            
valueDecodingFormat.applyReadableMetadata(requestedFormatMetadataKeys);
+        this.valueFormatMetadataKeys = new ArrayList<>();
+        this.metadataKeys = new ArrayList<>();
+        for (final String key : metadataKeys) {
+            if (key.startsWith(VALUE_METADATA_PREFIX)) {
+                final String formatMetadataKey = 
key.substring(VALUE_METADATA_PREFIX.length());
+                this.valueFormatMetadataKeys.add(formatMetadataKey);
+            } else {
+                this.metadataKeys.add(key);
+            }
         }
-
-        this.metadataKeys = connectorMetadataKeys;
         this.producedDataType = producedDataType;
     }
 
     @Override
     public boolean supportsMetadataProjection() {
-        return false;
+        throw new IllegalStateException(
+                "This should never be called as KafkaDynamicSource implements 
the SupportsProjectionPushdown interface.");
     }
 
     @Override
     public void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy) {
         this.watermarkStrategy = watermarkStrategy;
     }
 
+    @Override
+    public boolean supportsNestedProjection() {
+        return (keyDecodingFormat == null
+                        || keyFormatProjectionPushdownLevel == 
FormatProjectionPushdownLevel.ALL)
+                && valueFormatProjectionPushdownLevel == 
FormatProjectionPushdownLevel.ALL;
+    }
+
+    @Override
+    public void applyProjection(final int[][] projectedFields, final DataType 
producedDataType) {
+        this.projectedPhysicalFields = projectedFields;

Review Comment:
   Good question! We don't need to update `producedDataType` in 
`applyProjection` because the javadoc for `applyReadableMetadata` explicitly 
addresses this scenario:
   
   ```java
       /**
        * ...
        *
        * <p>Note: Use the passed data type instead of {@link 
ResolvedSchema#toPhysicalRowDataType()}
        * for describing the final output data type when creating {@link 
TypeInformation}. If the
        * source implements {@link SupportsProjectionPushDown}, the projection 
is already considered in
        * the given output data type, use the {@code producedDataType} provided 
by this method instead
        * of the {@code producedDataType} provided by {@link
        * SupportsProjectionPushDown#applyProjection(int[][], DataType)}.
        * 
        * ...
        */
       void applyReadableMetadata(List<String> metadataKeys, DataType 
producedDataType);
   ```
   
   So we're following the recommended approach i.e. using the 
`producedDataType` from `applyReadableMetadata` rather than the one from 
`applyProjection`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to