Efrat19 commented on code in PR #174:
URL:
https://github.com/apache/flink-connector-kafka/pull/174#discussion_r3184385367
##########
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java:
##########
@@ -72,13 +74,15 @@
import java.util.Properties;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
-import java.util.stream.IntStream;
import java.util.stream.Stream;
/** A version-agnostic Kafka {@link ScanTableSource}. */
@Internal
public class KafkaDynamicSource
- implements ScanTableSource, SupportsReadingMetadata, SupportsWatermarkPushDown {
+ implements ScanTableSource,
+ SupportsReadingMetadata,
+ SupportsWatermarkPushDown,
+ SupportsProjectionPushDown {
Review Comment:
I'm concerned about the default `KafkaDynamicSource` declaring itself
`SupportsProjectionPushDown` while not always supporting it (e.g., `upsertMode`,
an incompatible format, or the feature simply being turned off).
From [SupportsProjectionPushDown.java](https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsProjectionPushDown.java#L42C7-L43C31):
> By default, if this interface is not implemented, a projection is applied in a subsequent operation after the source.
Given that the feature is opt-in, wouldn't this mean that, by default, workloads
are deprived of downstream optimizations?
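To make the concern concrete, here is a toy, self-contained model of the contract quoted above. `Source`, `ProjectionAbility`, `AlwaysDeclaringSource` and the `execute` "planner" are hypothetical stand-ins, not Flink's classes or planner; the only point is that the placement of the projection is decided by the interface check alone, not by whether the source actually wants to push anything down.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Toy model of the SupportsProjectionPushDown contract. Source, ProjectionAbility
 * and the "planner" in execute() are hypothetical stand-ins, NOT Flink's classes.
 */
public class ProjectionPushDownModel {

    /** Stand-in for a scan source; rows are modelled as int[]. */
    interface Source {
        List<int[]> scan();
    }

    /** Stand-in for the SupportsProjectionPushDown ability. */
    interface ProjectionAbility {
        void applyProjection(int[] fields);
    }

    /** Keeps the given field indexes of every row, in the given order. */
    static List<int[]> project(List<int[]> rows, int[] fields) {
        List<int[]> out = new ArrayList<>();
        for (int[] row : rows) {
            int[] projected = new int[fields.length];
            for (int i = 0; i < fields.length; i++) {
                projected[i] = row[fields[i]];
            }
            out.add(projected);
        }
        return out;
    }

    /** Does not declare the ability, so the planner projects after it. */
    static class PlainSource implements Source {
        final List<int[]> rows;

        PlainSource(List<int[]> rows) {
            this.rows = rows;
        }

        public List<int[]> scan() {
            return rows;
        }
    }

    /**
     * Always declares the ability. Whatever its internal settings, the planner
     * no longer projects after it, so it must do the projection work itself.
     */
    static class AlwaysDeclaringSource extends PlainSource implements ProjectionAbility {
        int[] fields; // null until the planner pushes a projection

        AlwaysDeclaringSource(List<int[]> rows) {
            super(rows);
        }

        public void applyProjection(int[] fields) {
            this.fields = fields;
        }

        @Override
        public List<int[]> scan() {
            return fields == null ? rows : project(rows, fields);
        }
    }

    /** The placement is decided purely by the interface check. */
    static String whereProjected(Source source) {
        return source instanceof ProjectionAbility ? "in source" : "after source";
    }

    /** Toy planner: push into the source if it claims the ability, else project after it. */
    static List<int[]> execute(Source source, int[] fields) {
        if (source instanceof ProjectionAbility) {
            ((ProjectionAbility) source).applyProjection(fields);
            return source.scan();
        }
        return project(source.scan(), fields);
    }

    public static void main(String[] args) {
        List<int[]> rows = List.of(new int[] {1, 2, 3}, new int[] {4, 5, 6});
        int[] fields = {2, 0}; // field 2, then field 0 (a reorder plus a drop)
        Source plain = new PlainSource(rows);
        Source declaring = new AlwaysDeclaringSource(rows);
        // prints "after source [3, 1]"
        System.out.println(whereProjected(plain) + " " + Arrays.toString(execute(plain, fields).get(0)));
        // prints "in source [3, 1]"
        System.out.println(whereProjected(declaring) + " " + Arrays.toString(execute(declaring, fields).get(0)));
    }
}
```

Both paths produce the same rows; what changes is who does the work. In this model the planner never projects after a source that declares the ability, which is exactly the behaviour the question above hinges on.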
##########
flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java:
##########
@@ -139,7 +142,7 @@ public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<RowData
throws IOException {
// shortcut in case no output projection is required,
// also not for a cartesian product with the keys
- if (keyDeserialization == null && !hasMetadata) {
+ if (keyDeserialization == null && !hasMetadata && !hasValueProjection) {
Review Comment:
Projection might be needed even if `valueFormatProjectionPushdownLevel` is
set to `NONE` (e.g., column reordering), am I right?
So, by default, we are adding another condition to the shortcut path.
Is there any way to avoid this, given that `deserialize` is the record-level hot path?
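One possible direction, sketched under assumptions: decide once, at construction time, whether the value projection is an identity over all physical value fields, and only count it as a projection otherwise. `ShortcutDecision`, `isIdentity`, `physicalValueArity` and the boolean stand-in for `keyDeserialization == null` are hypothetical names, not the PR's code. The per-record check then stays a read of precomputed booleans, an identity layout keeps the shortcut, and a real reorder or subset still disables it.

```java
/**
 * Hypothetical sketch, not the PR's code: precompute whether the value
 * projection actually changes the row layout, so the per-record shortcut
 * check in deserialize() remains a read of final booleans.
 */
public class ShortcutDecision {

    private final boolean hasKeyDeserialization; // stand-in for keyDeserialization != null
    private final boolean hasMetadata;
    private final boolean hasValueProjection;

    ShortcutDecision(
            boolean hasKeyDeserialization,
            boolean hasMetadata,
            int[] valueProjection,
            int physicalValueArity) {
        this.hasKeyDeserialization = hasKeyDeserialization;
        this.hasMetadata = hasMetadata;
        // Only an identity projection (0, 1, ..., n-1 over all n fields) can be
        // skipped; a reorder or a subset still needs a projection step.
        this.hasValueProjection = !isIdentity(valueProjection, physicalValueArity);
    }

    /** True if the projection keeps every field, in its original order. */
    static boolean isIdentity(int[] projection, int arity) {
        if (projection.length != arity) {
            return false;
        }
        for (int i = 0; i < projection.length; i++) {
            if (projection[i] != i) {
                return false;
            }
        }
        return true;
    }

    /** Mirrors the shortcut condition evaluated for every record. */
    boolean canTakeShortcut() {
        return !hasKeyDeserialization && !hasMetadata && !hasValueProjection;
    }

    public static void main(String[] args) {
        // identity layout: prints "true"
        System.out.println(new ShortcutDecision(false, false, new int[] {0, 1, 2}, 3).canTakeShortcut());
        // reorder: prints "false"
        System.out.println(new ShortcutDecision(false, false, new int[] {2, 0, 1}, 3).canTakeShortcut());
        // subset: prints "false"
        System.out.println(new ShortcutDecision(false, false, new int[] {0, 1}, 3).canTakeShortcut());
    }
}
```

This only helps if the default (`NONE`) configuration yields an identity projection in the common case; whether it does depends on how the PR builds the value projection.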