fqshopify commented on code in PR #174:
URL:
https://github.com/apache/flink-connector-kafka/pull/174#discussion_r2755949790
##########
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java:
##########
@@ -300,38 +337,42 @@ public Map<String, DataType> listReadableMetadata() {
@Override
public void applyReadableMetadata(List<String> metadataKeys, DataType
producedDataType) {
- // separate connector and format metadata
- final List<String> formatMetadataKeys =
- metadataKeys.stream()
- .filter(k -> k.startsWith(VALUE_METADATA_PREFIX))
- .collect(Collectors.toList());
- final List<String> connectorMetadataKeys = new
ArrayList<>(metadataKeys);
- connectorMetadataKeys.removeAll(formatMetadataKeys);
-
- // push down format metadata
- final Map<String, DataType> formatMetadata =
valueDecodingFormat.listReadableMetadata();
- if (formatMetadata.size() > 0) {
- final List<String> requestedFormatMetadataKeys =
- formatMetadataKeys.stream()
- .map(k ->
k.substring(VALUE_METADATA_PREFIX.length()))
- .collect(Collectors.toList());
-
valueDecodingFormat.applyReadableMetadata(requestedFormatMetadataKeys);
+ this.valueFormatMetadataKeys = new ArrayList<>();
+ this.metadataKeys = new ArrayList<>();
+ for (final String key : metadataKeys) {
+ if (key.startsWith(VALUE_METADATA_PREFIX)) {
+ final String formatMetadataKey =
key.substring(VALUE_METADATA_PREFIX.length());
+ this.valueFormatMetadataKeys.add(formatMetadataKey);
+ } else {
+ this.metadataKeys.add(key);
+ }
}
-
- this.metadataKeys = connectorMetadataKeys;
this.producedDataType = producedDataType;
}
@Override
public boolean supportsMetadataProjection() {
- return false;
+ throw new IllegalStateException(
+ "This should never be called as KafkaDynamicSource implements
the SupportsProjectionPushdown interface.");
}
@Override
public void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy) {
this.watermarkStrategy = watermarkStrategy;
}
+ @Override
+ public boolean supportsNestedProjection() {
+ return (keyDecodingFormat == null
+ || keyFormatProjectionPushdownLevel ==
FormatProjectionPushdownLevel.ALL)
+ && valueFormatProjectionPushdownLevel ==
FormatProjectionPushdownLevel.ALL;
+ }
+
+ @Override
+ public void applyProjection(final int[][] projectedFields, final DataType
producedDataType) {
+ this.projectedPhysicalFields = projectedFields;
Review Comment:
Good question! We don't need to update `producedDataType` in
`applyProjection` because the javadoc for `applyReadableMetadata` explicitly
addresses this scenario:
```java
/**
* ...
*
* <p>Note: Use the passed data type instead of {@link
ResolvedSchema#toPhysicalRowDataType()}
* for describing the final output data type when creating {@link
TypeInformation}. If the
* source implements {@link SupportsProjectionPushDown}, the projection
is already considered in
* the given output data type, use the {@code producedDataType} provided
by this method instead
* of the {@code producedDataType} provided by {@link
* SupportsProjectionPushDown#applyProjection(int[][], DataType)}.
*
* ...
*/
void applyReadableMetadata(List<String> metadataKeys, DataType
producedDataType);
```
So we're following the recommended approach - using the `producedDataType`
from `applyReadableMetadata` rather than the one from `applyProjection`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]