Ran Tao created FLINK-30935:
-------------------------------
Summary: Add KafkaSerializer deserialize check when using SimpleVersionedSerializer
Key: FLINK-30935
URL: https://issues.apache.org/jira/browse/FLINK-30935
Project: Flink
Issue Type: Improvement
Components: Connectors / Kafka
Affects Versions: 1.16.1
Reporter: Ran Tao
{code:java}
@Override
public int getVersion() {
    return CURRENT_VERSION;
}

@Override
public KafkaPartitionSplit deserialize(int version, byte[] serialized) throws IOException {
    try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
            DataInputStream in = new DataInputStream(bais)) {
        String topic = in.readUTF();
        int partition = in.readInt();
        long offset = in.readLong();
        long stoppingOffset = in.readLong();
        return new KafkaPartitionSplit(
                new TopicPartition(topic, partition), offset, stoppingOffset);
    }
}
{code}
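The current Kafka serializers do not check the version: as shown above, deserialize
accepts a version argument but never inspects it before reading the bytes. I think
we can add a version check, as many other connectors do, so that incompatible or
corrupt state is rejected with a clear error instead of being misread.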
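
A minimal sketch of what such a check could look like. It is a dependency-free stand-in, not the actual Flink class: the class name VersionedSplitSerializerSketch, the nested Split type, and the error message are hypothetical, and throwing IOException on an unknown version is an assumption about the desired behavior. In Flink the check would live in the serializer that implements SimpleVersionedSerializer.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical stand-in for a Kafka split serializer; it has no Flink or Kafka dependencies.
class VersionedSplitSerializerSketch {
    static final int CURRENT_VERSION = 0;

    // Simplified placeholder for KafkaPartitionSplit (assumption: same four fields).
    static final class Split {
        final String topic;
        final int partition;
        final long offset;
        final long stoppingOffset;

        Split(String topic, int partition, long offset, long stoppingOffset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
            this.stoppingOffset = stoppingOffset;
        }
    }

    public int getVersion() {
        return CURRENT_VERSION;
    }

    public byte[] serialize(Split split) throws IOException {
        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
                DataOutputStream out = new DataOutputStream(baos)) {
            out.writeUTF(split.topic);
            out.writeInt(split.partition);
            out.writeLong(split.offset);
            out.writeLong(split.stoppingOffset);
            out.flush();
            return baos.toByteArray();
        }
    }

    public Split deserialize(int version, byte[] serialized) throws IOException {
        // Reject unknown versions before touching the payload, so state written
        // in a different format fails fast instead of being misinterpreted.
        if (version != CURRENT_VERSION) {
            throw new IOException(
                    "Unrecognized version " + version
                            + "; expected " + CURRENT_VERSION + ".");
        }
        try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
                DataInputStream in = new DataInputStream(bais)) {
            String topic = in.readUTF();
            int partition = in.readInt();
            long offset = in.readLong();
            long stoppingOffset = in.readLong();
            return new Split(topic, partition, offset, stoppingOffset);
        }
    }
}
```

With this shape, a call such as deserialize(getVersion(), bytes) round-trips a split, while any other version number raises an IOException before the bytes are read. If a later format is introduced, the single comparison could become a switch over the supported versions.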