Hello All,
I have a PyFlink script that connects to Pulsar and streams data from a topic whose Pulsar schema type is BytesSchema. The producer is a protobuf producer that writes messages as byte[]; I am not setting any schema type on the producer, so it defaults to Pulsar's BytesSchema.

To deserialize the bytes from the topic, I first used SimpleStringSchema as the deserializer in my script. This hit a dead end: the protobuf parser keeps failing because it cannot parse the string provided by the Pulsar stream source. (Perhaps the UTF-8 conversion from raw bytes is the issue here.)

I am now trying the Flink type information approach to deserialize my stream, passing “Types.PRIMITIVE_ARRAY(Types.BYTE())” as the type information to PulsarDeserializationSchema.flink_type_info(). However, I am getting an “EOFException”.

Please find attached both the source code and the relevant exception log showing where the script fails. Could you please provide any pointers to help me get past this issue?

Regards,
Ananth
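
P.S. The suspected UTF-8 problem can be illustrated without Flink or Pulsar. The snippet below is only a minimal sketch in plain Python: the three-byte payload is a hypothetical protobuf message (field 1, varint value 300), not one taken from the real topic, and errors="replace" is an assumption standing in for how a string-based deserializer would substitute U+FFFD for malformed input. It shows that decoding raw bytes to a string and encoding them back does not return the original bytes, which would be enough to break a protobuf parser.

```python
# Hypothetical protobuf wire bytes: tag 0x08 (field 1, varint), value 300 -> 0xAC 0x02.
payload = bytes([0x08, 0xAC, 0x02])

# Treat the raw bytes as UTF-8 text, as a string deserializer would.
# 0xAC is not valid UTF-8 on its own, so it is replaced by U+FFFD.
# (errors="replace" is an assumption mimicking that substitution.)
as_text = payload.decode("utf-8", errors="replace")

# Convert the string back to bytes before handing it to a protobuf parser.
round_tripped = as_text.encode("utf-8")

print(payload.hex())        # original bytes
print(round_tripped.hex())  # bytes after the string round trip
print(payload == round_tripped)
```

If the round trip is lossy in this way, no amount of re-encoding on the Python side can recover the original message, so the bytes would have to reach the parser without ever being turned into a string.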
FlinkTypeInfoException.log.gz
test_pulsar.py.gz