Hi Pablo,

Thanks for the motivating examples. I understand the motivation now, but one question comes to mind: do we not care about the case where another (grouping) PTransform that _changes the key_ sits between the emitting PTransform and the consuming PTransform? Through such a change of key, two elements originally emitted with the same key can be moved to two different keys and then back to the same one, which would obviously violate any ordering defined on the original emitting transform. I'm aware that the per-key definition requires the two PTransforms to be directly connected. I'm just asking whether we would want to solve this for the more general case.

In the original design document for @RequiresTimeSortedInput [1], there was a (not implemented) mention of a "User supplied sorting criterion", which I believe is exactly what, for instance, a Kafka offset offers, and what is actually described by the per-key delivery semantics. The key point is that each element is (somehow) assigned a sequential ID, which then defines the order as seen by a per-key authoritative _observer_ (for instance the source emitting transform; in the case of Kafka that is the leader for a partition, and so on). If this per-key sequential ID is carried along with the element, then the order can be reconstructed at any downstream stage.
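
To make the idea concrete, here is a minimal sketch in plain Python (deliberately not the Beam API). It assumes each element is a (key, seq, value) tuple where seq is a gap-free, per-key sequential ID starting at 0, assigned by the authoritative observer. The names restore_order and arrived are hypothetical and only serve the illustration; a real implementation would keep the buffer in per-key state and would need to bound it.

```python
from collections import defaultdict


def restore_order(elements):
    """Yield (key, seq, value) tuples in per-key sequence order.

    Assumption: every element carries a gap-free, per-key sequence ID
    that starts at 0. Out-of-order elements are buffered until all of
    their predecessors for the same key have been seen.
    """
    next_seq = defaultdict(int)   # next expected sequence ID per key
    pending = defaultdict(dict)   # key -> {seq: value} awaiting predecessors
    for key, seq, value in elements:
        pending[key][seq] = value
        # Flush every element that is now contiguous with what was emitted.
        while next_seq[key] in pending[key]:
            current = next_seq[key]
            yield key, current, pending[key].pop(current)
            next_seq[key] += 1


# Elements as they might arrive after an intermediate re-keying shuffle.
arrived = [
    ("a", 1, "update"),
    ("b", 0, "insert"),
    ("a", 0, "insert"),
    ("a", 2, "delete"),
]
restored = list(restore_order(arrived))
```

Only the relative order within each key is restored here; nothing is implied about the order between different keys.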

I'm +1 on splitting the documentation from the validation / implementation parts; that sounds good to me.

 Jan

[1] https://docs.google.com/document/d/1ObLVUFsf1NcG8ZuIZE4aVy2RYKx2FfyMhkZYWPnI9-c/edit?usp=sharing


On 9/27/21 11:56 PM, Pablo Estrada wrote:
Hello all,
thanks for your comments.

All runners that I've tested have these semantics for streaming. See the PR [1] with the capability matrix changes.

I agree it makes sense to add a pipeline check for this. I think I would like to receive comments / agreement on the definition and the changes to the documentation - and then follow up with the pipeline check. Is that reasonable for everyone?

I have added a section of motivating use cases to the document, Jan. Let me know if those make sense.

[1] https://github.com/apache/beam/pull/15378

Thanks
-P

On Sat, Sep 25, 2021 at 1:06 PM Jan Lukavský <je...@seznam.cz> wrote:

    +1 to adding a Pipeline requirement for this. If business logic relies
    on a specific feature that a runner might or might not have, then the
    Pipeline should be rejected on runners that do not support it. Do we
    have a list of runners that have or lack these semantics? Just for
    clarification (sorry for my ignorance if this has already been
    described): do we have a description of the use cases that drive this
    effort?

    Thanks,

      Jan

    On 9/24/21 10:58 PM, Robert Bradshaw wrote:
    > Thanks for writing this up. Rather than just documenting it, should we
    > have a way of asserting/requesting it (like time sorted inputs) such
    > that a pipeline author that needs to rely on this property can be
    > rejected on runners that don't provide it?
    >
    > On Fri, Sep 24, 2021 at 12:25 PM Kenneth Knowles <k...@apache.org> wrote:
    >> Took a look. I definitely agree that something like this is useful, and well-motivated by the use cases you raise.
    >>
    >> Kenn
    >>
    >> On Thu, Sep 23, 2021 at 4:30 PM Pablo Estrada <pabl...@google.com> wrote:
    >>> Hi all,
    >>> I've been spending some time thinking about CDC use cases on Beam. One valuable piece to enable these use cases is to define how Beam deals with ordering of elements in streaming pipelines.
    >>> With that in mind, I wrote a document[1] that proposes a definition of the ordering semantics supported by most Beam runners, and a pull request [2] with ValidatesRunner tests and documentation updates.
    >>>
    >>> Would you please review these, add your comments and thoughts, and let me know if they make sense?
    >>>
    >>> Thanks!
    >>> -P.
    >>>
    >>> [1] https://docs.google.com/document/d/1_7WRJznXlOtWuVaHl_dpy8OZcx_M8BUmeWVA4G0-wEc/edit#
    >>> [2] https://github.com/apache/beam/pull/15378
