fqshopify commented on code in PR #174:
URL:
https://github.com/apache/flink-connector-kafka/pull/174#discussion_r3467334214
##########
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java:
##########
@@ -300,36 +337,34 @@ public Map<String, DataType> listReadableMetadata() {
@Override
public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
- // separate connector and format metadata
- final List<String> formatMetadataKeys =
- metadataKeys.stream()
- .filter(k -> k.startsWith(VALUE_METADATA_PREFIX))
- .collect(Collectors.toList());
- final List<String> connectorMetadataKeys = new ArrayList<>(metadataKeys);
- connectorMetadataKeys.removeAll(formatMetadataKeys);
-
- // push down format metadata
- final Map<String, DataType> formatMetadata = valueDecodingFormat.listReadableMetadata();
- if (formatMetadata.size() > 0) {
- final List<String> requestedFormatMetadataKeys =
- formatMetadataKeys.stream()
- .map(k -> k.substring(VALUE_METADATA_PREFIX.length()))
- .collect(Collectors.toList());
- valueDecodingFormat.applyReadableMetadata(requestedFormatMetadataKeys);
+ this.valueFormatMetadataKeys = new ArrayList<>();
+ this.metadataKeys = new ArrayList<>();
+ for (final String key : metadataKeys) {
+ if (key.startsWith(VALUE_METADATA_PREFIX)) {
+ final String formatMetadataKey = key.substring(VALUE_METADATA_PREFIX.length());
+ this.valueFormatMetadataKeys.add(formatMetadataKey);
+ } else {
+ this.metadataKeys.add(key);
+ }
}
-
- this.metadataKeys = connectorMetadataKeys;
this.producedDataType = producedDataType;
}
@Override
- public boolean supportsMetadataProjection() {
- return false;
+ public void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy) {
+ this.watermarkStrategy = watermarkStrategy;
}
@Override
- public void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy) {
- this.watermarkStrategy = watermarkStrategy;
+ public boolean supportsNestedProjection() {
+ return (keyDecodingFormat == null
Review Comment:
Alright, I went with Option 2 (see updated PR description + last two
commits). Summary of how it resolves your original concern:
- The config is now a simple boolean per key/value:
`{key,value}.format.projection-pushdown.enabled` (default `false`). The old
`NONE/TOP_LEVEL/ALL` enum is gone, so the user no longer chooses the level.
- Instead, when pushdown is enabled, the connector decides the actual level
internally, based on whether the format is an instance of
`ProjectableDecodingFormat` and whether
`ProjectableDecodingFormat.supportsNestedProjection()` returns `true`.
- For the "top-level only but the schema has nested fields" case you raised,
there's a new path (`projectTopLevelInsideDeserializerThenNestedAfter`): it
pushes only the top-level fields into the format and extracts the nested
sub-fields from the deserialized row afterward. So no nested path is ever
handed to a format that can't take one.
Overall, I think this is a win-win; it's both simpler for the user and also
more correct.
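
To make the decision logic and the "top-level inside, nested after" split
concrete, here is a minimal standalone sketch. It is not the PR's code and does
not depend on Flink: `Format`, `ProjectableFormat`, `Level`, `resolveLevel`,
`topLevelFields`, and `remapPaths` are hypothetical stand-ins, and the
assumption that a disabled flag or a non-projectable format means no pushdown
is mine.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ProjectionPushdownSketch {

    /** Hypothetical stand-in for a decoding format (not the Flink interface). */
    interface Format {}

    /** Hypothetical stand-in for ProjectableDecodingFormat; only the method discussed above. */
    interface ProjectableFormat extends Format {
        boolean supportsNestedProjection();
    }

    /** Level derived internally by the connector; no longer a user-facing option. */
    enum Level { NONE, TOP_LEVEL, ALL }

    /** Assumption: a disabled flag or a non-projectable format means no pushdown. */
    static Level resolveLevel(boolean pushdownEnabled, Format format) {
        if (!pushdownEnabled || !(format instanceof ProjectableFormat)) {
            return Level.NONE;
        }
        return ((ProjectableFormat) format).supportsNestedProjection()
                ? Level.ALL
                : Level.TOP_LEVEL;
    }

    /** Distinct top-level indices of the requested paths, in first-seen order. */
    static int[] topLevelFields(int[][] paths) {
        List<Integer> seen = new ArrayList<>();
        for (int[] path : paths) {
            if (!seen.contains(path[0])) {
                seen.add(path[0]);
            }
        }
        return seen.stream().mapToInt(Integer::intValue).toArray();
    }

    /** Rewrites each path so its head indexes into the top-level-projected row. */
    static int[][] remapPaths(int[][] paths, int[] topLevel) {
        int[][] out = new int[paths.length][];
        for (int i = 0; i < paths.length; i++) {
            out[i] = paths[i].clone();
            for (int j = 0; j < topLevel.length; j++) {
                if (topLevel[j] == paths[i][0]) {
                    out[i][0] = j;
                    break;
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        ProjectableFormat topLevelOnly = () -> false;
        System.out.println(resolveLevel(true, topLevelOnly)); // TOP_LEVEL

        // Requested fields: f2.g1, f0, f2.g0
        int[][] paths = {{2, 1}, {0}, {2, 0}};
        int[] pushed = topLevelFields(paths);
        System.out.println(Arrays.toString(pushed)); // [2, 0]
        System.out.println(Arrays.deepToString(remapPaths(paths, pushed))); // [[0, 1], [1], [0, 0]]
    }
}
```

In the `TOP_LEVEL` case only `pushed` would be handed to the format, and the
remapped paths would be applied to the deserialized row afterward, so the
format never sees a nested path.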