Dear all,

I'm having trouble unifying two data streams using the union operator in
PyFlink. My code basically looks like this:

init_stream = (operator_stream
   .filter(lambda r: r[0] is None)
   .map(init_operator,Types.TUPLE([Types.STRING(),Types.PICKLED_BYTE_ARRAY()]))
   .key_by(lambda x: x[0], Types.STRING())
)

stateful_operator_stream = (operator_stream
    .filter(lambda r: r[0] is not None)
    .map(lambda x: (x[0], x[1]),Types.TUPLE([Types.STRING(),
Types.PICKLED_BYTE_ARRAY()]))
    .key_by(lambda x: x[0],Types.STRING())
)
print(init_stream)print(init_stream.get_type())
print(stateful_operator_stream.get_type())print(stateful_operator_stream)

final_operator_stream = init_stream
    .union(stateful_operator_stream)
    .process(stateful_operator)


In short, I have a datastream (operator_stream) of type Tuple[str,
Event] which I define as a tuple of Types.STRING() and
Types.PICKLED.BYTE_ARRAY().
When calling the union operator, I get an error which shows a type
mismatch between both streams:

py4j.protocol.Py4JJavaError: An error occurred while calling o732.union.
: java.lang.IllegalArgumentException: Cannot union streams of
different types: Java Tuple2<String, PickledByteArrayTypeInfo> and
Row(f0: String, f1: Java Tuple2<String, PickledByteArrayTypeInfo>)
        at 
org.apache.flink.streaming.api.datastream.DataStream.union(DataStream.java:238)
        at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
Method)
        at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at 
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at 
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at 
org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
        at 
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at 
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
        at 
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.base/java.lang.Thread.run(Thread.java:834)
However, when I print the types of both datastreams they seem similar:

<pyflink.datastream.data_stream.KeyedStream object at 0x7f1539877320>
RowTypeInfo(f0: String, f1: TupleTypeInfo(String, PickledByteArrayTypeInfo))
RowTypeInfo(f0: String, f1: TupleTypeInfo(String, PickledByteArrayTypeInfo))
<pyflink.datastream.data_stream.KeyedStream object at 0x7f1539884f98>
Any thoughts? Thanks in advance!

Regards,
Wouter

Reply via email to