Hello,

What is the correct way to use Python dicts as a ROW type in PyFlink? I'm
trying this:

import logging

from pyflink.common.typeinfo import Types
from pyflink.datastream import KeyedProcessFunction

output_type_info = Types.ROW_NAMED(['id', 'data', 'timestamp'],
                                   [Types.STRING(), Types.STRING(), Types.LONG()])


class MyProcessFunction(KeyedProcessFunction):
    def process_element(self, value, ctx: 'KeyedProcessFunction.Context', out: 'Collector'):
        # Values are hardcoded to rule out problems with the input data
        result = {"id": ctx.get_current_key()[0], "data": "some_string", "timestamp": 111111111111}
        out.collect(result)
        current_watermark = ctx.timer_service().current_watermark()
        ctx.timer_service().register_event_time_timer(current_watermark + 1500)

    def on_timer(self, timestamp, ctx: 'KeyedProcessFunction.OnTimerContext', out: 'Collector'):
        logging.info(timestamp)
        # Note: this emits a str, while output_type declares a named ROW
        out.collect("On timer timestamp: " + str(timestamp))


ds.key_by(MyKeySelector(), key_type_info=Types.TUPLE([Types.STRING(), Types.STRING()])) \
  .process(MyProcessFunction(), output_type=output_type_info)
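To narrow down which emitted record does not fit the declared output type, here is a minimal pure-Python sketch. `matches_row_schema` is a hypothetical helper, not a PyFlink API, and the mapping `Types.STRING() -> str`, `Types.LONG() -> int` is an assumption about the Python-side types:

```python
# Hypothetical helper (not part of PyFlink): checks whether a record has the
# field names, in order, and the Python types implied by the ROW_NAMED above.
FIELD_NAMES = ['id', 'data', 'timestamp']
# Assumed Python counterparts: Types.STRING() -> str, Types.LONG() -> int
FIELD_PY_TYPES = [str, str, int]


def matches_row_schema(record, names=FIELD_NAMES, py_types=FIELD_PY_TYPES):
    """Return True if `record` is a dict whose keys are exactly `names`
    (in that order) and whose values have the corresponding Python types."""
    if not isinstance(record, dict):
        return False
    if list(record.keys()) != list(names):
        return False
    return all(isinstance(record[n], t) for n, t in zip(names, py_types))


# Shape of the record built in process_element
print(matches_row_schema({"id": "k1", "data": "some_string", "timestamp": 111111111111}))  # True
# Shape of the record emitted from on_timer
print(matches_row_schema("On timer timestamp: " + str(1500)))  # False
```

The dict from `process_element` fits the declared fields, whereas the string collected in `on_timer` does not.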


I hardcoded the values in MyProcessFunction to make sure the input data
doesn't mess up the fields. So the data is correct, but PyFlink throws an
exception:

    at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:290)
    at org.apache.flink.api.java.typeutils.runtime.MaskUtils.readIntoMask(MaskUtils.java:73)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:202)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:58)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:213)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:58)
    at org.apache.flink.streaming.api.operators.python.PythonKeyedProcessOperator.emitResult(PythonKeyedProcessOperator.java:253)
    at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.emitResults(AbstractPythonFunctionOperator.java:266)
    at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.invokeFinishBundle(AbstractPythonFunctionOperator.java:293)
    at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.checkInvokeFinishBundleByTime(AbstractPythonFunctionOperator.java:285)
    at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.lambda$open$0(AbstractPythonFunctionOperator.java:134)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1211)
    ... 10 more

However, it works with primitive types such as Types.STRING(). According
to the documentation, the ROW type corresponds to Python's dict type.
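If the dict mapping does not hold in the DataStream API, one thing worth trying (an assumption, not confirmed here) is emitting `pyflink.common.Row` objects instead of dicts. Assuming ROW fields are matched by position, the dict values would need to follow the order declared in `Types.ROW_NAMED`. A pure-Python sketch of that conversion step, using a hypothetical helper `dict_to_row_values`:

```python
from typing import Any, Mapping, Sequence, Tuple


def dict_to_row_values(record: Mapping[str, Any],
                       field_names: Sequence[str]) -> Tuple[Any, ...]:
    """Hypothetical helper: order a dict's values by the declared field names.

    Follows the declared order, not the dict's insertion order.
    Raises KeyError if a declared field is missing from the dict.
    """
    return tuple(record[name] for name in field_names)


values = dict_to_row_values(
    {"timestamp": 111111111111, "id": "k1", "data": "some_string"},
    ['id', 'data', 'timestamp'],
)
print(values)  # ('k1', 'some_string', 111111111111)
# Assumption: with PyFlink installed, these values could then be wrapped as
# pyflink.common.Row(*values) before calling out.collect(...).
```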


Regards
