lindong28 commented on a change in pull request #18397: URL: https://github.com/apache/flink/pull/18397#discussion_r790076382
########## File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSerializerWrapper.java ########## @@ -32,26 +32,46 @@ import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; +/** Wrapper for Kafka {@link Serializer}. */ class KafkaSerializerWrapper<IN> implements SerializationSchema<IN> { + private final Class<? extends Serializer<? super IN>> serializerClass; + // whether the serializer is for key or value + private final boolean isKey; private final Map<String, String> config; private final Function<? super IN, String> topicSelector; private transient Serializer<? super IN> serializer; KafkaSerializerWrapper( Class<? extends Serializer<? super IN>> serializerClass, + boolean isKey, Map<String, String> config, Function<? super IN, String> topicSelector) { this.serializerClass = checkNotNull(serializerClass); + this.isKey = isKey; this.config = checkNotNull(config); this.topicSelector = checkNotNull(topicSelector); } KafkaSerializerWrapper( Class<? extends Serializer<? super IN>> serializerClass, + Map<String, String> config, + Function<? super IN, String> topicSelector) { + this(serializerClass, false, config, topicSelector); + } + + KafkaSerializerWrapper( Review comment: nits: Previously we have two constructors here. Now we have 4 constructors due to the addition of `isKey` parameter. Would it be simpler to keep only two constructors and let caller always explicitly provides the `isKey` value? This probably also makes the `KafkaRecordSerializationSchemaBuilder` code more readable. ########## File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaValueOnlyDeserializerWrapper.java ########## @@ -36,10 +36,15 @@ /** A package private class to wrap {@link Deserializer}. */ class KafkaValueOnlyDeserializerWrapper<T> implements KafkaRecordDeserializationSchema<T> { + private static final long serialVersionUID = 5409547407386004054L; + private static final Logger LOG = LoggerFactory.getLogger(KafkaValueOnlyDeserializerWrapper.class); + private final Class<? extends Deserializer<T>> deserializerClass; + // always be false since this Deserializer is only used for value. Review comment: nits: The comment seems to be outdated since this value could be true. And could we start the comment with an upper-case letter for consistency with other comments? ########## File path: flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilderTest.java ########## @@ -53,11 +54,15 @@ private static final String DEFAULT_TOPIC = "test"; + private static Map<String, ?> configurableConfiguration; Review comment: hmm... would it be simpler to let `ConfigurableStringSerializer` use the `configuration` below instead of creating this `configurableConfiguration`? ########## File path: flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchema.java ########## @@ -108,24 +107,24 @@ default void open(DeserializationSchema.InitializationContext context) throws Ex */ static <V> KafkaRecordDeserializationSchema<V> valueOnly( Class<? extends Deserializer<V>> valueDeserializerClass) { - return new KafkaValueOnlyDeserializerWrapper<>( - valueDeserializerClass, Collections.emptyMap()); + return valueOnly(valueDeserializerClass, Collections.emptyMap()); } /** * Wraps a Kafka {@link Deserializer} to a {@link KafkaRecordDeserializationSchema}. * * @param valueDeserializerClass the deserializer class used to deserialize the value. - * @param config the configuration of the value deserializer, only valid when the deserializer - * is an implementation of {@code Configurable}. + * @param config the configuration of the value deserializer. If the deserializer is an Review comment: nits: the doc seems to miss the explanation of what is invoked when the deserializer is NOT an implementation of Configurable. Would it be slightly better to use something like this: ``` The configuration of the value deserializer. If the deserializer is an implementation of {@code Configurable}, {@link org.apache.kafka.common.Configurable#configure(Map)} will be invoked with the given config. Otherwise, {@link Deserializer#configure(Map, boolean)} will be invoked with the given config. ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org