Hi meneldor, Xingbo,

Sorry for the late reply.

Thanks a lot for Xingbo’s clarification.

Judging from the stack trace of the exception, could you check whether the
result data matches the specified return type? BTW, please share your code
if that's OK; it would help with debugging.
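
One way to run that check before a value reaches the Java side is to compare each emitted record against the declared field names and types. This is only a minimal sketch in plain Python, not PyFlink API: `check_record`, `EXPECTED_FIELDS`, and the Python type chosen for each field are hypothetical names picked for illustration.

```python
# Hypothetical helper, not part of PyFlink: compares a record (as a dict)
# against the field names and Python types expected for the declared
# output type.
EXPECTED_FIELDS = [("id", str), ("data", str), ("timestamp", int)]

# Range of a 32-bit signed integer, for fields declared as INT rather than LONG.
INT32_MIN, INT32_MAX = -2**31, 2**31 - 1


def check_record(record, expected_fields, int_is_32_bit=False):
    """Return a list of human-readable mismatches (empty if none)."""
    problems = []
    expected_names = [name for name, _ in expected_fields]
    if list(record.keys()) != expected_names:
        problems.append(f"field names {list(record.keys())} != {expected_names}")
        return problems
    for name, py_type in expected_fields:
        value = record[name]
        # bool is a subclass of int in Python, so reject it explicitly.
        if isinstance(value, bool) or not isinstance(value, py_type):
            problems.append(
                f"{name}: expected {py_type.__name__}, got {type(value).__name__}"
            )
        elif py_type is int and int_is_32_bit and not INT32_MIN <= value <= INT32_MAX:
            problems.append(f"{name}: {value} does not fit in a 32-bit int")
    return problems


# A record with an epoch-millisecond timestamp checked against a 32-bit field.
print(check_record({"id": "a", "data": "b", "timestamp": 1603708212500},
                   EXPECTED_FIELDS, int_is_32_bit=True))
```

Calling it on each value just before it is emitted should make a mismatch show up as a readable message on the Python side.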

Best,
Shuiqiang




On Fri, Jan 15, 2021 at 4:59 PM meneldor <menel...@gmail.com> wrote:

> I imported pyflink.common.types.Row and used it as Shuiqiang suggested but
> now Java throws a memory exception:
>
> Caused by: TimerException{java.lang.OutOfMemoryError: Java heap space}
>> ... 11 more
>> Caused by: java.lang.OutOfMemoryError: Java heap space
>> at
>> org.apache.flink.table.runtime.util.SegmentsUtil.allocateReuseChars(SegmentsUtil.java:91)
>> at
>> org.apache.flink.table.runtime.util.StringUtf8Utils.decodeUTF8(StringUtf8Utils.java:127)
>> at
>> org.apache.flink.table.runtime.typeutils.serializers.python.StringSerializer.deserialize(StringSerializer.java:90)
>> at
>> org.apache.flink.table.runtime.typeutils.serializers.python.StringSerializer.deserialize(StringSerializer.java:41)
>> at
>> org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:213)
>> at
>> org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:58)
>> at
>> org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:213)
>> at
>> org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:58)
>> at
>> org.apache.flink.streaming.api.operators.python.PythonKeyedProcessOperator.emitResult(PythonKeyedProcessOperator.java:253)
>> at
>> org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.emitResults(AbstractPythonFunctionOperator.java:266)
>> at
>> org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.invokeFinishBundle(AbstractPythonFunctionOperator.java:293)
>> at
>> org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.checkInvokeFinishBundleByTime(AbstractPythonFunctionOperator.java:285)
>> at
>> org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.lambda$open$0(AbstractPythonFunctionOperator.java:134)
>> at
>> org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator$$Lambda$670/579781231.onProcessingTime(Unknown
>> Source)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1211)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$17(StreamTask.java:1202)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$844/2129217743.run(Unknown
>> Source)
>>
>
> Regards
>
> On Fri, Jan 15, 2021 at 4:00 AM Xingbo Huang <hxbks...@gmail.com> wrote:
>
>> Hi meneldor,
>>
>> I guess Shuiqiang did not use pyflink 1.12.0 to develop the example.
>> The signature of the `process_element` method has been changed in a newer
>> version[1]. In pyflink 1.12.0, you can use `collector.collect` to send out
>> your results.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-20647
>>
>> Best,
>> Xingbo
>>
>> On Fri, Jan 15, 2021 at 1:20 AM meneldor <menel...@gmail.com> wrote:
>>
>>> Thank you for the answer Shuiqiang!
>>> I'm using the latest apache-flink version:
>>>
>>>> Requirement already up-to-date: apache-flink in
>>>> ./venv/lib/python3.7/site-packages (1.12.0)
>>>
>>> However, the method signature uses a collector:
>>>
>>> [image: image.png]
>>> I'm using the *setup-pyflink-virtual-env.sh* shell script from the
>>> docs (which uses pip).
>>>
>>> Regards
>>>
>>> On Thu, Jan 14, 2021 at 6:47 PM Shuiqiang Chen <acqua....@gmail.com>
>>> wrote:
>>>
>>>> Hi meneldor,
>>>>
>>>> The main cause of the error is a bug in
>>>> `ctx.timer_service().current_watermark()`. At the beginning of the stream,
>>>> when the first record comes into KeyedProcessFunction.process_element(),
>>>> the current watermark is Long.MIN_VALUE on the Java side, while on the
>>>> Python side it becomes Long.MAX_VALUE, which is 9223372036854775807.
>>>>
>>>> >>> ctx.timer_service().register_event_time_timer(current_watermark + 1500)
>>>>
>>>> Here, 9223372036854775807 + 1500 is 9223372036854777307, which Python
>>>> handles automatically as an arbitrary-precision integer, but which causes
>>>> a long overflow in Java when the registered timer value is deserialized.
>>>> I will create an issue to fix the bug.
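
The arithmetic above can be checked in plain Python, where integers have arbitrary precision. This is a minimal sketch only; `JAVA_LONG_MIN`, `JAVA_LONG_MAX`, and `fits_in_java_long` are names made up for this illustration and are not PyFlink API.

```python
# Bounds of Java's signed 64-bit long.
JAVA_LONG_MIN = -2**63
JAVA_LONG_MAX = 2**63 - 1  # 9223372036854775807


def fits_in_java_long(value: int) -> bool:
    """True if value can be stored in a Java long without overflow."""
    return JAVA_LONG_MIN <= value <= JAVA_LONG_MAX


# The watermark value reported on the Python side for the first record.
current_watermark = JAVA_LONG_MAX
timer_timestamp = current_watermark + 1500  # Python ints never overflow

print(timer_timestamp)                     # 9223372036854777307
print(fits_in_java_long(timer_timestamp))  # False
```

A guard like this before registering a timer would keep an out-of-range value from being sent to the Java side.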
>>>>
>>>> Returning to your initial question: in PyFlink you can create Row data
>>>> as below:
>>>>
>>>> >>> row_data = Row(id='my id', data='some data', timestamp=1111)
>>>>
>>>> Also, which Flink release is the code snippet you provided based on?
>>>> The latest API for KeyedProcessFunction.process_element() and
>>>> KeyedProcessFunction.on_timer() does not provide a `collector` to collect
>>>> output data but uses `yield`, which is a more Pythonic approach.
>>>>
>>>> Please refer to the following code:
>>>>
>>>> from pyflink.common.typeinfo import Types
>>>> from pyflink.common.types import Row
>>>> from pyflink.common.watermark_strategy import TimestampAssigner, WatermarkStrategy
>>>> from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
>>>> from pyflink.datastream.functions import KeyedProcessFunction
>>>>
>>>> def keyed_process_function_example():
>>>>     env = StreamExecutionEnvironment.get_execution_environment()
>>>>     env.set_parallelism(1)
>>>>     env.get_config().set_auto_watermark_interval(2000)
>>>>     env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
>>>>     data_stream = env.from_collection([(1, 'hello', '1603708211000'),
>>>>                                        (2, 'hi', '1603708224000'),
>>>>                                        (3, 'hello', '1603708226000'),
>>>>                                        (4, 'hi', '1603708289000')],
>>>>                                       type_info=Types.ROW([Types.INT(), Types.STRING(), Types.STRING()]))
>>>>
>>>>     class MyTimestampAssigner(TimestampAssigner):
>>>>
>>>>         def extract_timestamp(self, value, record_timestamp) -> int:
>>>>             # The third field holds the event time in epoch milliseconds.
>>>>             return int(value[2])
>>>>
>>>>     class MyProcessFunction(KeyedProcessFunction):
>>>>
>>>>         def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
>>>>             yield Row(id=ctx.get_current_key()[1], data='some_string', timestamp=11111111)
>>>>             # current_watermark = ctx.timer_service().current_watermark()
>>>>             ctx.timer_service().register_event_time_timer(ctx.timestamp() + 1500)
>>>>
>>>>         def on_timer(self, timestamp: int, ctx: 'KeyedProcessFunction.OnTimerContext'):
>>>>             yield Row(id=ctx.get_current_key()[1],
>>>>                       data='current on timer timestamp: ' + str(timestamp),
>>>>                       timestamp=timestamp)
>>>>
>>>>     # timestamp is declared as LONG: on_timer emits epoch milliseconds,
>>>>     # which do not fit in a 32-bit INT.
>>>>     output_type_info = Types.ROW_NAMED(['id', 'data', 'timestamp'],
>>>>                                        [Types.STRING(), Types.STRING(), Types.LONG()])
>>>>     watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
>>>>         .with_timestamp_assigner(MyTimestampAssigner())
>>>>     data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
>>>>         .key_by(lambda x: (x[0], x[1]), key_type_info=Types.TUPLE([Types.INT(), Types.STRING()])) \
>>>>         .process(MyProcessFunction(), output_type=output_type_info).print()
>>>>     env.execute('test keyed process function')
>>>>
>>>>
>>>> Best,
>>>> Shuiqiang
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Thu, Jan 14, 2021 at 10:45 PM meneldor <menel...@gmail.com> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> What is the correct way to use Python dicts as the ROW type in pyflink?
>>>>> I'm trying this:
>>>>>
>>>>> output_type_info = Types.ROW_NAMED(['id', 'data', 'timestamp'],
>>>>>                                    [Types.STRING(), Types.STRING(), Types.LONG()])
>>>>>
>>>>> class MyProcessFunction(KeyedProcessFunction):
>>>>>     def process_element(self, value, ctx: 'KeyedProcessFunction.Context', out: Collector):
>>>>>         result = {"id": ctx.get_current_key()[0], "data": "some_string", "timestamp": 111111111111}
>>>>>         out.collect(result)
>>>>>         current_watermark = ctx.timer_service().current_watermark()
>>>>>         ctx.timer_service().register_event_time_timer(current_watermark + 1500)
>>>>>
>>>>>     def on_timer(self, timestamp, ctx: 'KeyedProcessFunction.OnTimerContext', out: 'Collector'):
>>>>>         logging.info(timestamp)
>>>>>         out.collect("On timer timestamp: " + str(timestamp))
>>>>>
>>>>> ds.key_by(MyKeySelector(), key_type_info=Types.TUPLE([Types.STRING(), Types.STRING()])) \
>>>>>    .process(MyProcessFunction(), output_type=output_type_info)
>>>>>
>>>>>
>>>>> I just hardcoded the values in MyProcessFunction to be sure that the
>>>>> input data doesn't mess up the fields. So the data is correct, but PyFlink
>>>>> throws an exception:
>>>>>
>>>>> at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:290)
>>>>>> at
>>>>>> org.apache.flink.api.java.typeutils.runtime.MaskUtils.readIntoMask(MaskUtils.java:73)
>>>>>> at
>>>>>> org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:202)
>>>>>> at
>>>>>> org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:58)
>>>>>> at
>>>>>> org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:213)
>>>>>> at
>>>>>> org.apache.flink.api.java.typeutils.runtime.RowSerializer.deserialize(RowSerializer.java:58)
>>>>>> at
>>>>>> org.apache.flink.streaming.api.operators.python.PythonKeyedProcessOperator.emitResult(PythonKeyedProcessOperator.java:253)
>>>>>> at
>>>>>> org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.emitResults(AbstractPythonFunctionOperator.java:266)
>>>>>> at
>>>>>> org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.invokeFinishBundle(AbstractPythonFunctionOperator.java:293)
>>>>>> at
>>>>>> org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.checkInvokeFinishBundleByTime(AbstractPythonFunctionOperator.java:285)
>>>>>> at
>>>>>> org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.lambda$open$0(AbstractPythonFunctionOperator.java:134)
>>>>>> at
>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1211)
>>>>>> ... 10 more
>>>>>
>>>>> However, it works with primitive types like Types.STRING(). According to
>>>>> the documentation, the ROW type corresponds to Python's dict type.
>>>>>
>>>>>
>>>>> Regards
>>>>>
>>>>>
