[ 
https://issues.apache.org/jira/browse/FLINK-18235?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17688420#comment-17688420
 ] 

Piotr Nowojski commented on FLINK-18235:
----------------------------------------

Could you elaborate on why it would be impossible? For example:

Processing a single input record is atomic and uninterruptible. Let's also 
enumerate the records (1..N). I think you could quite easily have the following 
protocol:
# Flink's Python Operator (implemented in Java) sends "record N" to the Python 
process/thread
# Python receives input "record N" and starts processing it; during this time it 
may emit any number of output records (0, 1, or many). (In Java code, that's 
basically a single call to 
{{org.apache.flink.streaming.runtime.io.StreamInputProcessor#processInput}}.)
# Once input "record N" is processed by the Python process/thread, it sends 
back to the Flink Python Operator "record N completed"
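
Just to illustrate the exchange above (this is not existing Flink code, all class 
and field names are invented), it could be modelled with three message types along 
these lines:

{code:java}
/** Illustrative sketch of the messages exchanged; not existing Flink code. */
public final class ProtocolMessagesSketch {

    /** Step 1: Java operator -> Python worker: "here is record N". */
    static final class ProcessRecord {
        final long recordId;            // N
        final byte[] serializedRecord;  // the input record itself

        ProcessRecord(long recordId, byte[] serializedRecord) {
            this.recordId = recordId;
            this.serializedRecord = serializedRecord;
        }
    }

    /** Step 2: Python worker -> Java operator: one of the 0..many outputs emitted while processing record N. */
    static final class EmitOutput {
        final byte[] serializedOutput;

        EmitOutput(byte[] serializedOutput) {
            this.serializedOutput = serializedOutput;
        }
    }

    /** Step 3: Python worker -> Java operator: "record N completed". */
    static final class RecordCompleted {
        final long recordId;

        RecordCompleted(long recordId) {
            this.recordId = recordId;
        }
    }

    private ProtocolMessagesSketch() {}
}
{code}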

It doesn't matter what kind of operator is actually executed underneath, and it 
doesn't matter how many records were emitted. During a checkpoint, the Python 
operator would just have to wait for the first "record X completed" message and 
persist in state all of the records that were already sent to the Python side 
but not yet completed (to reprocess them in case of failover).
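
To make the checkpoint step concrete, a minimal sketch of the bookkeeping on the 
Java side could look like the following (again purely illustrative, plain Java, 
no real Flink operator or state APIs; all names are invented):

{code:java}
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Illustrative only; not Flink's actual Python operator. */
public class CheckpointSketch {

    /** (id, payload) pairs sent to the Python worker but not yet acknowledged. */
    private final Deque<Object[]> unacked = new ArrayDeque<>();
    /** "record X completed" acks coming back from the Python side. */
    private final BlockingQueue<Long> completionAcks = new LinkedBlockingQueue<>();

    void recordSent(long id, String payload) {
        unacked.addLast(new Object[] {id, payload});
    }

    void completionReceived(long completedId) {
        completionAcks.add(completedId);
    }

    /**
     * On a checkpoint: wait for the next "record X completed" message, drop
     * everything up to X, and return the remaining in-flight records so they
     * can be put into operator state and re-sent after a failover.
     */
    List<String> snapshotInFlight() throws InterruptedException {
        long x = completionAcks.take(); // blocks until the next completion ack
        while (!unacked.isEmpty() && ((Long) unacked.peekFirst()[0]) <= x) {
            unacked.removeFirst();
        }
        List<String> toPersist = new ArrayList<>();
        for (Object[] entry : unacked) {
            toPersist.add((String) entry[1]);
        }
        return toPersist; // would go into the operator's checkpointed state
    }

    public static void main(String[] args) throws InterruptedException {
        CheckpointSketch sketch = new CheckpointSketch();
        sketch.recordSent(1, "a");
        sketch.recordSent(2, "b");
        sketch.completionReceived(1);
        System.out.println(sketch.snapshotInFlight()); // prints [b]
    }
}
{code}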

True, it gets more complicated if you take into consideration the state cache on 
the Python side, or other "actions" that could cause records to be emitted 
(timers, all kinds of notifications like {{finish()}}, {{endOfInput}}, ...), but 
I don't see a fundamental problem. In other words, you could reimplement 
unaligned checkpoints and the overtaking of in-flight records between Flink's 
Python Operator and the actual Python code running in the Python thread/process.

Just to be clear, I'm not saying/urging anyone to implement anything for this 
ticket right now. I just stumbled upon it and it made me curious what the 
underlying issue is.


> Improve the checkpoint strategy for Python UDF execution
> --------------------------------------------------------
>
>                 Key: FLINK-18235
>                 URL: https://issues.apache.org/jira/browse/FLINK-18235
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / Python
>            Reporter: Dian Fu
>            Priority: Not a Priority
>              Labels: auto-deprioritized-major, stale-assigned
>
> Currently, when a checkpoint is triggered for the Python operator, all the 
> buffered data will be flushed to the Python worker to be processed. This will 
> increase the overall checkpoint time in case there are a lot of elements 
> buffered and the Python UDF is slow. We should improve the checkpoint strategy 
> to address this. One way to implement this is to control the amount of data 
> buffered in the pipeline between the Java/Python processes, similar to what 
> [FLIP-183|https://cwiki.apache.org/confluence/display/FLINK/FLIP-183%3A+Dynamic+buffer+size+adjustment]
>  does to control the amount of data buffered in the network. We can also let 
> users configure the checkpoint strategy if needed.
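
(As a purely illustrative sketch of the idea described above: bounding how much 
data can sit in the pipeline between the Java operator and the Python worker, 
analogous to what FLIP-183 does for network buffers. The names below are 
invented; this is not Flink's implementation.)

{code:java}
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Invented names; sketches a bounded Java->Python buffer, not Flink's implementation. */
public class BoundedPythonPipelineSketch {

    /** At most 'capacity' records may be buffered towards the Python worker at a time. */
    private final BlockingQueue<String> toPython;

    public BoundedPythonPipelineSketch(int capacity) {
        this.toPython = new ArrayBlockingQueue<>(capacity);
    }

    /** Called by the Java operator; blocks (back-pressures) once the buffer is full. */
    public void send(String record) throws InterruptedException {
        toPython.put(record);
    }

    /** Called by the thread feeding the Python worker. */
    public String nextForPython() throws InterruptedException {
        return toPython.take();
    }
}
{code}

With a small (possibly dynamically adjusted) capacity, a checkpoint only has to 
wait for a bounded number of buffered records to be flushed to the Python worker.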



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
