Savonitar commented on code in PR #174:
URL:
https://github.com/apache/flink-connector-kafka/pull/174#discussion_r3437485131
##########
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java:
##########
@@ -300,36 +337,34 @@ public Map<String, DataType> listReadableMetadata() {
@Override
public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
- // separate connector and format metadata
- final List<String> formatMetadataKeys =
- metadataKeys.stream()
- .filter(k -> k.startsWith(VALUE_METADATA_PREFIX))
- .collect(Collectors.toList());
- final List<String> connectorMetadataKeys = new ArrayList<>(metadataKeys);
- connectorMetadataKeys.removeAll(formatMetadataKeys);
-
- // push down format metadata
- final Map<String, DataType> formatMetadata = valueDecodingFormat.listReadableMetadata();
- if (formatMetadata.size() > 0) {
- final List<String> requestedFormatMetadataKeys =
- formatMetadataKeys.stream()
- .map(k -> k.substring(VALUE_METADATA_PREFIX.length()))
- .collect(Collectors.toList());
- valueDecodingFormat.applyReadableMetadata(requestedFormatMetadataKeys);
+ this.valueFormatMetadataKeys = new ArrayList<>();
+ this.metadataKeys = new ArrayList<>();
+ for (final String key : metadataKeys) {
+ if (key.startsWith(VALUE_METADATA_PREFIX)) {
+ final String formatMetadataKey = key.substring(VALUE_METADATA_PREFIX.length());
+ this.valueFormatMetadataKeys.add(formatMetadataKey);
+ } else {
+ this.metadataKeys.add(key);
+ }
}
-
- this.metadataKeys = connectorMetadataKeys;
this.producedDataType = producedDataType;
}
@Override
- public boolean supportsMetadataProjection() {
- return false;
+ public void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy) {
+ this.watermarkStrategy = watermarkStrategy;
}
@Override
- public void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy) {
- this.watermarkStrategy = watermarkStrategy;
+ public boolean supportsNestedProjection() {
+ return (keyDecodingFormat == null
Review Comment:
Hi @fqshopify.
Thanks for laying out both options clearly, and sorry for the late reply. I
think we're pretty close to finalizing this work and it would be a shame to
abandon it (I hope you still have capacity).
My vote is Option 1: keep the three-level enum and have
`supportsNestedProjection()` also check the format's own
`ProjectableDecodingFormat.supportsNestedProjection()`. The deciding factor for
me is the "lying format" case, which is the same reason we kept the three-level
enum in the first place.
As a result, with Option 1 the user's level becomes a "ceiling" rather than
a command: the planner is told **true** only when the user opted into **ALL**
and the format actually declares nested support.
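
To make the "ceiling" idea concrete, here is a minimal, self-contained sketch. It is not the code in this PR: `ProjectionLevel` (including the `NONE` and `TOP_LEVEL` values) and the `Format` interface are hypothetical stand-ins for the connector's enum and for a format's own `supportsNestedProjection()` declaration. Only the `ALL` level and the nullable key format come from the discussion above.

```java
class NestedProjectionCeiling {

    /** Hypothetical three-level option; only ALL is named in this thread. */
    enum ProjectionLevel { NONE, TOP_LEVEL, ALL }

    /** Hypothetical stand-in for a decoding format that declares its own capability. */
    interface Format {
        boolean supportsNestedProjection();
    }

    /**
     * The user's level is an upper bound: the planner is told true only when
     * the user opted into ALL and every configured format declares nested support.
     * keyFormat may be null when no key format is configured.
     */
    static boolean supportsNestedProjection(
            ProjectionLevel level, Format keyFormat, Format valueFormat) {
        if (level != ProjectionLevel.ALL) {
            return false; // the user did not opt in, so the format is never asked
        }
        boolean keyOk = keyFormat == null || keyFormat.supportsNestedProjection();
        boolean valueOk = valueFormat.supportsNestedProjection();
        return keyOk && valueOk;
    }
}
```

With this shape, a user who sets `ALL` on a format that does not declare nested support still gets `false`, so the user's setting alone can never force nested projection onto a format that does not support it.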
I prototyped this locally with regression tests and it works cleanly (cases
that fail on the current code pass with the fix, and the full suite stays
green).
I'm not against Option 2, but it would likely require more changes to the
PR, whereas the current approach has already been reviewed, discussed, and agreed on.
WDYT?