Hi Wouter,

> E org.apache.flink.api.python.shaded.py4j.Py4JException: Constructor
> org.apache.flink.api.common.serialization.TypeInformationSerializationSchema([class
> org.apache.flink.api.common.typeinfo.IntegerTypeInfo, class
> org.apache.flink.configuration.Configuration]) does not exist
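
In case it helps to see why this lookup fails: py4j picks a Java constructor from the classes of the arguments it is given, and neither public constructor of TypeInformationSerializationSchema accepts a Configuration as its second argument (they take an ExecutionConfig or a TypeSerializer). Below is a toy model of that matching in plain Python. It is not py4j's real code; the ASSIGNABLE_TO table and find_constructor are hypothetical names written by hand for illustration only.

```python
# Toy model of constructor lookup by argument types.
# NOT py4j's actual implementation; class names are plain strings.

# The two public constructors of TypeInformationSerializationSchema,
# written as tuples of parameter types.
CONSTRUCTORS = [
    ("TypeInformation", "ExecutionConfig"),
    ("TypeInformation", "TypeSerializer"),
]

# Hand-written assumption: which parameter types each argument class satisfies.
ASSIGNABLE_TO = {
    "IntegerTypeInfo": {"TypeInformation"},
    "PrimitiveArrayTypeInfo": {"TypeInformation"},
    "Configuration": {"Configuration"},
    "ExecutionConfig": {"ExecutionConfig"},
    "BytePrimitiveArraySerializer": {"TypeSerializer"},
}


def find_constructor(arg_classes):
    """Return the first signature every argument is assignable to, else None."""
    for signature in CONSTRUCTORS:
        if len(signature) == len(arg_classes) and all(
            param in ASSIGNABLE_TO.get(arg, set())
            for param, arg in zip(signature, arg_classes)
        ):
            return signature
    return None


# The call from the original code: (IntegerTypeInfo, Configuration).
failing = find_constructor(["IntegerTypeInfo", "Configuration"])
# A call that passes a type info together with a matching serializer.
working = find_constructor(["PrimitiveArrayTypeInfo", "BytePrimitiveArraySerializer"])
print(failing)
print(working)
```

The first lookup finds no signature, which corresponds to the "does not exist" message in the trace; the second matches the (TypeInformation, TypeSerializer) form.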
As the exception indicates, the constructor doesn't exist. Could you try the following (inside __init__, where gate_way is already defined):

```
j_type_info = Types.PRIMITIVE_ARRAY(Types.BYTE()).get_java_type_info()
j_type_serializer = j_type_info.createSerializer(
    gate_way.jvm.org.apache.flink.api.common.ExecutionConfig())
j_byte_string_schema = gate_way.jvm.org.apache.flink.api.common.serialization.TypeInformationSerializationSchema(
    j_type_info, j_type_serializer)
```

Regards,
Dian

> On June 3, 2021, at 8:51 PM, Wouter Zorgdrager <zorgdrag...@gmail.com> wrote:
>
> Hi all,
>
> I have a PyFlink job connected to a KafkaConsumer and Producer. I want to
> directly work with the bytes from and to Kafka because I want to
> serialize/deserialize in my Python code rather than the JVM environment.
> Therefore, I can't use the SimpleStringSchema for (de)serialization (the
> messages aren't strings anyways). I've tried to create a
> TypeInformationSerializer with Types.BYTE(), see the code snippet below:
>
> class ByteSerializer(SerializationSchema, DeserializationSchema):
>     def __init__(self, execution_environment):
>         gate_way = get_gateway()
>
>         j_byte_string_schema = gate_way.jvm.org.apache.flink.api.common.serialization.TypeInformationSerializationSchema(
>             Types.BYTE().get_java_type_info(),
>             get_j_env_configuration(execution_environment),
>         )
>         SerializationSchema.__init__(self, j_serialization_schema=j_byte_string_schema)
>         DeserializationSchema.__init__(
>             self, j_deserialization_schema=j_byte_string_schema
>         )
>
> The ByteSerializer is used like this:
>
> return FlinkKafkaConsumer(
>     ["client_request", "internal"],
>     ByteSerializer(self.env._j_stream_execution_environment),
>     {
>         "bootstrap.servers": "localhost:9092",
>         "auto.offset.reset": "latest",
>         "group.id": str(uuid.uuid4()),
>     },
> )
>
> However, this does not seem to work.
> I think the error is thrown in the JVM
> environment, which makes it a bit hard to parse in my Python stack trace,
> but I think it boils down to this stacktrace part:
>
> answer = 'xsorg.apache.flink.api.python.shaded.py4j.Py4JException: Constructor org.apache.flink.api.common.serialization.TypeIn...haded.py4j.GatewayConnection.run(GatewayConnection.java:238)\\n\tat java.base/java.lang.Thread.run(Thread.java:834)\\n'
> gateway_client = <py4j.java_gateway.GatewayClient object at 0x140c43550>
> target_id = None
> name = 'org.apache.flink.api.common.serialization.TypeInformationSerializationSchema'
>
>     def get_return_value(answer, gateway_client, target_id=None, name=None):
>         """Converts an answer received from the Java gateway into a Python
>         object.
>
>         For example, string representation of integers are converted to Python
>         integer, string representation of objects are converted to JavaObject
>         instances, etc.
>
>         :param answer: the string returned by the Java gateway
>         :param gateway_client: the gateway client used to communicate with
>             the Java Gateway. Only necessary if the answer is a reference
>             (e.g., object, list, map)
>         :param target_id: the name of the object from which the answer comes
>             from (e.g., *object1* in `object1.hello()`). Optional.
>         :param name: the name of the member from which the answer comes from
>             (e.g., *hello* in `object1.hello()`). Optional.
>         """
>         if is_error(answer)[0]:
>             if len(answer) > 1:
>                 type = answer[1]
>                 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
>                 if answer[1] == REFERENCE_TYPE:
>                     raise Py4JJavaError(
>                         "An error occurred while calling {0}{1}{2}.\n".
>                         format(target_id, ".", name), value)
>                 else:
> >                   raise Py4JError(
>                         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
>                         format(target_id, ".", name, value))
> E py4j.protocol.Py4JError: An error occurred while calling None.org.apache.flink.api.common.serialization.TypeInformationSerializationSchema.
> Trace:
> E org.apache.flink.api.python.shaded.py4j.Py4JException: Constructor org.apache.flink.api.common.serialization.TypeInformationSerializationSchema([class org.apache.flink.api.common.typeinfo.IntegerTypeInfo, class org.apache.flink.configuration.Configuration]) does not exist
> E     at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.getConstructor(ReflectionEngine.java:179)
> E     at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.getConstructor(ReflectionEngine.java:196)
> E     at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:237)
> E     at org.apache.flink.api.python.shaded.py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
> E     at org.apache.flink.api.python.shaded.py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
> E     at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
> E     at java.base/java.lang.Thread.run(Thread.java:834)
>
> I hope you can help me out!
>
> Thanks in advance,
> Wouter