And as a quick summary, a pipeline with @RequiresTimeSortedInput will:

 a) work well on streaming pipelines run on the java direct runner and non-portable flink, and fail on every other streaming runner

 b) work well on batch non-portable flink, legacy spark and batch dataflow

 c) from what I can tell, the code path seems to be supported for batch portable flink as well (assuming the portable batch flink runner uses mostly the same code path, which it seems it should)

If I'm not mistaken, the only actual issues are with the new spark runner and with the remaining streaming runners (dataflow, portable flink, jet, samza). The streaming case can probably be solved in one shot (as mentioned in the previous email).

Jan

On 2/7/20 8:20 PM, Jan Lukavský wrote:

Hi Kenn,

I think that this approach is hard to maintain and doesn't scale. Main reasons:

 a) modifying core by definition has some impact on runners, so modifying core would imply having to modify all runners

 b) having to implement every core feature in all existing runners would make any modification to core prohibitively expensive

 c) even if we accept this, there can be runners that live outside the beam repo (or are even closed source!)

Therefore I think that the correct and scalable approach would be to split this into several pieces:

 1) define pipeline requirements (this is pretty much similar to how we currently scope @Category(ValidatesRunner.class) tests)

 2) let the pipeline infer its requirements prior to being translated by the runner

 3) the runner checks the set of required features against those it supports and rejects the pipeline if some feature is missing

This could even replace the annotations used in validates runner tests, because each runner would simply execute all tests it has enough features to run.
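
To make 1)-3) a bit more concrete, roughly the shape I have in mind (all names below are made up for illustration, none of this is an existing Beam API):

import java.util.HashSet;
import java.util.Set;

// 1) a (hypothetical) enumeration of pipeline requirements
enum PipelineRequirement {
  TIME_SORTED_INPUT,
  STATEFUL_PROCESSING,
  REQUIRES_STABLE_INPUT
}

// 3) a generic check a runner could perform before translating anything;
//    "required" would come from 2), i.e. be inferred by the pipeline from
//    its own graph (DoFn signatures, annotations, ...) prior to translation
static void validateRequirements(
    Set<PipelineRequirement> required, Set<PipelineRequirement> supported) {
  Set<PipelineRequirement> missing = new HashSet<>(required);
  missing.removeAll(supported);
  if (!missing.isEmpty()) {
    throw new UnsupportedOperationException(
        "Pipeline requires features not supported by this runner: " + missing);
  }
}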

But as I mentioned, this is a pretty deep change. I don't see a way to do this safely for current runners other than to actually implement the feature (to me, failing the pipeline in the batch case seems nearly as complicated as actually implementing the sorting). It would be super cool if anyone were interested in implementing this in the runners that don't currently support it. A side note: currently the annotation is not supported by all streaming runners due to missing guarantees on timer ordering (which can lead to data loss). I think I have found a solution to this, see [1], but I'd like to be 100% sure before enabling the support (I'm not sure what the impact of mis-ordered timers on output timestamps would be, and so on).

Jan

[1] https://github.com/apache/beam/pull/10795/files#diff-11a02ba72f437b89e35f7ad37102dfd1R209

On 2/7/20 7:53 PM, Kenneth Knowles wrote:
I see. It is good to see that the pipeline will at least fail. However, the expected approach here is that the pipeline is rejected prior to execution. That is a primary reason for our annotation-driven API style; it allows much better "static" analysis by a runner, so we don't have to wait and fail late. Here is an example: https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java#L1940
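
For illustration only, here is roughly what such an up-front check for @RequiresTimeSortedInput could look like in a runner that does not support it. It is a sketch, not the linked Dataflow code, and the requiresTimeSortedInput() accessor on the DoFn signature is my assumption about how the reflection layer exposes the annotation:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;

/** Walks the graph at submission time and fails before anything executes. */
static void rejectTimeSortedInput(Pipeline pipeline) {
  pipeline.traverseTopologically(
      new Pipeline.PipelineVisitor.Defaults() {
        @Override
        public void visitPrimitiveTransform(TransformHierarchy.Node node) {
          PTransform<?, ?> transform = node.getTransform();
          // primitive ParDos show up as ParDo.MultiOutput after expansion
          if (transform instanceof ParDo.MultiOutput) {
            DoFn<?, ?> fn = ((ParDo.MultiOutput<?, ?>) transform).getFn();
            DoFnSignature signature = DoFnSignatures.signatureForDoFn(fn);
            // assumed accessor for the @RequiresTimeSortedInput annotation
            if (signature.processElement().requiresTimeSortedInput()) {
              throw new UnsupportedOperationException(
                  "@RequiresTimeSortedInput is not supported by this runner"
                      + " (used by " + node.getFullName() + ")");
            }
          }
        }
      });
}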

Kenn

On Thu, Feb 6, 2020 at 11:03 PM Jan Lukavský <je...@seznam.cz> wrote:

    Hi Kenn,

    that should not be the case. Care was taken to fail a
    streaming pipeline which needs this ability when the runner
    doesn't support it [1]. It is true, however, that a batch
    pipeline will not fail, because there is no generic
    (runner-agnostic) way of supporting this transform in the
    batch case (which is why the annotation was needed). Failing
    batch pipelines in this case would mean runners have to
    understand this annotation, which is pretty close to
    implementing the feature as a whole.
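
    (To illustrate the kind of guard [1] refers to - heavily
    simplified, not the actual DoFnRunners code, and with the
    requiresTimeSortedInput() accessor again assumed - the shape
    is roughly the following, with imports as in the sketch
    further up the thread:)

    // fail fast when a stateful DoFn requires time-sorted input but the
    // runner cannot guarantee ordered timer firing, instead of silently
    // producing wrong results
    static void checkTimeSortedInputSupported(
        DoFn<?, ?> fn, boolean runnerGuaranteesTimerOrder) {
      DoFnSignature signature = DoFnSignatures.signatureForDoFn(fn);
      if (signature.processElement().requiresTimeSortedInput()
          && !runnerGuaranteesTimerOrder) {
        throw new UnsupportedOperationException(
            "@RequiresTimeSortedInput is not supported by this runner");
      }
    }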

    This applies generally to any core functionality; it might
    take some time before runners fully support it. I don't know
    how to solve that, maybe add a record to the capability
    matrix? I can imagine a fully generic solution (runners
    might publish their capabilities and the pipeline might be
    validated against these capabilities at pipeline build
    time), but that is obviously out of scope for this
    annotation.

    Jan

    [1] https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java#L150

    On 2/7/20 1:01 AM, Kenneth Knowles wrote:
    There is a major problem with this merge: the runners that do
    not support it do not reject pipelines that need this feature.
    They will silently produce the wrong answer, causing data loss.

    Kenn

    On Thu, Feb 6, 2020 at 3:24 AM Jan Lukavský <je...@seznam.cz> wrote:

        Hi,

        the PR was merged to master and a few follow-up issues
        were created, mainly [1] and [2]. I didn't find any
        reference to SortedMapState in JIRA; is there any
        tracking issue for that which I can link to? I also
        added a link to the design document here [3].

        [1] https://issues.apache.org/jira/browse/BEAM-9256

        [2] https://issues.apache.org/jira/browse/BEAM-9257

        [3] https://cwiki.apache.org/confluence/display/BEAM/Design+Documents

        On 1/30/20 1:39 PM, Jan Lukavský wrote:
        > Hi,
        >
        > PR [1] (issue [2]) went through code review, and
        > according to [3] it seems to me to be ready for merge.
        > The current state of the implementation is that it is
        > supported only by the direct runner, the legacy flink
        > runner (batch and streaming) and legacy spark (batch).
        > It could be supported by all other (streaming) runners
        > using StatefulDoFnRunner, provided the runner can make
        > guarantees about the ordering of timer firings (which
        > is unfortunately the case only for legacy flink and
        > the direct runner, at least for now - related issues
        > are mentioned multiple times on other threads).
        > Implementation for other batch runners should be as
        > straightforward as adding sorting by event timestamp
        > before the stateful dofn (in cases where the runner
        > doesn't sort already - e.g. Dataflow - in which case
        > the annotation can simply be ignored - hence support
        > for batch Dataflow seems to be a no-op); a sketch of
        > this appears below the quoted thread.
        >
        > There has been some slight controversy about this
        > feature, but the current feature proposing and
        > implementing guidelines do not cover how to resolve
        > that, so I'm using this opportunity to let the
        > community know that there is a plan to merge this
        > feature unless there is some veto (please provide
        > specific reasons in that case). The plan is to merge
        > this in the second part of next week.
        >
        > Thanks,
        >
        >  Jan
        >
        > [1] https://github.com/apache/beam/pull/8774
        >
        > [2] https://issues.apache.org/jira/browse/BEAM-8550
        >
        > [3] https://beam.apache.org/contribute/committer-guide/
        >
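
Regarding "adding sorting by event timestamp before the stateful dofn" for batch runners (from the quoted message above), a minimal sketch of that idea expressed as user-level transforms - assuming input is a PCollection<KV<String, Integer>> of keyed events and MyStatefulDoFn is a placeholder for the @RequiresTimeSortedInput DoFn; a real runner would do this inside its batch translation and would not need the timestamp-skew workaround:

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reify;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;

static PCollection<KV<String, Integer>> sortByEventTime(
    PCollection<KV<String, Integer>> input) {
  return input
      // keep each element's event timestamp next to its value
      .apply(Reify.timestampsInValue())
      // batch: the whole key (per window) is available at once
      .apply(GroupByKey.create())
      // re-emit the buffered elements in event-time order
      .apply(ParDo.of(
          new DoFn<KV<String, Iterable<TimestampedValue<Integer>>>, KV<String, Integer>>() {
            @Override
            public Duration getAllowedTimestampSkew() {
              // we re-emit with the original (earlier) timestamps;
              // a runner-side implementation would not need this
              return Duration.millis(Long.MAX_VALUE);
            }

            @ProcessElement
            public void process(ProcessContext ctx) {
              List<TimestampedValue<Integer>> buffer = new ArrayList<>();
              ctx.element().getValue().forEach(buffer::add);
              buffer.sort(Comparator.comparing(TimestampedValue::getTimestamp));
              for (TimestampedValue<Integer> tv : buffer) {
                ctx.outputWithTimestamp(
                    KV.of(ctx.element().getKey(), tv.getValue()), tv.getTimestamp());
              }
            }
          }));
}

// usage: sortByEventTime(events).apply(ParDo.of(new MyStatefulDoFn()));

(In streaming, the same buffering would have to be done incrementally with state and timers as the watermark advances, which is where the timer-ordering guarantees discussed above come in.)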
