Hello All,

I have a PyFlink script that connects to Pulsar and streams data from a topic 
whose Pulsar schema type is BytesSchema. The producer writing these messages is 
a protobuf producer that writes each message as byte[]. I am not setting any 
schema type on the producer, so it defaults to Pulsar's BytesSchema.

To deserialize the bytes from the topic, I first used SimpleStringSchema as the 
deserializer in my script. This hit a dead end: the protobuf parser keeps 
failing because it cannot parse the string handed over by the Pulsar source. 
(Perhaps the UTF-8 conversion of the raw bytes is the issue here.)
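
To illustrate why I suspect the UTF-8 conversion, here is a minimal sketch in 
plain Python (no Flink or Pulsar involved). It assumes the string deserializer 
replaces invalid byte sequences with U+FFFD, which is how Java's 
String(bytes, charset) constructor behaves; the sample payload is a 
hypothetical protobuf message, not one taken from my topic.

```python
# Hypothetical protobuf payload: field 1 (varint) = 300 encodes as 08 AC 02.
raw = b"\x08\xac\x02"

# Assumption: the string schema decodes as UTF-8 and substitutes U+FFFD
# for any byte sequence that is not valid UTF-8 (0xAC alone is not valid).
as_text = raw.decode("utf-8", errors="replace")

# Turning the string back into bytes does not restore the original payload.
round_trip = as_text.encode("utf-8")

print(raw.hex())         # original bytes
print(round_trip.hex())  # bytes after the string round trip
print(round_trip == raw)
```

If this is what happens, the bytes reaching the protobuf parser are no longer 
the bytes the producer wrote, so the parse failure would be expected.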

I am now trying the Flink type-information approach to deserialize my stream 
instead. However, I am getting an “EOFException”. The type information I pass 
to PulsarDeserializationSchema.flink_type_info() is 
“Types.PRIMITIVE_ARRAY(Types.BYTE())”.
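
One possible explanation, which I have not confirmed, is that the type-info 
based deserializer expects bytes written in Flink's own serializer format 
rather than a raw payload. The sketch below is plain Python and only simulates 
that idea: it assumes a byte array is framed as a 4-byte big-endian length 
followed by the data. The function name read_length_prefixed and the sample 
payload are hypothetical.

```python
import io
import struct


def read_length_prefixed(buf: bytes) -> bytes:
    """Read one byte array framed as <4-byte big-endian length><payload>.

    Assumption: this framing mimics how a Flink byte-array serializer
    lays out its data; it is a simulation, not Flink code.
    """
    stream = io.BytesIO(buf)
    header = stream.read(4)
    if len(header) < 4:
        raise EOFError("input ended before the length prefix was complete")
    (length,) = struct.unpack(">i", header)
    if length < 0:
        raise ValueError(f"negative length prefix: {length}")
    payload = stream.read(length)
    if len(payload) < length:
        raise EOFError(f"expected {length} bytes, found {len(payload)}")
    return payload


# Hypothetical raw protobuf payload, as the producer would write it.
raw = b"\x08\xac\x02\x12\x03abc"

# Correctly framed input is read back unchanged.
framed = struct.pack(">i", len(raw)) + raw
assert read_length_prefixed(framed) == raw

# Raw input: the first four payload bytes are misread as a huge length,
# so the reader runs off the end of the message.
try:
    read_length_prefixed(raw)
    hit_eof = False
except EOFError:
    hit_eof = True
print(hit_eof)
```

If the real deserializer behaves like this, an EOFException on raw protobuf 
bytes would be the expected outcome, and the type-info route would only work 
for messages that were also written with the matching Flink serializer.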

Please find attached both the source code and the exception log showing where 
the script fails.

Could you please provide any pointers to help me get past this issue?

Regards,
Ananth

Attachment: FlinkTypeInfoException.log.gz
Description: FlinkTypeInfoException.log.gz

Attachment: test_pulsar.py.gz
Description: test_pulsar.py.gz
