Hi Wouter,

Great to hear, and thanks for sharing!

Regards,
Dian

> On 8 Jun 2021, at 4:44 PM, Wouter Zorgdrager <zorgdrag...@gmail.com> wrote:
>
> Hi Dian, all,
>
> The way I resolved it for now is to write my own custom serializer, which only
> maps from bytes to bytes. See the code below:
>
> import java.io.IOException;
>
> import org.apache.flink.api.common.serialization.DeserializationSchema;
> import org.apache.flink.api.common.serialization.SerializationSchema;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
>
> // Pass-through schema: hands Kafka record bytes to and from Flink unchanged.
> public class KafkaBytesSerializer implements SerializationSchema<byte[]>,
>         DeserializationSchema<byte[]> {
>
>     @Override
>     public byte[] deserialize(byte[] bytes) throws IOException {
>         return bytes;
>     }
>
>     @Override
>     public boolean isEndOfStream(byte[] bytes) {
>         return false;
>     }
>
>     @Override
>     public byte[] serialize(byte[] bytes) {
>         return bytes;
>     }
>
>     @Override
>     public TypeInformation<byte[]> getProducedType() {
>         return TypeInformation.of(byte[].class);
>     }
> }
>
> This code is packaged in a jar and uploaded through env.add_jars. That works
> like a charm!
>
> Thanks for the help!
> Wouter
>
> On Fri, 4 Jun 2021 at 14:40, Wouter Zorgdrager <zorgdrag...@gmail.com> wrote:
> Hi Dian, all,
>
> Thanks for your suggestion. Unfortunately, it does not seem to work.
> I get the following exception:
>
> Caused by: java.lang.NegativeArraySizeException: -2147183315
>     at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:81)
>     at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:31)
>     at org.apache.flink.api.common.serialization.TypeInformationSerializationSchema.deserialize(TypeInformationSerializationSchema.java:92)
>     at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
>     at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:58)
>     at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
>     at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
>
> To be more precise, the messages in my Kafka topic are pickled Python
> objects. Maybe that is the reason for the exception. I also tried using
> Types.PICKLED_BYTE_ARRAY().get_java_type_info(), but I think that uses the
> same serializer, because I get the same exception.
>
> Any suggestions? Thanks for your help!
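
A note on the number in the exception above. This is a minimal sketch, assuming (as the stack trace suggests) that BytePrimitiveArraySerializer first reads a 4-byte big-endian length and then allocates an array of that size. Raw pickled bytes carry no such length prefix, so the pickle header itself would be read as the length. The helper name `first_int32` and the sample dictionary are hypothetical and only serve the illustration.

```python
import pickle
import struct


def first_int32(payload: bytes) -> int:
    """Read the first four bytes as a signed big-endian 32-bit integer."""
    return struct.unpack(">i", payload[:4])[0]


# Protocol-4 pickles start with PROTO (0x80), the version byte 0x04 and,
# for payloads large enough to be framed, FRAME (0x95) followed by an
# 8-byte little-endian frame length.
raw = pickle.dumps(
    {"topic": "client_request", "payload": list(range(10))}, protocol=4
)
header = raw[:3]

# A frame length whose low byte is 0x2D (45) reproduces the reported value.
reported = first_int32(bytes([0x80, 0x04, 0x95, 0x2D]))

# The leading 0x80 sets the sign bit, so the "length" is always negative.
length_of_sample = first_int32(raw)

print(header, reported, length_of_sample)
```

Under that assumption, -2147183315 is simply the bytes 0x80 0x04 0x95 0x2D interpreted as an array length, which is consistent with the topic holding unprefixed protocol-4 pickles.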
>
> Regards,
> Wouter
>
> On Fri, 4 Jun 2021 at 08:24, Dian Fu <dian0511...@gmail.com> wrote:
> Hi Wouter,
>
>> E   org.apache.flink.api.python.shaded.py4j.Py4JException: Constructor
>> org.apache.flink.api.common.serialization.TypeInformationSerializationSchema([class
>> org.apache.flink.api.common.typeinfo.IntegerTypeInfo, class
>> org.apache.flink.configuration.Configuration]) does not exist
>
> As the exception indicates, the constructor doesn't exist.
>
> Could you try the following:
>
> ```
> j_type_info = Types.PRIMITIVE_ARRAY(Types.BYTE()).get_java_type_info()
> j_type_serializer = j_type_info.createSerializer(
>     gate_way.jvm.org.apache.flink.api.common.ExecutionConfig())
> j_byte_string_schema = gate_way.jvm.org.apache.flink.api.common.serialization.TypeInformationSerializationSchema(
>     j_type_info, j_type_serializer)
> ```
>
> Regards,
> Dian
>
>> On 3 Jun 2021, at 8:51 PM, Wouter Zorgdrager <zorgdrag...@gmail.com> wrote:
>>
>> Hi all,
>>
>> I have a PyFlink job connected to a KafkaConsumer and Producer. I want to
>> work directly with the bytes from and to Kafka because I want to
>> serialize/deserialize in my Python code rather than in the JVM environment.
>> Therefore, I can't use the SimpleStringSchema for (de)serialization (the
>> messages aren't strings anyway).
>> I've tried to create a
>> TypeInformationSerializationSchema with Types.BYTE(); see the code snippet below:
>>
>> class ByteSerializer(SerializationSchema, DeserializationSchema):
>>     def __init__(self, execution_environment):
>>         gate_way = get_gateway()
>>
>>         j_byte_string_schema = gate_way.jvm.org.apache.flink.api.common.serialization.TypeInformationSerializationSchema(
>>             Types.BYTE().get_java_type_info(),
>>             get_j_env_configuration(execution_environment),
>>         )
>>         SerializationSchema.__init__(self, j_serialization_schema=j_byte_string_schema)
>>         DeserializationSchema.__init__(
>>             self, j_deserialization_schema=j_byte_string_schema
>>         )
>>
>> The ByteSerializer is used like this:
>>
>>         return FlinkKafkaConsumer(
>>             ["client_request", "internal"],
>>             ByteSerializer(self.env._j_stream_execution_environment),
>>             {
>>                 "bootstrap.servers": "localhost:9092",
>>                 "auto.offset.reset": "latest",
>>                 "group.id": str(uuid.uuid4()),
>>             },
>>         )
>>
>> However, this does not seem to work. I think the error is thrown in the JVM
>> environment, which makes it a bit hard to parse in my Python stack trace,
>> but I think it boils down to this part of the stack trace:
>>
>> answer = 'xsorg.apache.flink.api.python.shaded.py4j.Py4JException: Constructor org.apache.flink.api.common.serialization.TypeIn...haded.py4j.GatewayConnection.run(GatewayConnection.java:238)\\n\tat java.base/java.lang.Thread.run(Thread.java:834)\\n'
>> gateway_client = <py4j.java_gateway.GatewayClient object at 0x140c43550>
>> target_id = None
>> name = 'org.apache.flink.api.common.serialization.TypeInformationSerializationSchema'
>>
>>     def get_return_value(answer, gateway_client, target_id=None, name=None):
>>         """Converts an answer received from the Java gateway into a Python
>>         object.
>>
>>         For example, string representation of integers are converted to Python
>>         integer, string representation of objects are converted to JavaObject
>>         instances, etc.
>>
>>         :param answer: the string returned by the Java gateway
>>         :param gateway_client: the gateway client used to communicate with the Java
>>             Gateway. Only necessary if the answer is a reference (e.g., object,
>>             list, map)
>>         :param target_id: the name of the object from which the answer comes from
>>             (e.g., *object1* in `object1.hello()`). Optional.
>>         :param name: the name of the member from which the answer comes from
>>             (e.g., *hello* in `object1.hello()`). Optional.
>>         """
>>         if is_error(answer)[0]:
>>             if len(answer) > 1:
>>                 type = answer[1]
>>                 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
>>                 if answer[1] == REFERENCE_TYPE:
>>                     raise Py4JJavaError(
>>                         "An error occurred while calling {0}{1}{2}.\n".
>>                         format(target_id, ".", name), value)
>>                 else:
>> >                   raise Py4JError(
>>                         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
>>                         format(target_id, ".", name, value))
>> E   py4j.protocol.Py4JError: An error occurred while calling None.org.apache.flink.api.common.serialization.TypeInformationSerializationSchema.
>> E   Trace:
>> E   org.apache.flink.api.python.shaded.py4j.Py4JException: Constructor org.apache.flink.api.common.serialization.TypeInformationSerializationSchema([class org.apache.flink.api.common.typeinfo.IntegerTypeInfo, class org.apache.flink.configuration.Configuration]) does not exist
>> E       at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.getConstructor(ReflectionEngine.java:179)
>> E       at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.getConstructor(ReflectionEngine.java:196)
>> E       at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:237)
>> E       at org.apache.flink.api.python.shaded.py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
>> E       at org.apache.flink.api.python.shaded.py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
>> E       at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>> E       at java.base/java.lang.Thread.run(Thread.java:834)
>>
>> I hope you can help me out!
>>
>> Thanks in advance,
>> Wouter
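
For the goal stated in this message (serializing and deserializing in Python rather than in the JVM), the Python side only needs a pair of functions over raw bytes once the schema passes bytes through unchanged. What follows is a minimal standard-library sketch, not code from the thread: `decode_record`, `encode_record` and the sample event are hypothetical names, and wiring them into a PyFlink job is left out.

```python
import pickle
from typing import Any


def decode_record(raw: bytes) -> Any:
    """Turn the raw Kafka record value back into a Python object."""
    return pickle.loads(raw)


def encode_record(obj: Any) -> bytes:
    """Pickle a Python object into the bytes that are written to Kafka."""
    return pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL)


# Round trip of a sample event, standing in for one Kafka message.
event = {"request_id": 42, "body": "hello"}
encoded = encode_record(event)
round_tripped = decode_record(encoded)
```

Since `pickle.loads` can execute arbitrary code, this only makes sense for topics whose producers are trusted.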