[ 
https://issues.apache.org/jira/browse/BEAM-2140?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16070150#comment-16070150
 ] 

ASF GitHub Bot commented on BEAM-2140:
--------------------------------------

GitHub user aljoscha opened a pull request:

    https://github.com/apache/beam/pull/3480

    [BEAM-2140] Execute Splittable DoFn directly in Flink Runner

    Before, we were using ProcessFn. This was causing problems with the
    Flink Runner for two reasons:
    
    1. StatefulDoFnRunner is in the processing path, which means
    processing-time timers are being dropped when the watermark reaches +Inf
    
    2. When a pipeline shuts down (for example, when bounded sources shut
    down) Flink will drop any outstanding processing-time timers, meaning
    that any remaining Restrictions will not be processed.
    
    The fix for 1. is to execute the splittable DoFn directly, thereby
    bypassing the late data/timer dropping logic.
    
    The fix for 2. builds on the fix for 1. and also introduces a "last
    resort" event-time timer that fires at +Inf and makes sure that any
    remaining restrictions are being exhausted.
    
    R: @jkff Not sure if we want to fix it like this or maybe adapt
    `ProcessFn` and remove `StatefulDoFnRunner` from the processing path.
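
To illustrate the second problem and its fix, here is a minimal toy model in plain Java. It is not Beam or Flink code: the class `LastResortTimerSketch`, its methods, and the restriction names are all hypothetical, and `Long.MAX_VALUE` merely stands in for the +Inf watermark. It only shows why restrictions are lost when processing-time timers are dropped at shutdown, and how an event-time timer set for the maximum watermark could drain whatever is left.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy model only; every name here is hypothetical, not a Beam or Flink API. */
public class LastResortTimerSketch {
    /** Stand-in for the +Inf watermark. */
    static final long MAX_WATERMARK = Long.MAX_VALUE;

    private final Deque<String> pendingRestrictions = new ArrayDeque<>();
    private final List<String> processed = new ArrayList<>();
    private final boolean useLastResortTimer;
    private boolean processingTimeTimerSet = false;

    LastResortTimerSketch(boolean useLastResortTimer) {
        this.useLastResortTimer = useLastResortTimer;
    }

    /** Queue a restriction and arm a processing-time timer to work on it. */
    void addRestriction(String restriction) {
        pendingRestrictions.add(restriction);
        processingTimeTimerSet = true;
    }

    /** A processing-time timer fires: process one restriction, re-arm if more remain. */
    void onProcessingTimeTimer() {
        if (!processingTimeTimerSet) {
            return; // the timer was dropped, so nothing fires
        }
        processingTimeTimerSet = false;
        String restriction = pendingRestrictions.poll();
        if (restriction != null) {
            processed.add(restriction);
        }
        if (!pendingRestrictions.isEmpty()) {
            processingTimeTimerSet = true;
        }
    }

    /** Models shutdown: outstanding processing-time timers are discarded. */
    void dropProcessingTimeTimers() {
        processingTimeTimerSet = false;
    }

    /** The "last resort" event-time timer fires once the watermark reaches the maximum. */
    void onWatermark(long watermark) {
        if (useLastResortTimer && watermark == MAX_WATERMARK) {
            while (!pendingRestrictions.isEmpty()) {
                processed.add(pendingRestrictions.poll());
            }
        }
    }

    /** Runs one scenario and returns the restrictions that were actually processed. */
    static List<String> run(boolean useLastResortTimer) {
        LastResortTimerSketch sketch = new LastResortTimerSketch(useLastResortTimer);
        sketch.addRestriction("r1");
        sketch.addRestriction("r2");
        sketch.addRestriction("r3");
        sketch.onProcessingTimeTimer();    // only r1 is handled before shutdown
        sketch.dropProcessingTimeTimers(); // the bounded source finishes
        sketch.onProcessingTimeTimer();    // no effect: the timer is gone
        sketch.onWatermark(MAX_WATERMARK); // final watermark
        return sketch.processed;
    }

    public static void main(String[] args) {
        System.out.println("without last-resort timer: " + run(false)); // [r1]
        System.out.println("with last-resort timer:    " + run(true));  // [r1, r2, r3]
    }
}
```

Without the last-resort timer the model loses `r2` and `r3`. With it, the final watermark triggers a drain of the remaining queue, which is the behaviour the description above attributes to the fix for 2.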


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/aljoscha/beam fix-flink-splittable-dofn-squashed

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/beam/pull/3480.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3480
    
----
commit e30efe70bdafbbcc5bc1082f980867e58684c351
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Date:   2017-06-26T10:10:18Z

    [BEAM-2140] Execute Splittable DoFn directly in Flink Runner
    
    Before, we were using ProcessFn. This was causing problems with the
    Flink Runner for two reasons:
    
    1. StatefulDoFnRunner is in the processing path, which means
    processing-time timers are being dropped when the watermark reaches +Inf
    
    2. When a pipeline shuts down (for example, when bounded sources shut
    down) Flink will drop any outstanding processing-time timers, meaning
    that any remaining Restrictions will not be processed.
    
    The fix for 1. is to execute the splittable DoFn directly, thereby
    bypassing the late data/timer dropping logic.
    
    The fix for 2. builds on the fix for 1. and also introduces a "last
    resort" event-time timer that fires at +Inf and makes sure that any
    remaining restrictions are being exhausted.

----


> Fix SplittableDoFn ValidatesRunner tests in FlinkRunner
> -------------------------------------------------------
>
>                 Key: BEAM-2140
>                 URL: https://issues.apache.org/jira/browse/BEAM-2140
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>            Reporter: Aljoscha Krettek
>            Assignee: Aljoscha Krettek
>
> As discovered as part of BEAM-1763, there is a failing SDF test. We disabled 
> the tests to unblock the open PR for BEAM-1763.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
