fqshopify commented on code in PR #174:
URL:
https://github.com/apache/flink-connector-kafka/pull/174#discussion_r3443409382
##########
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java:
##########
@@ -300,36 +337,34 @@ public Map<String, DataType> listReadableMetadata() {
@Override
public void applyReadableMetadata(List<String> metadataKeys, DataType
producedDataType) {
- // separate connector and format metadata
- final List<String> formatMetadataKeys =
- metadataKeys.stream()
- .filter(k -> k.startsWith(VALUE_METADATA_PREFIX))
- .collect(Collectors.toList());
- final List<String> connectorMetadataKeys = new
ArrayList<>(metadataKeys);
- connectorMetadataKeys.removeAll(formatMetadataKeys);
-
- // push down format metadata
- final Map<String, DataType> formatMetadata =
valueDecodingFormat.listReadableMetadata();
- if (formatMetadata.size() > 0) {
- final List<String> requestedFormatMetadataKeys =
- formatMetadataKeys.stream()
- .map(k ->
k.substring(VALUE_METADATA_PREFIX.length()))
- .collect(Collectors.toList());
-
valueDecodingFormat.applyReadableMetadata(requestedFormatMetadataKeys);
+ this.valueFormatMetadataKeys = new ArrayList<>();
+ this.metadataKeys = new ArrayList<>();
+ for (final String key : metadataKeys) {
+ if (key.startsWith(VALUE_METADATA_PREFIX)) {
+ final String formatMetadataKey =
key.substring(VALUE_METADATA_PREFIX.length());
+ this.valueFormatMetadataKeys.add(formatMetadataKey);
+ } else {
+ this.metadataKeys.add(key);
+ }
}
-
- this.metadataKeys = connectorMetadataKeys;
this.producedDataType = producedDataType;
}
@Override
- public boolean supportsMetadataProjection() {
- return false;
+ public void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy) {
+ this.watermarkStrategy = watermarkStrategy;
}
@Override
- public void applyWatermark(WatermarkStrategy<RowData> watermarkStrategy) {
- this.watermarkStrategy = watermarkStrategy;
+ public boolean supportsNestedProjection() {
+ return (keyDecodingFormat == null
Review Comment:
> I think we're pretty close to finalizing this work
I agree, and thank for you attention to this PR, it is much appreciated!
> it would be a shame to abandon it (I hope you still have capacity).
Ha, definitely can't abandon this, we're still using it internally XD I
should have some capacity next week to continue with this next week, let me try
to resolve this comment then.
> I'm not against Option 2, but it would likely require more changes to the
PR, whereas the current approach has already been reviewed/discussed/agreed.
TBH my current thinking is that, unless I can come up with a concrete
objection, I'd actually prefer Option 2. I've prototyped this locally but I
want to do some more testing internally.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]