Re: PyFlink DataStream union type mismatch

2021-05-23 Thread Dian Fu
Hi Wouter,

1. These configurations work for both the Python Table API and the Python
DataStream API. The page reads as if it targets only the Python Table API
because that was the only API supported when the documentation was written.
I will follow up to improve the paragraph.

2. The recommended way is using `stream_execution_environment.add_jars`. Could 
you share some code snippets? 
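
A minimal sketch of both points, not taken from this thread. It assumes the
bundle keys `python.fn-execution.bundle.size` and
`python.fn-execution.bundle.time` from the configuration page, a PyFlink
version whose `StreamExecutionEnvironment.get_execution_environment` accepts a
`Configuration` (on older releases the same keys may have to go into
flink-conf.yaml instead), and a hypothetical jar path. `add_jars` takes local
file URLs, so plain paths are converted to `file://` form first. The values
are illustrative only, and `to_jar_urls` and `build_env` are made-up helper
names.

```python
from pathlib import Path

# Illustrative values, not tuned recommendations: flush a bundle after
# 1 element or after 10 ms, whichever comes first.
BUNDLE_SETTINGS = {
    "python.fn-execution.bundle.size": "1",
    "python.fn-execution.bundle.time": "10",
}


def to_jar_urls(jar_paths):
    """Turn local jar paths into absolute file:// URLs."""
    return [Path(p).resolve().as_uri() for p in jar_paths]


def build_env(jar_paths, settings=BUNDLE_SETTINGS):
    """Create a StreamExecutionEnvironment with the settings and jars applied.

    PyFlink is imported lazily so the helpers above work without it.
    Assumption: get_execution_environment accepts a Configuration.
    """
    from pyflink.common import Configuration
    from pyflink.datastream import StreamExecutionEnvironment

    config = Configuration()
    for key, value in settings.items():
        config.set_string(key, value)
    env = StreamExecutionEnvironment.get_execution_environment(config)
    env.add_jars(*to_jar_urls(jar_paths))
    return env


# Usage (hypothetical path; point it at the connector jar you downloaded):
# env = build_env(["lib/flink-sql-connector-kafka.jar"])
```

Keeping the jar outside the venv and registering it per job this way avoids
editing 'venv/pyflink/lib' by hand.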

Regards,
Dian


Re: PyFlink DataStream union type mismatch

2021-05-23 Thread Wouter Zorgdrager
Hi Dian, all,

Thanks, that indeed solved my problem. I have two more questions; I'm not
sure whether it is better practice to send a new email to the mailing list
or to reuse this thread:

1. I noticed very high latency (multiple seconds per message) for a job
with multiple operators and very low throughput. I suspect this is because
messages are bundled until a size threshold or a time threshold is met, and
in a low-throughput scenario only the time threshold is ever triggered.
This is also the impression I get from the configuration page [1].
However, these configuration values seem to be targeted at the Table API,
and it is unclear to me how to configure them for the DataStream API. To be
clear, this is in PyFlink.

2. I'm using the JVM Kafka consumer and producer for my Python job.
Therefore I had to add the flink-sql-connector-kafka jar to my Flink
environment. I did this by downloading the jar file from Maven and putting
it under 'venv/pyflink/lib'. Is there an easier way? I'm not particularly
a fan of manually changing my venv.
I tried stream_execution_environment.add_jars, but that was unsuccessful:
I still got a ClassNotFoundException.
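
To illustrate the suspicion in question 1, here is a toy simulation of
size-or-time bundling in plain Python. It is not Flink's implementation;
`simulate_bundling`, its parameters, and the numbers are made up for
illustration. A bundle is flushed when it holds `bundle_size` elements or when
`bundle_time_ms` has passed since its first element, whichever comes first.

```python
def simulate_bundling(arrival_times_ms, bundle_size, bundle_time_ms):
    """Return the per-message latency (ms) under size-or-time bundling.

    arrival_times_ms must be sorted ascending. A bundle opens at its first
    message and is flushed when it is full or when its timer expires.
    """
    latencies = []
    bundle = []  # arrival times of the messages waiting in the open bundle

    def flush(at_ms):
        latencies.extend(at_ms - t for t in bundle)
        bundle.clear()

    for t in arrival_times_ms:
        # The timer of the open bundle may expire before this arrival.
        if bundle and t >= bundle[0] + bundle_time_ms:
            flush(bundle[0] + bundle_time_ms)
        bundle.append(t)
        if len(bundle) >= bundle_size:
            flush(t)  # size threshold reached: flush immediately
    if bundle:
        flush(bundle[0] + bundle_time_ms)  # the last bundle waits for its timer
    return latencies


# One message every 5 seconds: the size threshold is never reached, so every
# message waits for the full time threshold.
slow = simulate_bundling([0, 5000, 10000], bundle_size=1000, bundle_time_ms=1000)
print(slow)  # [1000, 1000, 1000]

# Ten messages arriving 1 ms apart with a small size threshold: the size
# threshold fires first and latency stays at a few milliseconds.
fast = simulate_bundling(list(range(10)), bundle_size=5, bundle_time_ms=1000)
print(fast)  # [4, 3, 2, 1, 0, 4, 3, 2, 1, 0]
```

In this model a multi-operator job would pay the time threshold once per
bundling stage, which would be consistent with latencies of several seconds.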

Hope you can help. As always, thanks a lot!

Regards,
Wouter


[1] https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/python/python_config/


Re: PyFlink DataStream union type mismatch

2021-05-20 Thread Dian Fu
Hi Wouter,

1) The exception looks like a bug. I have filed a ticket for it:
https://issues.apache.org/jira/browse/FLINK-22733

2) Regarding your requirements, I guess you should do it as follows:
```
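from pyflink.common.typeinfo import Types

# operator_stream, init_operator and stateful_operator come from your job.
# Neither branch is keyed here; both declare the same output type.
init_stream = (operator_stream
    .filter(lambda r: r[0] is None)
    .map(init_operator, Types.TUPLE([Types.STRING(), Types.PICKLED_BYTE_ARRAY()]))
)

stateful_operator_stream = (operator_stream
    .filter(lambda r: r[0] is not None)
    .map(lambda x: (x[0], x[1]), Types.TUPLE([Types.STRING(), Types.PICKLED_BYTE_ARRAY()]))
)

# Union the plain DataStreams first, then key the result once.
init_stream.union(stateful_operator_stream).key_by(lambda x: x[0], Types.STRING())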

```

The reason is that `union` turns a `KeyedStream` into a plain `DataStream`,
and you cannot perform stateful operations on a non-keyed `DataStream`.
Applying `key_by` after the `union` gives you a `KeyedStream` again.
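
The ordering argument can be sketched with a toy model in plain Python. These
are not PyFlink classes: `ToyDataStream`, `ToyKeyedStream`,
`process_with_state` and `count_per_key` are made-up names that only mimic
the relevant behaviour, namely that `union` always returns a non-keyed stream
and that per-key state is only available after `key_by`.

```python
from collections import defaultdict


class ToyDataStream:
    """Toy stand-in for a non-keyed stream (not a PyFlink class)."""

    def __init__(self, records):
        self.records = list(records)

    def union(self, *others):
        # The result is always a plain stream, even if the inputs were keyed.
        merged = list(self.records)
        for other in others:
            merged.extend(other.records)
        return ToyDataStream(merged)

    def key_by(self, key_fn):
        return ToyKeyedStream(self.records, key_fn)


class ToyKeyedStream(ToyDataStream):
    """Toy keyed stream: only this class offers per-key state."""

    def __init__(self, records, key_fn):
        super().__init__(records)
        self.key_fn = key_fn

    def process_with_state(self, fn):
        state = defaultdict(dict)  # one private dict per key
        return [fn(record, state[self.key_fn(record)]) for record in self.records]


def count_per_key(record, state):
    """Stateful function: counts how many records each key has seen."""
    state["count"] = state.get("count", 0) + 1
    return (record[0], state["count"])


first = ToyDataStream([("a", 1), ("b", 2)]).key_by(lambda r: r[0])
second = ToyDataStream([("a", 3)]).key_by(lambda r: r[0])

unioned = first.union(second)
print(type(unioned).__name__)                  # ToyDataStream
print(hasattr(unioned, "process_with_state"))  # False

rekeyed = unioned.key_by(lambda r: r[0])
print(rekeyed.process_with_state(count_per_key))
# [('a', 1), ('b', 1), ('a', 2)]
```

Keying each branch before the union is therefore wasted work in this model:
the keying is lost at the union and has to be applied again afterwards.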

Regards,
Dian




PyFlink DataStream union type mismatch

2021-05-20 Thread Wouter Zorgdrager
Dear all,

I'm having trouble unifying two data streams using the union operator in
PyFlink. My code basically looks like this:

init_stream = (operator_stream
    .filter(lambda r: r[0] is None)
    .map(init_operator, Types.TUPLE([Types.STRING(), Types.PICKLED_BYTE_ARRAY()]))
    .key_by(lambda x: x[0], Types.STRING())
)

stateful_operator_stream = (operator_stream
    .filter(lambda r: r[0] is not None)
    .map(lambda x: (x[0], x[1]), Types.TUPLE([Types.STRING(), Types.PICKLED_BYTE_ARRAY()]))
    .key_by(lambda x: x[0], Types.STRING())
)

print(init_stream)
print(init_stream.get_type())
print(stateful_operator_stream.get_type())
print(stateful_operator_stream)

final_operator_stream = (init_stream
    .union(stateful_operator_stream)
    .process(stateful_operator)
)


In short, I have a datastream (operator_stream) of type Tuple[str, Event],
which I declare as a tuple of Types.STRING() and
Types.PICKLED_BYTE_ARRAY().
When calling the union operator, I get an error that reports a type
mismatch between the two streams:

py4j.protocol.Py4JJavaError: An error occurred while calling o732.union.
: java.lang.IllegalArgumentException: Cannot union streams of different types: Java Tuple2 and Row(f0: String, f1: Java Tuple2)
    at org.apache.flink.streaming.api.datastream.DataStream.union(DataStream.java:238)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.base/java.lang.Thread.run(Thread.java:834)

However, when I print the types of both datastreams they seem similar:


RowTypeInfo(f0: String, f1: TupleTypeInfo(String, PickledByteArrayTypeInfo))
RowTypeInfo(f0: String, f1: TupleTypeInfo(String, PickledByteArrayTypeInfo))

Any thoughts? Thanks in advance!

Regards,
Wouter