twalthr commented on a change in pull request #12320:
URL: https://github.com/apache/flink/pull/12320#discussion_r432912683
##########
File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSourceBase.java
##########
@@ -85,7 +85,7 @@
	 * @param outputDataType         Source produced data type
	 * @param topic                  Kafka topic to consume.
	 * @param properties             Properties for the Kafka consumer.
-	 * @param scanFormat             Scan format for decoding records from Kafka.
+	 * @param decodingFormat         Scan format for decoding records from Kafka.

Review comment:
   fix wrong indentation and remove "Scan" from the Javadoc?

##########
File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSinkBase.java
##########
@@ -63,23 +63,23 @@
	protected KafkaDynamicSinkBase(
			String topic,
			Properties properties,
			Optional<FlinkKafkaPartitioner<RowData>> partitioner,
-			SinkFormat<SerializationSchema<RowData>> sinkFormat) {
+			EncodingFormat<SerializationSchema<RowData>> encodingFormat) {
		this.consumedDataType = Preconditions.checkNotNull(consumedDataType, "Consumed data type must not be null.");
		this.topic = Preconditions.checkNotNull(topic, "Topic must not be null.");
		this.properties = Preconditions.checkNotNull(properties, "Properties must not be null.");
		this.partitioner = Preconditions.checkNotNull(partitioner, "Partitioner must not be null.");
-		this.sinkFormat = Preconditions.checkNotNull(sinkFormat, "Sink format must not be null.");
+		this.encodingFormat = Preconditions.checkNotNull(encodingFormat, "Sink format must not be null.");

Review comment:
   update the error message?
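The "update the error message?" nit above can be sketched as a minimal, self-contained example. This is not the actual Flink class: the class name `EncodingFormatCheck` is hypothetical, and `java.util.Objects.requireNonNull` stands in for Flink's `Preconditions.checkNotNull` (both throw a `NullPointerException` carrying the given message). The point is simply that after the `sinkFormat` -> `encodingFormat` rename, the message should say "Encoding format" rather than "Sink format".

```java
import java.util.Objects;

// Hypothetical stand-in for KafkaDynamicSinkBase; illustrates keeping the
// null-check message in sync with the renamed field.
public class EncodingFormatCheck {

    private final Object encodingFormat;

    public EncodingFormatCheck(Object encodingFormat) {
        // Message updated to match the new field name (was "Sink format must not be null.").
        this.encodingFormat =
                Objects.requireNonNull(encodingFormat, "Encoding format must not be null.");
    }

    public static void main(String[] args) {
        try {
            new EncodingFormatCheck(null);
        } catch (NullPointerException e) {
            // prints: Encoding format must not be null.
            System.out.println(e.getMessage());
        }
    }
}
```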
##########
File path: flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/table/Kafka011DynamicTableFactoryTest.java
##########
@@ -82,12 +82,12 @@
	protected KafkaDynamicSinkBase getExpectedSink(
			String topic,
			Properties properties,
			Optional<FlinkKafkaPartitioner<RowData>> partitioner,
-			SinkFormat<SerializationSchema<RowData>> sinkFormat) {
+			EncodingFormat<SerializationSchema<RowData>> encodingFormat) {
		return new Kafka011DynamicSink(
				consumedDataType,
				topic,
				properties,
				partitioner,
-				sinkFormat);
+				encodingFormat);

Review comment:
   fix wrong indentation here and at other locations

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/Format.java
##########
@@ -34,15 +34,15 @@
 *
 * <p>Formats can be distinguished along two dimensions:
 * <ul>
- *     <li>Context in which the format is applied (e.g. {@link ScanTableSource} or {@link DynamicTableSink}).
+ *     <li>Context in which the format is applied (e.g. {@link DynamicTableSource} or {@link DynamicTableSink}).
Review comment:
   we can remove the `e.g.`; it is only for those two locations now

##########
File path: flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestDynamicTableFactory.java
##########
@@ -134,13 +134,14 @@ public String factoryIdentifier() {
	public static class DynamicTableSourceMock implements ScanTableSource {

		public final String target;
-		public final @Nullable ScanFormat<DeserializationSchema<RowData>> sourceKeyFormat;
-		public final ScanFormat<DeserializationSchema<RowData>> sourceValueFormat;
+		public final @Nullable
+		DecodingFormat<DeserializationSchema<RowData>> sourceKeyFormat;

Review comment:
   fix formatting here and below

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/DecodingFormat.java
##########
@@ -19,19 +19,19 @@
 package org.apache.flink.table.connector.format;

 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.types.DataType;

 /**
- * A {@link Format} for a {@link ScanTableSource}.
+ * A {@link Format} for a {@link DynamicTableSource} to reading rows.

Review comment:
   nit: `for reading rows`

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org