twalthr commented on a change in pull request #12320:
URL: https://github.com/apache/flink/pull/12320#discussion_r432912683



##########
File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSourceBase.java
##########
@@ -85,7 +85,7 @@
         * @param outputDataType         Source produced data type
         * @param topic                  Kafka topic to consume.
         * @param properties             Properties for the Kafka consumer.
-        * @param scanFormat             Scan format for decoding records from Kafka.
+        * @param decodingFormat             Scan format for decoding records from Kafka.

Review comment:
       Fix the wrong indentation and remove "Scan" from the Javadoc?

##########
File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSinkBase.java
##########
@@ -63,23 +63,23 @@ protected KafkaDynamicSinkBase(
                        String topic,
                        Properties properties,
                        Optional<FlinkKafkaPartitioner<RowData>> partitioner,
-                       SinkFormat<SerializationSchema<RowData>> sinkFormat) {
+                       EncodingFormat<SerializationSchema<RowData>> encodingFormat) {
                this.consumedDataType = Preconditions.checkNotNull(consumedDataType, "Consumed data type must not be null.");
                this.topic = Preconditions.checkNotNull(topic, "Topic must not be null.");
                this.properties = Preconditions.checkNotNull(properties, "Properties must not be null.");
                this.partitioner = Preconditions.checkNotNull(partitioner, "Partitioner must not be null.");
-               this.sinkFormat = Preconditions.checkNotNull(sinkFormat, "Sink format must not be null.");
+               this.encodingFormat = Preconditions.checkNotNull(encodingFormat, "Sink format must not be null.");

Review comment:
       Update the error message as well?

##########
File path: flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/table/Kafka011DynamicTableFactoryTest.java
##########
@@ -82,12 +82,12 @@ protected KafkaDynamicSinkBase getExpectedSink(
                        String topic,
                        Properties properties,
                        Optional<FlinkKafkaPartitioner<RowData>> partitioner,
-                       SinkFormat<SerializationSchema<RowData>> sinkFormat) {
+                       EncodingFormat<SerializationSchema<RowData>> encodingFormat) {
                return new Kafka011DynamicSink(
                                consumedDataType,
                                topic,
                                properties,
                                partitioner,
-                               sinkFormat);
+                       encodingFormat);

Review comment:
       Fix the wrong indentation here and at other locations.

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/Format.java
##########
@@ -34,15 +34,15 @@
  *
  * <p>Formats can be distinguished along two dimensions:
  * <ul>
- *     <li>Context in which the format is applied (e.g. {@link ScanTableSource} or {@link DynamicTableSink}).
+ *     <li>Context in which the format is applied (e.g. {@link DynamicTableSource} or {@link DynamicTableSink}).

Review comment:
       We can remove the `e.g.`; it now applies only to those two locations.

##########
File path: flink-table/flink-table-common/src/test/java/org/apache/flink/table/factories/TestDynamicTableFactory.java
##########
@@ -134,13 +134,14 @@ public String factoryIdentifier() {
        public static class DynamicTableSourceMock implements ScanTableSource {
 
                public final String target;
-               public final @Nullable ScanFormat<DeserializationSchema<RowData>> sourceKeyFormat;
-               public final ScanFormat<DeserializationSchema<RowData>> sourceValueFormat;
+               public final @Nullable
+               DecodingFormat<DeserializationSchema<RowData>> sourceKeyFormat;

Review comment:
       Fix the formatting here and below.

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/format/DecodingFormat.java
##########
@@ -19,19 +19,19 @@
 package org.apache.flink.table.connector.format;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.types.DataType;
 
 /**
- * A {@link Format} for a {@link ScanTableSource}.
+ * A {@link Format} for a {@link DynamicTableSource} to reading rows.

Review comment:
       nit: `for reading rows`





