lindong28 commented on a change in pull request #18397:
URL: https://github.com/apache/flink/pull/18397#discussion_r790076382



##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSerializerWrapper.java
##########
@@ -32,26 +32,46 @@
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
+/** Wrapper for Kafka {@link Serializer}. */
 class KafkaSerializerWrapper<IN> implements SerializationSchema<IN> {
+
     private final Class<? extends Serializer<? super IN>> serializerClass;
+    // whether the serializer is for key or value
+    private final boolean isKey;
     private final Map<String, String> config;
     private final Function<? super IN, String> topicSelector;
 
     private transient Serializer<? super IN> serializer;
 
     KafkaSerializerWrapper(
             Class<? extends Serializer<? super IN>> serializerClass,
+            boolean isKey,
             Map<String, String> config,
             Function<? super IN, String> topicSelector) {
         this.serializerClass = checkNotNull(serializerClass);
+        this.isKey = isKey;
         this.config = checkNotNull(config);
         this.topicSelector = checkNotNull(topicSelector);
     }
 
     KafkaSerializerWrapper(
             Class<? extends Serializer<? super IN>> serializerClass,
+            Map<String, String> config,
+            Function<? super IN, String> topicSelector) {
+        this(serializerClass, false, config, topicSelector);
+    }
+
+    KafkaSerializerWrapper(

Review comment:
       nits: Previously we have two constructors here. Now we have 4 
constructors due to the addition of `isKey` parameter. 
   
   Would it be simpler to keep only two constructors and let caller always 
explicitly provides the `isKey` value? This probably also makes the 
`KafkaRecordSerializationSchemaBuilder` code more readable.
   

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaValueOnlyDeserializerWrapper.java
##########
@@ -36,10 +36,15 @@
 
 /** A package private class to wrap {@link Deserializer}. */
 class KafkaValueOnlyDeserializerWrapper<T> implements 
KafkaRecordDeserializationSchema<T> {
+
     private static final long serialVersionUID = 5409547407386004054L;
+
     private static final Logger LOG =
             LoggerFactory.getLogger(KafkaValueOnlyDeserializerWrapper.class);
+
     private final Class<? extends Deserializer<T>> deserializerClass;
+    // always be false since this Deserializer is only used for value.

Review comment:
       nits: The comment seems to be outdated since this value could be true.
   
   And could we start the comment with an upper-case letter for consistency 
with other comments?

##########
File path: 
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilderTest.java
##########
@@ -53,11 +54,15 @@
 
     private static final String DEFAULT_TOPIC = "test";
 
+    private static Map<String, ?> configurableConfiguration;

Review comment:
       hmm... would it be simpler to let `ConfigurableStringSerializer` use the 
`configuration` below instead of creating this `configurableConfiguration`?

##########
File path: 
flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchema.java
##########
@@ -108,24 +107,24 @@ default void 
open(DeserializationSchema.InitializationContext context) throws Ex
      */
     static <V> KafkaRecordDeserializationSchema<V> valueOnly(
             Class<? extends Deserializer<V>> valueDeserializerClass) {
-        return new KafkaValueOnlyDeserializerWrapper<>(
-                valueDeserializerClass, Collections.emptyMap());
+        return valueOnly(valueDeserializerClass, Collections.emptyMap());
     }
 
     /**
      * Wraps a Kafka {@link Deserializer} to a {@link 
KafkaRecordDeserializationSchema}.
      *
      * @param valueDeserializerClass the deserializer class used to 
deserialize the value.
-     * @param config the configuration of the value deserializer, only valid 
when the deserializer
-     *     is an implementation of {@code Configurable}.
+     * @param config the configuration of the value deserializer. If the 
deserializer is an

Review comment:
       nits: the doc seems to miss the explanation of what is invoked when the 
deserializer is NOT an implementation of Configurable.
   
   Would it be slightly better to use something like this:
   ```
   The configuration of the value deserializer. If the deserializer is an 
   implementation of {@code Configurable}, 
   {@link org.apache.kafka.common.Configurable#configure(Map)} 
   will be invoked with the given config. Otherwise, {@link 
Deserializer#configure(Map, boolean)} 
   will be invoked with the given config.
   ```
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to