Hi all,

I have a PyFlink job connected to a Kafka consumer and producer. I want to work directly with the raw bytes going to and from Kafka, because I want to serialize/deserialize in my Python code rather than in the JVM environment. Therefore, I can't use the SimpleStringSchema for (de)serialization (the messages aren't strings anyway). I've tried to create a TypeInformationSerializationSchema with Types.BYTE(), see the code snippet below:

    class ByteSerializer(SerializationSchema, DeserializationSchema):
        def __init__(self, execution_environment):
            gate_way = get_gateway()

            j_byte_string_schema = gate_way.jvm.org.apache.flink.api.common.serialization.TypeInformationSerializationSchema(
                Types.BYTE().get_java_type_info(),
                get_j_env_configuration(execution_environment),
            )
            SerializationSchema.__init__(self, j_serialization_schema=j_byte_string_schema)
            DeserializationSchema.__init__(
                self, j_deserialization_schema=j_byte_string_schema
            )

The ByteSerializer is used like this:

    return FlinkKafkaConsumer(
        ["client_request", "internal"],
        ByteSerializer(self.env._j_stream_execution_environment),
        {
            "bootstrap.servers": "localhost:9092",
            "auto.offset.reset": "latest",
            "group.id": str(uuid.uuid4()),
        },
    )

However, this does not seem to work. I think the error is thrown in the JVM environment, which makes it a bit hard to parse in my Python stack trace, but I think it boils down to this part of the stack trace:

answer = 'xsorg.apache.flink.api.python.shaded.py4j.Py4JException: Constructor org.apache.flink.api.common.serialization.TypeIn...haded.py4j.GatewayConnection.run(GatewayConnection.java:238)\\n\tat java.base/java.lang.Thread.run(Thread.java:834)\\n'
gateway_client = <py4j.java_gateway.GatewayClient object at 0x140c43550>
target_id = None
name = 'org.apache.flink.api.common.serialization.TypeInformationSerializationSchema'

    def get_return_value(answer, gateway_client, target_id=None, name=None):
        """Converts an answer received from the Java gateway into a Python object.

        For example, string representation of integers are converted to Python
        integer, string representation of objects are converted to JavaObject
        instances, etc.

        :param answer: the string returned by the Java gateway
        :param gateway_client: the gateway client used to communicate with the Java
            Gateway. Only necessary if the answer is a reference (e.g., object,
            list, map)
        :param target_id: the name of the object from which the answer comes from
            (e.g., *object1* in `object1.hello()`). Optional.
        :param name: the name of the member from which the answer comes from
            (e.g., *hello* in `object1.hello()`). Optional.
        """
        if is_error(answer)[0]:
            if len(answer) > 1:
                type = answer[1]
                value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
                if answer[1] == REFERENCE_TYPE:
                    raise Py4JJavaError(
                        "An error occurred while calling {0}{1}{2}.\n".
                        format(target_id, ".", name), value)
                else:
>                   raise Py4JError(
                        "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
                        format(target_id, ".", name, value))
E                   py4j.protocol.Py4JError: An error occurred while calling None.org.apache.flink.api.common.serialization.TypeInformationSerializationSchema. Trace:
E                   org.apache.flink.api.python.shaded.py4j.Py4JException: Constructor org.apache.flink.api.common.serialization.TypeInformationSerializationSchema([class org.apache.flink.api.common.typeinfo.IntegerTypeInfo, class org.apache.flink.configuration.Configuration]) does not exist
E                   	at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.getConstructor(ReflectionEngine.java:179)
E                   	at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.getConstructor(ReflectionEngine.java:196)
E                   	at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:237)
E                   	at org.apache.flink.api.python.shaded.py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
E                   	at org.apache.flink.api.python.shaded.py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
E                   	at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
E                   	at java.base/java.lang.Thread.run(Thread.java:834)

I hope you can help me out!

Thanks in advance,
Wouter