Re: ByteSerializationSchema in PyFlink

2021-06-08 Thread Dian Fu
Hi Wouter, great to hear and thanks for sharing! Regards, Dian > On 8 June 2021 at 16:44, Wouter Zorgdrager wrote: > Hi Dian, all, > The way I resolved it for now is to write my own custom serializer, which only maps bytes to bytes. See the code below: public class KafkaBytesSerializer …

Re: ByteSerializationSchema in PyFlink

2021-06-08 Thread Wouter Zorgdrager
Hi Dian, all, The way I resolved it for now is to write my own custom serializer, which only maps bytes to bytes. See the code below: public class KafkaBytesSerializer implements SerializationSchema, DeserializationSchema { @Override public byte[] deserialize(byte[] bytes) throws …
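[Editor's note] The archive preview cuts the class off after the `deserialize` signature. Under the `SerializationSchema`/`DeserializationSchema` interface contracts, a minimal bytes-to-bytes pass-through along the lines Wouter describes could look like the sketch below; this is a reconstruction from the interfaces, not necessarily his exact code.

```java
import java.io.IOException;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

public class KafkaBytesSerializer
        implements SerializationSchema<byte[]>, DeserializationSchema<byte[]> {

    @Override
    public byte[] serialize(byte[] element) {
        // Producer side: hand the payload to Kafka untouched.
        return element;
    }

    @Override
    public byte[] deserialize(byte[] bytes) throws IOException {
        // Consumer side: hand the raw Kafka bytes to the job untouched.
        return bytes;
    }

    @Override
    public boolean isEndOfStream(byte[] nextElement) {
        // Never terminate the stream based on an element.
        return false;
    }

    @Override
    public TypeInformation<byte[]> getProducedType() {
        return TypeInformation.of(byte[].class);
    }
}
```

Because one class implements both interfaces, the same instance can serve the Kafka source and sink, leaving all real (de)serialization to the Python side, which is what the original question asked for.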

Re: ByteSerializationSchema in PyFlink

2021-06-04 Thread Wouter Zorgdrager
Hi Dian, all, Thanks for your suggestion. Unfortunately, it does not seem to work. I get the following exception: Caused by: java.lang.NegativeArraySizeException: -2147183315 at …
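[Editor's note, not stated in the thread] A likely cause of this exception: `TypeInformationSerializationSchema` reads records in Flink's own binary format, which for `byte[]` begins with a 4-byte length prefix. Arbitrary Kafka payloads that were not written by Flink can decode to a negative length, and allocating an array of that size throws exactly this exception. A small sketch reproducing that failure mode under this assumption:

```java
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

public class NegativeSizeRepro {
    public static void main(String[] args) throws Exception {
        TypeInformationSerializationSchema<byte[]> schema =
                new TypeInformationSerializationSchema<>(
                        TypeInformation.of(byte[].class), new ExecutionConfig());

        // A raw payload NOT produced by Flink's serializer. Its first four
        // bytes are read as the length prefix; with the high bit set they
        // decode to a negative int, so the deserializer attempts to allocate
        // a negative-size array.
        byte[] rawKafkaBytes = {(byte) 0x80, 0x00, 0x12, 0x34};
        try {
            schema.deserialize(rawKafkaBytes);
        } catch (NegativeArraySizeException e) {
            System.out.println("Caught NegativeArraySizeException");
        }
    }
}
```

This is why a plain pass-through schema (as in the later message in this thread) is the simpler fit when the bytes come from outside Flink.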

Re: ByteSerializationSchema in PyFlink

2021-06-04 Thread Dian Fu
Hi Wouter, > E org.apache.flink.api.python.shaded.py4j.Py4JException: Constructor org.apache.flink.api.common.serialization.TypeInformationSerializationSchema([class org.apache.flink.api.common.typeinfo.IntegerTypeInfo, class …
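[Editor's note] The truncated `Py4JException` says that no `TypeInformationSerializationSchema` constructor matches the argument classes passed over Py4J. On the Java side the relevant constructor signature is `(TypeInformation<T>, ExecutionConfig)`; a sketch of a matching instantiation for `byte[]` records, assuming that is the type intended here:

```java
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

public class SchemaConstruction {
    public static void main(String[] args) throws Exception {
        // The constructor Py4J needs to match: (TypeInformation<T>, ExecutionConfig).
        TypeInformationSerializationSchema<byte[]> schema =
                new TypeInformationSerializationSchema<>(
                        TypeInformation.of(byte[].class), new ExecutionConfig());

        // Round-tripping through Flink's own binary format works, because
        // serialize() writes the length-prefixed form deserialize() expects.
        byte[] payload = {1, 2, 3};
        byte[] back = schema.deserialize(schema.serialize(payload));
        System.out.println(java.util.Arrays.equals(payload, back)); // prints true
    }
}
```

Note that this schema still expects Flink's own wire format on the Kafka topic, which is what leads to the `NegativeArraySizeException` reported later in the thread when the topic holds foreign bytes.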

ByteSerializationSchema in PyFlink

2021-06-03 Thread Wouter Zorgdrager
Hi all, I have a PyFlink job connected to a KafkaConsumer and Producer. I want to directly work with the bytes from and to Kafka because I want to serialize/deserialize in my Python code rather than the JVM environment. Therefore, I can't use the SimpleStringSchema for (de)serialization (the …