Re: ByteSerializationSchema in PyFlink

2021-06-08 Thread Dian Fu
Hi Wouter,

Great to hear, and thanks for sharing!

Regards,
Dian


Re: ByteSerializationSchema in PyFlink

2021-06-08 Thread Wouter Zorgdrager
Hi Dian, all,

The way I resolved it for now is to write my own custom serializer, which simply
maps bytes to bytes. See the code below:
import java.io.IOException;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

public class KafkaBytesSerializer
        implements SerializationSchema<byte[]>, DeserializationSchema<byte[]> {

    @Override
    public byte[] deserialize(byte[] bytes) throws IOException {
        return bytes;
    }

    @Override
    public boolean isEndOfStream(byte[] bytes) {
        return false;
    }

    @Override
    public byte[] serialize(byte[] bytes) {
        return bytes;
    }

    @Override
    public TypeInformation<byte[]> getProducedType() {
        return TypeInformation.of(byte[].class);
    }
}

This code is packaged in a jar and registered through env.add_jars(). That
works like a charm!
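With the identity schema on the Java side, the actual (de)serialization can live entirely in the Python job. A minimal sketch of what that Python side might look like (the helper names are hypothetical, not part of PyFlink):

```python
import pickle

# Hypothetical helpers: with the bytes-to-bytes schema on the Java side,
# map functions in the Python job do the real (de)serialization.
def to_kafka_bytes(obj) -> bytes:
    """Serialize a Python object before handing it to the Kafka producer."""
    return pickle.dumps(obj)

def from_kafka_bytes(raw: bytes):
    """Deserialize the raw bytes received from the Kafka consumer."""
    return pickle.loads(raw)

event = {"key": "client_request", "payload": [1, 2, 3]}
assert from_kafka_bytes(to_kafka_bytes(event)) == event
```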

Thanks for the help!
Wouter

Re: ByteSerializationSchema in PyFlink

2021-06-04 Thread Wouter Zorgdrager
Hi Dian, all,

Thanks for your suggestion. Unfortunately, it does not seem to work. I get
the following exception:

Caused by: java.lang.NegativeArraySizeException: -2147183315
    at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:81)
    at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:31)
    at org.apache.flink.api.common.serialization.TypeInformationSerializationSchema.deserialize(TypeInformationSerializationSchema.java:92)
    at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:58)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)

To be more precise, the messages in my Kafka topic are pickled Python
objects. Maybe that is the reason for the exception. I also tried
using Types.PICKLED_BYTE_ARRAY().get_java_type_info(), but I think that uses
the same serializer, because I get the same exception.
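That would explain the NegativeArraySizeException: TypeInformationSerializationSchema can only read records that Flink's own serializer wrote, and for byte[] that wire format (judging from BytePrimitiveArraySerializer) begins with a 4-byte big-endian length prefix. A pickled Python object starts with the opcode 0x80, so the sign bit of that "length" is set and the value comes out negative. A quick sketch illustrating the mismatch (an assumption about the wire format, not an official API):

```python
import pickle
import struct

# A pickled Python object, as it would sit in the Kafka topic.
payload = pickle.dumps({"user": "wouter"}, protocol=2)

# Flink's BytePrimitiveArraySerializer first reads a 4-byte big-endian
# length prefix. Pickle protocol 2+ frames start with opcode 0x80, so the
# sign bit of that "length" is set and the parsed value is negative.
fake_length = struct.unpack(">i", payload[:4])[0]
assert payload[:1] == b"\x80"
assert fake_length < 0
```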

Any suggestions? Thanks for your help!

Regards,
Wouter


Re: ByteSerializationSchema in PyFlink

2021-06-04 Thread Dian Fu
Hi Wouter,

> E   org.apache.flink.api.python.shaded.py4j.Py4JException: Constructor
> org.apache.flink.api.common.serialization.TypeInformationSerializationSchema([class
> org.apache.flink.api.common.typeinfo.IntegerTypeInfo, class
> org.apache.flink.configuration.Configuration]) does not exist
As the exception indicates, that constructor does not exist: there is no
TypeInformationSerializationSchema constructor that takes a Configuration as
the second argument.

Could you try with the following:

```
j_type_info = Types.PRIMITIVE_ARRAY(Types.BYTE()).get_java_type_info()
j_type_serializer = j_type_info.createSerializer(
    gate_way.jvm.org.apache.flink.api.common.ExecutionConfig()
)
j_byte_string_schema = gate_way.jvm.org.apache.flink.api.common.serialization.TypeInformationSerializationSchema(
    j_type_info, j_type_serializer
)
```
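Note that a schema built this way speaks Flink's own wire format rather than the raw payload: for a primitive byte[], BytePrimitiveArraySerializer appears to frame each record as a 4-byte big-endian length followed by the bytes, so it only round-trips records that Flink itself wrote. A rough pure-Python sketch of that framing (an assumption based on the serializer's behaviour, not an official PyFlink API):

```python
import struct

def flink_frame(payload: bytes) -> bytes:
    """Frame bytes the way BytePrimitiveArraySerializer appears to:
    4-byte big-endian length prefix, then the body."""
    return struct.pack(">i", len(payload)) + payload

def flink_unframe(record: bytes) -> bytes:
    (length,) = struct.unpack(">i", record[:4])
    if length < 0:
        # A foreign payload (e.g. a raw pickle) lands here; on the Java
        # side this is where NegativeArraySizeException is thrown.
        raise ValueError("negative length: record was not written by Flink")
    return record[4:4 + length]

assert flink_unframe(flink_frame(b"hello")) == b"hello"
```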

Regards,
Dian


ByteSerializationSchema in PyFlink

2021-06-03 Thread Wouter Zorgdrager
Hi all,

I have a PyFlink job connected to a KafkaConsumer and Producer. I want to
work directly with the bytes from and to Kafka, because I want to
serialize/deserialize in my Python code rather than in the JVM environment.
Therefore, I can't use the SimpleStringSchema for (de)serialization (the
messages aren't strings anyway). I've tried to create a
TypeInformationSerializationSchema with Types.BYTE(); see the code snippet below:

class ByteSerializer(SerializationSchema, DeserializationSchema):
    def __init__(self, execution_environment):
        gate_way = get_gateway()

        j_byte_string_schema = gate_way.jvm.org.apache.flink.api.common.serialization.TypeInformationSerializationSchema(
            Types.BYTE().get_java_type_info(),
            get_j_env_configuration(execution_environment),
        )
        SerializationSchema.__init__(
            self, j_serialization_schema=j_byte_string_schema
        )
        DeserializationSchema.__init__(
            self, j_deserialization_schema=j_byte_string_schema
        )

The ByteSerializer is used like this:


return FlinkKafkaConsumer(
    ["client_request", "internal"],
    ByteSerializer(self.env._j_stream_execution_environment),
    {
        "bootstrap.servers": "localhost:9092",
        "auto.offset.reset": "latest",
        "group.id": str(uuid.uuid4()),
    },
)
However, this does not seem to work. I think the error is thrown in the JVM
environment, which makes it a bit hard to parse in my Python stack trace,
but I think it boils down to this part:


answer = 'xsorg.apache.flink.api.python.shaded.py4j.Py4JException: Constructor org.apache.flink.api.common.serialization.TypeIn...haded.py4j.GatewayConnection.run(GatewayConnection.java:238)\\n\tat java.base/java.lang.Thread.run(Thread.java:834)\\n'
gateway_client = 
target_id = None
name = 'org.apache.flink.api.common.serialization.TypeInformationSerializationSchema'

def get_return_value(answer, gateway_client, target_id=None, name=None):
    """Converts an answer received from the Java gateway into a Python object.

    For example, string representation of integers are converted to Python
    integer, string representation of objects are converted to JavaObject
    instances, etc.

    :param answer: the string returned by the Java gateway
    :param gateway_client: the gateway client used to communicate with the Java
        Gateway. Only necessary if the answer is a reference (e.g., object,
        list, map)
    :param target_id: the name of the object from which the answer comes from
        (e.g., *object1* in `object1.hello()`). Optional.
    :param name: the name of the member from which the answer comes from
        (e.g., *hello* in `object1.hello()`). Optional.
    """
    if is_error(answer)[0]:
        if len(answer) > 1:
            type = answer[1]
            value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
            if answer[1] == REFERENCE_TYPE:
                raise Py4JJavaError(
                    "An error occurred while calling {0}{1}{2}.\n".
                    format(target_id, ".", name), value)
            else:
>               raise Py4JError(
                    "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
                    format(target_id, ".", name, value))
E           py4j.protocol.Py4JError: An error occurred while calling None.org.apache.flink.api.common.serialization.TypeInformationSerializationSchema. Trace:
E           org.apache.flink.api.python.shaded.py4j.Py4JException: Constructor org.apache.flink.api.common.serialization.TypeInformationSerializationSchema([class org.apache.flink.api.common.typeinfo.IntegerTypeInfo, class org.apache.flink.configuration.Configuration]) does not exist
E               at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.getConstructor(ReflectionEngine.java:179)
E               at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.getConstructor(ReflectionEngine.java:196)
E               at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:237)
E               at org.apache.flink.api.python.shaded.py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
E               at org.apache.flink.api.python.shaded.py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
E               at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
E               at java.base/java.lang.Thread.run(Thread.java:834)
I hope you can help me out!

Thanks in advance,

Wouter