Hi Wouter,

1) The exception looks like a bug. I have filed a ticket for it: https://issues.apache.org/jira/browse/FLINK-22733

2) Regarding your requirements, I think you should do it as follows:

```
init_stream = (operator_stream
    .filter(lambda r: r[0] is None)
    .map(init_operator, Types.TUPLE([Types.STRING(), Types.PICKLED_BYTE_ARRAY()]))
)

stateful_operator_stream = (operator_stream
    .filter(lambda r: r[0] is not None)
    .map(lambda x: (x[0], x[1]), Types.TUPLE([Types.STRING(), Types.PICKLED_BYTE_ARRAY()]))
)

# Key the stream only after the union, then apply the stateful operator.
final_operator_stream = (init_stream
    .union(stateful_operator_stream)
    .key_by(lambda x: x[0], Types.STRING())
    .process(stateful_operator))
```

The reason is that `union` turns a `KeyedStream` into a plain `DataStream`, and you cannot perform stateful operations on a `DataStream` any more. So apply `key_by` after the union rather than before it.

Regards,
Dian

> On 21 May 2021, at 00:38, Wouter Zorgdrager <zorgdrag...@gmail.com> wrote:
>
> Dear all,
>
> I'm having trouble unifying two data streams using the union operator in
> PyFlink. My code basically looks like this:
>
> init_stream = (operator_stream
>     .filter(lambda r: r[0] is None)
>     .map(init_operator, Types.TUPLE([Types.STRING(), Types.PICKLED_BYTE_ARRAY()]))
>     .key_by(lambda x: x[0], Types.STRING())
> )
>
> stateful_operator_stream = (operator_stream
>     .filter(lambda r: r[0] is not None)
>     .map(lambda x: (x[0], x[1]), Types.TUPLE([Types.STRING(), Types.PICKLED_BYTE_ARRAY()]))
>     .key_by(lambda x: x[0], Types.STRING())
> )
>
> print(init_stream)
> print(init_stream.get_type())
>
> print(stateful_operator_stream.get_type())
> print(stateful_operator_stream)
>
> final_operator_stream = (init_stream
>     .union(stateful_operator_stream)
>     .process(stateful_operator))
>
> In short, I have a datastream (operator_stream) of type Tuple[str, Event],
> which I define as a tuple of Types.STRING() and Types.PICKLED_BYTE_ARRAY().
> When calling the union operator, I get an error which shows a type mismatch
> between both streams:
>
> py4j.protocol.Py4JJavaError: An error occurred while calling o732.union.
> : java.lang.IllegalArgumentException: Cannot union streams of different types: Java Tuple2<String, PickledByteArrayTypeInfo> and Row(f0: String, f1: Java Tuple2<String, PickledByteArrayTypeInfo>)
>     at org.apache.flink.streaming.api.datastream.DataStream.union(DataStream.java:238)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>     at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>     at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>     at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
>     at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>     at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
>     at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
>     at java.base/java.lang.Thread.run(Thread.java:834)
>
> However, when I print the types of both datastreams, they seem similar:
>
> <pyflink.datastream.data_stream.KeyedStream object at 0x7f1539877320>
> RowTypeInfo(f0: String, f1: TupleTypeInfo(String, PickledByteArrayTypeInfo))
>
> RowTypeInfo(f0: String, f1: TupleTypeInfo(String, PickledByteArrayTypeInfo))
> <pyflink.datastream.data_stream.KeyedStream object at 0x7f1539884f98>
>
> Any thoughts? Thanks in advance!
>
> Regards,
> Wouter