Hi Wouter,

> E                   org.apache.flink.api.python.shaded.py4j.Py4JException: Constructor org.apache.flink.api.common.serialization.TypeInformationSerializationSchema([class org.apache.flink.api.common.typeinfo.IntegerTypeInfo, class org.apache.flink.configuration.Configuration]) does not exist

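As the exception indicates, that constructor doesn't exist. TypeInformationSerializationSchema takes a TypeInformation plus either an ExecutionConfig or a TypeSerializer, not a Configuration. Also note that Types.BYTE() describes a single byte. For raw message bytes you need a byte array, i.e. Types.PRIMITIVE_ARRAY(Types.BYTE()).

Could you try the following: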

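```
from pyflink.common.typeinfo import Types
from pyflink.java_gateway import get_gateway

gate_way = get_gateway()
# byte[] type info, so each Kafka record is handled as one byte array
j_type_info = Types.PRIMITIVE_ARRAY(Types.BYTE()).get_java_type_info()
# TypeInformation.createSerializer(ExecutionConfig) builds the matching serializer
j_type_serializer = j_type_info.createSerializer(
    gate_way.jvm.org.apache.flink.api.common.ExecutionConfig())
# (TypeInformation, TypeSerializer) is a constructor that does exist
j_byte_string_schema = (
    gate_way.jvm.org.apache.flink.api.common.serialization
    .TypeInformationSerializationSchema(j_type_info, j_type_serializer))
```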
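Since you want to handle the bytes yourself in Python, one thing is worth checking. As far as I can tell from Flink's BytePrimitiveArraySerializer (please double-check this for your Flink version), a byte[] serialized this way is written as a 4-byte big-endian length followed by the payload. So records written to Kafka will carry that prefix, and records read from Kafka are expected to have it too. Below is a minimal plain-Python sketch of that assumed framing. frame() and unframe() are hypothetical helper names, not PyFlink APIs.

```python
import struct

# Assumption: Flink's byte[] serializer writes a 4-byte big-endian int
# (the payload length) followed by the raw payload bytes.
_LEN = struct.Struct(">i")


def frame(payload: bytes) -> bytes:
    """Prepend the length prefix, as a Flink-serialized byte[] would carry it."""
    return _LEN.pack(len(payload)) + payload


def unframe(record: bytes) -> bytes:
    """Strip the length prefix from a record in the assumed Flink format."""
    if len(record) < _LEN.size:
        raise ValueError("record shorter than the 4-byte length prefix")
    (length,) = _LEN.unpack_from(record)
    payload = record[_LEN.size:_LEN.size + length]
    if len(payload) != length:
        raise ValueError("record truncated: expected %d payload bytes" % length)
    return payload


if __name__ == "__main__":
    framed = frame(b"\x01\x02hello")
    print(framed.hex())  # 00000007010268656c6c6f
    assert unframe(framed) == b"\x01\x02hello"
```

If other, non-Flink clients put raw bytes on the same topics, they would need to add (or strip) this prefix for the two sides to agree.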

Regards,
Dian

> On Jun 3, 2021, at 8:51 PM, Wouter Zorgdrager <zorgdrag...@gmail.com> wrote:
> 
> Hi all,
> 
> I have a PyFlink job connected to a KafkaConsumer and Producer. I want to 
> directly work with the bytes from and to Kafka because I want to 
> serialize/deserialize in my Python code rather than the JVM environment. 
> Therefore, I can't use the SimpleStringSchema for (de)serialization (the 
> messages aren't strings anyway). I've tried to create a 
> TypeInformationSerializationSchema with Types.BYTE(); see the code snippet below:
> 
> class ByteSerializer(SerializationSchema, DeserializationSchema):
>     def __init__(self, execution_environment):
>         gate_way = get_gateway()
> 
>         j_byte_string_schema = gate_way.jvm.org.apache.flink.api.common.serialization.TypeInformationSerializationSchema(
>             Types.BYTE().get_java_type_info(),
>             get_j_env_configuration(execution_environment),
>         )
>         SerializationSchema.__init__(self, j_serialization_schema=j_byte_string_schema)
>         DeserializationSchema.__init__(
>             self, j_deserialization_schema=j_byte_string_schema
>         )
> 
> The ByteSerializer is used like this:
> 
> return FlinkKafkaConsumer(
>             ["client_request", "internal"],
>             ByteSerializer(self.env._j_stream_execution_environment),
>             {
>                 "bootstrap.servers": "localhost:9092",
>                 "auto.offset.reset": "latest",
>                 "group.id": str(uuid.uuid4()),
>             },
>         )
> 
> However, this does not seem to work. I think the error is thrown in the JVM, 
> which makes it a bit hard to read from my Python stack trace, 
> but I think it boils down to this part of the stack trace:
> 
> answer = 'xsorg.apache.flink.api.python.shaded.py4j.Py4JException: Constructor org.apache.flink.api.common.serialization.TypeIn...haded.py4j.GatewayConnection.run(GatewayConnection.java:238)\\n\tat java.base/java.lang.Thread.run(Thread.java:834)\\n'
> gateway_client = <py4j.java_gateway.GatewayClient object at 0x140c43550>
> target_id = None
> name = 'org.apache.flink.api.common.serialization.TypeInformationSerializationSchema'
> 
>     def get_return_value(answer, gateway_client, target_id=None, name=None):
>         """Converts an answer received from the Java gateway into a Python object.
>     
>         For example, string representation of integers are converted to Python
>         integer, string representation of objects are converted to JavaObject
>         instances, etc.
>     
>         :param answer: the string returned by the Java gateway
>         :param gateway_client: the gateway client used to communicate with the Java
>             Gateway. Only necessary if the answer is a reference (e.g., object,
>             list, map)
>         :param target_id: the name of the object from which the answer comes from
>             (e.g., *object1* in `object1.hello()`). Optional.
>         :param name: the name of the member from which the answer comes from
>             (e.g., *hello* in `object1.hello()`). Optional.
>         """
>         if is_error(answer)[0]:
>             if len(answer) > 1:
>                 type = answer[1]
>                 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
>                 if answer[1] == REFERENCE_TYPE:
>                     raise Py4JJavaError(
>                         "An error occurred while calling {0}{1}{2}.\n".
>                         format(target_id, ".", name), value)
>                 else:
> >                   raise Py4JError(
>                         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
>                         format(target_id, ".", name, value))
> E                   py4j.protocol.Py4JError: An error occurred while calling None.org.apache.flink.api.common.serialization.TypeInformationSerializationSchema. Trace:
> E                   org.apache.flink.api.python.shaded.py4j.Py4JException: Constructor org.apache.flink.api.common.serialization.TypeInformationSerializationSchema([class org.apache.flink.api.common.typeinfo.IntegerTypeInfo, class org.apache.flink.configuration.Configuration]) does not exist
> E                     at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.getConstructor(ReflectionEngine.java:179)
> E                     at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.getConstructor(ReflectionEngine.java:196)
> E                     at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:237)
> E                     at org.apache.flink.api.python.shaded.py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
> E                     at org.apache.flink.api.python.shaded.py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
> E                     at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
> E                     at java.base/java.lang.Thread.run(Thread.java:834)
> 
> I hope you can help me out!
> 
> Thanks in advance,
> Wouter
