Dear all,

I'm having trouble unifying two data streams using the union operator in PyFlink. My code basically looks like this:

    init_stream = (operator_stream
                   .filter(lambda r: r[0] is None)
                   .map(init_operator, Types.TUPLE([Types.STRING(), Types.PICKLED_BYTE_ARRAY()]))
                   .key_by(lambda x: x[0], Types.STRING())
                   )

    stateful_operator_stream = (operator_stream
                                .filter(lambda r: r[0] is not None)
                                .map(lambda x: (x[0], x[1]), Types.TUPLE([Types.STRING(), Types.PICKLED_BYTE_ARRAY()]))
                                .key_by(lambda x: x[0], Types.STRING())
                                )

    print(init_stream)
    print(init_stream.get_type())

    print(stateful_operator_stream.get_type())
    print(stateful_operator_stream)

    final_operator_stream = (init_stream
                             .union(stateful_operator_stream)
                             .process(stateful_operator)
                             )

In short, I have a datastream (operator_stream) of type Tuple[str, Event], which I define as a tuple of Types.STRING() and Types.PICKLED_BYTE_ARRAY(). When calling the union operator, I get an error that reports a type mismatch between the two streams:

    py4j.protocol.Py4JJavaError: An error occurred while calling o732.union.
    : java.lang.IllegalArgumentException: Cannot union streams of different types: Java Tuple2<String, PickledByteArrayTypeInfo> and Row(f0: String, f1: Java Tuple2<String, PickledByteArrayTypeInfo>)
        at org.apache.flink.streaming.api.datastream.DataStream.union(DataStream.java:238)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
        at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
        at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.base/java.lang.Thread.run(Thread.java:834)

However, when I print the types of both datastreams, they look identical:

    <pyflink.datastream.data_stream.KeyedStream object at 0x7f1539877320>
    RowTypeInfo(f0: String, f1: TupleTypeInfo(String, PickledByteArrayTypeInfo))
    RowTypeInfo(f0: String, f1: TupleTypeInfo(String, PickledByteArrayTypeInfo))
    <pyflink.datastream.data_stream.KeyedStream object at 0x7f1539884f98>

Any thoughts? Thanks in advance!

Regards,
Wouter