Vlad Olevsky created KAFKA-8879:
-----------------------------------
Summary: GlobalStateUpdateTask uses wrong javaType to deserialize value
Key: KAFKA-8879
URL: https://issues.apache.org/jira/browse/KAFKA-8879
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 2.3.0
Reporter: Vlad Olevsky
We read messages from an input topic, transform them
(ChannelConfigNew -> ChannelConfig), and send them to another topic:
{code:java}
builder.stream(chlConfigValidationKafkaConfig.getChlConfigValidationKafkaSrcTopic(),
        Consumed.with(Serdes.String(), new JsonSerde<>(ChannelConfigNew.class)))
    .transform(() -> new ChannelConfigProcessor(chlConfigValidationKafkaConfig,
        prometheusCounter, channelConfigPostDataHelper))
    .to(chlConfigValidationKafkaConfig.getChlConfigValidationKafkaDestinationTopic(),
        Produced.with(Serdes.String(), new JsonSerde<>(ChannelConfig.class)));
{code}
where ChannelConfigProcessor is defined as follows (only the essential part is shown):
{code:java}
public class ChannelConfigProcessor
        implements Transformer<String, ChannelConfigNew, KeyValue<String, ChannelConfig>> {

    // constructor, init() and close() omitted

    @Override
    public KeyValue<String, ChannelConfig> transform(String ccid, ChannelConfigNew channelConfigNew) {
        return new KeyValue<>(ccid, convert(channelConfigNew));
    }

    private ChannelConfig convert(ChannelConfigNew channelConfigNew) {
        ...
    }
}
{code}
Both the input (ChannelConfigNew) and the output (ChannelConfig) Java types end up
in the headers of the message sent to the destination topic. The input type is
already present in the headers (Kafka Streams forwards the input record's headers
to the output record) when serialization is called in
{code:java}
// org.apache.kafka.streams.processor.internals.RecordCollectorImpl
public <K, V> void send(final String topic,
                        final K key,
                        final V value,
                        final Headers headers,
                        final Integer partition,
                        final Long timestamp,
                        final Serializer<K> keySerializer,
                        final Serializer<V> valueSerializer) {
    ...
    final byte[] valBytes = valueSerializer.serialize(topic, headers, value);
    ...
}
{code}
The output type is then added to the same headers inside the valueSerializer.serialize()
method of org.springframework.kafka.support.serializer.JsonSerializer:
{code:java}
@Override
public byte[] serialize(String topic, Headers headers, T data) {
    if (data == null) {
        return null;
    }
    if (this.addTypeInfo && headers != null) {
        this.typeMapper.fromJavaType(this.objectMapper.constructType(data.getClass()), headers);
    }
    return serialize(topic, data);
}
{code}
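To make the failure mode concrete, here is a minimal, stdlib-only sketch of the header state described above. It assumes, as the report implies, that record headers form a multi-valued list in which add() appends rather than replaces (this is how org.apache.kafka.common.header.Headers#add behaves) and that Spring's type mapper stores the class id under a header named __TypeId__. The Headers class below is a hypothetical stand-in, not the kafka-clients implementation, and class names are shortened from their fully qualified form.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class DuplicateTypeHeaderDemo {

    // Assumption: the header name Spring Kafka's type mapper uses for the class id.
    public static final String TYPE_ID = "__TypeId__";

    // Hypothetical stand-in for Kafka record headers (not the kafka-clients class):
    // add() appends and never replaces, so one key can occur several times.
    public static final class Headers {
        private static final class Entry {
            final String key;
            final byte[] value;

            Entry(String key, byte[] value) {
                this.key = key;
                this.value = value;
            }
        }

        private final List<Entry> entries = new ArrayList<>();

        public void add(String key, String value) {
            entries.add(new Entry(key, value.getBytes(StandardCharsets.UTF_8)));
        }

        // First value stored under key, or null if absent.
        public String first(String key) {
            for (Entry e : entries) {
                if (e.key.equals(key)) {
                    return new String(e.value, StandardCharsets.UTF_8);
                }
            }
            return null;
        }

        // Last value stored under key, or null if absent (cf. Kafka's Headers#lastHeader).
        public String last(String key) {
            String result = null;
            for (Entry e : entries) {
                if (e.key.equals(key)) {
                    result = new String(e.value, StandardCharsets.UTF_8);
                }
            }
            return result;
        }

        // Number of entries stored under key.
        public long count(String key) {
            return entries.stream().filter(e -> e.key.equals(key)).count();
        }
    }

    public static void main(String[] args) {
        Headers headers = new Headers();
        // Type header carried over from the input record, forwarded by Kafka Streams.
        headers.add(TYPE_ID, "ChannelConfigNew");
        // Type header appended by the value serializer for the output record.
        headers.add(TYPE_ID, "ChannelConfig");

        System.out.println(headers.count(TYPE_ID)); // 2
        System.out.println(headers.first(TYPE_ID)); // ChannelConfigNew (stale input type)
        System.out.println(headers.last(TYPE_ID));  // ChannelConfig (actual output type)
    }
}
```

A reader that takes the first matching header, as the report describes for toJavaType(), sees the stale input type, whereas Kafka's own Headers#lastHeader(String) would return the type written by the serializer.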
On the other side, we have a GlobalKTable whose processor retrieves the first
Java type from the headers (ChannelConfigNew, the wrong one) and tries to
deserialize the data using that type:
{code:java}
// org.apache.kafka.streams.processor.internals.GlobalStateUpdateTask
public void update(final ConsumerRecord<byte[], byte[]> record) {
    ...
    final ConsumerRecord<Object, Object> deserialized =
        sourceNodeAndDeserializer.deserialize(processorContext, record);
    ...
}
{code}
sourceNodeAndDeserializer.deserialize() in turn calls
{code:java}
// org.apache.kafka.streams.processor.internals.RecordDeserializer
ConsumerRecord<Object, Object> deserialize(final ProcessorContext processorContext,
                                           final ConsumerRecord<byte[], byte[]> rawRecord) {
    ...
    return new ConsumerRecord<>(
        ...,
        sourceNode.deserializeValue(rawRecord.topic(), rawRecord.headers(), rawRecord.value()),
        rawRecord.headers());
}

// ...which eventually reaches
// org.springframework.kafka.support.serializer.JsonDeserializer
@Override
public T deserialize(String topic, Headers headers, byte[] data) {
    if (data == null) {
        return null;
    }
    JavaType javaType = this.typeMapper.toJavaType(headers);
    if (javaType == null) {
        Assert.state(this.targetType != null,
                "No type information in headers and no default type provided");
        return deserialize(topic, data);
    }
    else {
        try {
            return this.objectMapper.readerFor(javaType).readValue(data);
        }
        catch (IOException e) {
            throw new SerializationException("Can't deserialize data [" + Arrays.toString(data)
                    + "] from topic [" + topic + "]", e);
        }
    }
}
{code}
The call JavaType javaType = this.typeMapper.toJavaType(headers) extracts the
first Java type found in the headers, which is not the one that should be used
to deserialize the object (it gets ChannelConfigNew rather than ChannelConfig).
As a result, the object is not deserialized properly: all of its fields are null.
--
This message was sent by Atlassian Jira
(v8.3.2#803003)