Hi Wouter,

1) The exception looks like a bug. I have filed a ticket for it: 

2) As for your requirements, I think you should do it as follows:
init_stream = (operator_stream
   .filter(lambda r: r[0] is None)

stateful_operator_stream = (operator_stream
    .filter(lambda r: r[0] is not None)
    .map(lambda x: (x[0], x[1]),Types.TUPLE([Types.STRING(), 

init_stream.union(stateful_operator_stream).key_by(lambda x: 


The reason is that `union` turns a `KeyedStream` into a plain `DataStream`, and 
you can no longer perform stateful operations on a `DataStream`. So apply 
`union` first and call `key_by` on the result.
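
To illustrate the point above, here is a minimal toy model in plain Python. It is not PyFlink: the `DataStream` and `KeyedStream` classes below are hypothetical stand-ins written only for this sketch, and `count_per_key` is an invented example function. The model simply omits `process` on the plain stream to stand in for the missing keyed state; it only shows why the keying has to happen after the union.

```python
# Toy model only: these classes are NOT PyFlink. They mimic the relationship
# between DataStream and KeyedStream described above, nothing more.
from collections import defaultdict


class DataStream:
    def __init__(self, records):
        self.records = list(records)

    def filter(self, predicate):
        return DataStream(r for r in self.records if predicate(r))

    def map(self, func):
        return DataStream(func(r) for r in self.records)

    def union(self, *others):
        # The result is always a plain DataStream, even if the inputs were keyed.
        merged = list(self.records)
        for other in others:
            merged.extend(other.records)
        return DataStream(merged)

    def key_by(self, key_selector):
        return KeyedStream(self.records, key_selector)


class KeyedStream(DataStream):
    def __init__(self, records, key_selector):
        super().__init__(records)
        self.key_selector = key_selector

    def process(self, func):
        # Per-key state exists only here, on a keyed stream (toy assumption).
        state = defaultdict(int)
        return DataStream(func(self.key_selector(r), r, state) for r in self.records)


def count_per_key(key, record, state):
    # Hypothetical stateful function: counts the records seen for each key.
    state[key] += 1
    return (key, state[key])


operator_stream = DataStream([(None, "a"), ("k1", "b"), (None, "c"), ("k1", "d")])
init_stream = (operator_stream
    .filter(lambda r: r[0] is None)
    .map(lambda r: ("init", r[1])))
stateful_operator_stream = operator_stream.filter(lambda r: r[0] is not None)

# Keying before the union is lost: the union result is a plain DataStream.
keyed_then_unioned = (init_stream.key_by(lambda r: r[0])
    .union(stateful_operator_stream.key_by(lambda r: r[0])))

# Union first, key afterwards, then run the stateful step on the keyed result.
unioned = init_stream.union(stateful_operator_stream)
result = unioned.key_by(lambda r: r[0]).process(count_per_key)
```

In this model `keyed_then_unioned` has no `process` method at all, while `unioned.key_by(...)` does, which mirrors the ordering suggested above.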


> On May 21, 2021, at 12:38 AM, Wouter Zorgdrager <zorgdrag...@gmail.com> wrote:
> Dear all,
> I'm having trouble unifying two data streams using the union operator in 
> PyFlink. My code basically looks like this:
> init_stream = (operator_stream
>    .filter(lambda r: r[0] is None)
>    .map(init_operator, Types.TUPLE([Types.STRING(), Types.PICKLED_BYTE_ARRAY()]))
>    .key_by(lambda x: x[0], Types.STRING())
> )
> stateful_operator_stream = (operator_stream
>     .filter(lambda r: r[0] is not None)
>     .map(lambda x: (x[0], x[1]),Types.TUPLE([Types.STRING(), 
>     .key_by(lambda x: x[0],Types.STRING())
> )
> print(init_stream)
> print(init_stream.get_type())
> print(stateful_operator_stream.get_type())
> print(stateful_operator_stream)
> final_operator_stream = (init_stream
>     .union(stateful_operator_stream)
>     .process(stateful_operator)
> )
> In short, I have a datastream (operator_stream) of type Tuple[str, Event] 
> which I define as a tuple of Types.STRING() and Types.PICKLED_BYTE_ARRAY().
> When calling the union operator, I get an error which shows a type mismatch 
> between both streams:
> py4j.protocol.Py4JJavaError: An error occurred while calling o732.union.
> : java.lang.IllegalArgumentException: Cannot union streams of different 
> types: Java Tuple2<String, PickledByteArrayTypeInfo> and Row(f0: String, f1: 
> Java Tuple2<String, PickledByteArrayTypeInfo>)
>       at org.apache.flink.streaming.api.datastream.DataStream.union(DataStream.java:238)
>       at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>       at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>       at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>       at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>       at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>       at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>       at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>       at java.base/java.lang.Thread.run(Thread.java:834)
> However, when I print the types of both datastreams they seem similar:
> <pyflink.datastream.data_stream.KeyedStream object at 0x7f1539877320>
> RowTypeInfo(f0: String, f1: TupleTypeInfo(String, PickledByteArrayTypeInfo))
> RowTypeInfo(f0: String, f1: TupleTypeInfo(String, PickledByteArrayTypeInfo))
> <pyflink.datastream.data_stream.KeyedStream object at 0x7f1539884f98>
> Any thoughts? Thanks in advance!
> Regards,
> Wouter
