Hi Reza,

> If you are interested and have the bandwidth, it would be great to have you as a reviewer for the PR.

I'd be happy to.

Cheers,

 Jan

On 6/10/19 3:52 AM, Reza Rokni wrote:
Hi,

Interesting reading on issue 143 :-) My example is narrower in scope, but I suspect the general pattern will be useful for most time series.

The specific Jira is:

https://issues.apache.org/jira/browse/BEAM-7386?filter=-2

The signature is currently of the form:

public static class BiTemporalJoin<K, V1, V2>
    extends PTransform<KeyedPCollectionTuple<K>, PCollection<BiTemporalJoinResult<K, V1, V2>>>

If you are interested and have the bandwidth, it would be great to have you as a reviewer for the PR. I also hope to find time to contribute more time series utilities, and it would be great to have collaborators! I have not looked at Euphoria in detail (it looks interesting!), but it should be reasonably straightforward to use the class in other places.
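To make the join semantics concrete, here is a framework-free Java sketch of one plausible reading of a bi-temporal join for a single key: each left element is paired with the right element that has the latest timestamp at or before its own. That matching rule is an assumption, and BiTemporalJoinSketch, Match and join are hypothetical names, not the API proposed in BEAM-7386. The TreeMap plays the role that Sorted Map State would play inside a stateful DoFn.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;

// Hypothetical, framework-free illustration of a bi-temporal join for a single key.
// Assumed matching rule: a left value at time t is paired with the right value that has
// the greatest timestamp <= t, i.e. the right value "in effect" at t. Not the Beam API.
final class BiTemporalJoinSketch {

  /** One joined output: the left timestamp and value plus the matching right value (or null). */
  static final class Match<V1, V2> {
    final long timestamp;
    final V1 left;
    final V2 right;

    Match(long timestamp, V1 left, V2 right) {
      this.timestamp = timestamp;
      this.left = left;
      this.right = right;
    }

    @Override
    public String toString() {
      return timestamp + ": " + left + " -> " + right;
    }
  }

  static <V1, V2> List<Match<V1, V2>> join(
      NavigableMap<Long, V1> left, NavigableMap<Long, V2> right) {
    List<Match<V1, V2>> out = new ArrayList<>();
    // Iterating a NavigableMap visits the left side in timestamp order.
    for (Map.Entry<Long, V1> e : left.entrySet()) {
      // floorEntry returns the latest right entry at or before the left timestamp in
      // O(log n); without sorted state this lookup needs a scan of the buffered right side.
      Map.Entry<Long, V2> r = right.floorEntry(e.getKey());
      out.add(new Match<>(e.getKey(), e.getValue(), r == null ? null : r.getValue()));
    }
    return out;
  }

  private BiTemporalJoinSketch() {}
}
```

For example, with left entries {0: "early", 5: "buy", 12: "sell"} and right entries {1: 100.0, 10: 101.5}, "buy" pairs with 100.0, "sell" with 101.5, and "early" gets null because no right value exists yet. Without a sorted structure each lookup would have to scan the buffered right side, which is the kind of performance concern that motivates Sorted Map State.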

Cheers

Reza


On Fri, 7 Jun 2019 at 14:50, Jan Lukavský wrote:

    Hi Reza, interesting suggestions, thanks.

    When you mentioned join, I recalled an older issue (which
    apparently has not yet been transferred to Beam's JIRA) [1]. Is it
    in any way related to what you are implementing? Would you like
    to make your implementation accessible via the Euphoria DSL [2]?

     Jan

    [1] https://github.com/seznam/euphoria/issues/143

    [2] https://github.com/apache/beam/blob/master/sdks/java/extensions/euphoria/src/main/java/org/apache/beam/sdk/extensions/euphoria/core/client/operator/Join.java

    On 6/7/19 7:06 AM, Reza Rokni wrote:
    Hi Jan,

    I have been working on a time series extension that uses many
    of these techniques to join two temporal streams. It's almost
    ready for a PR; I will post it here when it is, as it might be
    useful for you. In general, I borrowed a lot of techniques from
    the CoGroupByKey code.

    /1) need to figure out how to get Coder of input PCollection
    of stateful ParDo inside StatefulDoFnRunner/
    My join takes a <K, V1, V2>. In the outer transform I use
    things like ((KvCoder<K, V1>) leftCollection.getCoder()).getValueCoder().
    Then, when creating the Join transform, I can defer creating the
    StateSpec objects until the constructor is called.

    /2) there are performance considerations, that can be solved
    probably only by Sorted Map State [2]/
    Sorted Map is going to be awesome; until then, the only option is
    to create a cache in the DoFn to make it more efficient. For the
    cache to work, you need to key it on window + key and do things
    like clearing the cache in @StartBundle. It's better to wait for
    Sorted Map if this is not time-critical.

    /3) additional work is needed for allowedLateness to work
    correctly (and there are at least two ways how to solve this),
    see the design doc [3]/
    Yup. In my case I can support this for now by not garbage-collecting
    the right side of the join, but that is a compromise.

    /4) more tests (for batch and validatesRunner) are needed/
    I just posted a question on this list about the best way to use the
    @Category(ValidatesRunner.class) annotation; it sounds like it might
    be useful to you as well :-)
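The per-bundle cache from point 2) could look roughly like the plain-Java sketch below. JoinStateCache, getOrLoad and startBundle are hypothetical names; in a real DoFn, startBundle would be the method annotated with @StartBundle, the loader would read the state cell, and the window would be a BoundedWindow rather than a String. The cache is keyed on window plus key because state in a stateful ParDo is scoped per key and window.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import java.util.function.Supplier;

// Hypothetical sketch of a per-bundle cache for sorted join state.
// Beam state is scoped to (key, window), so the cache key must include both.
final class JoinStateCache<K, V> {

  // Composite cache key: the window (a String stand-in here) plus the element key.
  private static final class WindowedKey<T> {
    final String window;
    final T key;

    WindowedKey(String window, T key) {
      this.window = window;
      this.key = key;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof WindowedKey)) {
        return false;
      }
      WindowedKey<?> other = (WindowedKey<?>) o;
      return window.equals(other.window) && Objects.equals(key, other.key);
    }

    @Override
    public int hashCode() {
      return Objects.hash(window, key);
    }
  }

  private final Map<WindowedKey<K>, TreeMap<Long, V>> cache = new HashMap<>();
  private int loads = 0;

  // Would be the @StartBundle method in a DoFn: drop everything cached by the previous
  // bundle so that stale copies of state are never reused.
  void startBundle() {
    cache.clear();
  }

  // Returns the cached sorted view, calling the loader (e.g. a read of the state cell)
  // only on the first access for this (window, key) within the bundle.
  TreeMap<Long, V> getOrLoad(String window, K key, Supplier<TreeMap<Long, V>> loader) {
    return cache.computeIfAbsent(new WindowedKey<>(window, key), k -> {
      loads++;
      return loader.get();
    });
  }

  int loadCount() {
    return loads;
  }
}
```

The sketch only covers reads: any write would have to update both the state cell and the cached map, or the cache would go stale within the bundle.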

    On Thu, 6 Jun 2019 at 23:03, Jan Lukavský wrote:

        Hi,

        I have written a PoC implementation of this in [1] and I'd like
        to discuss some implementation details. First of all, I'd
        appreciate any feedback about this. There are some known issues:

          1) need to figure out how to get the Coder of the input
          PCollection of a stateful ParDo inside StatefulDoFnRunner

          2) there are performance considerations that can probably
          be solved only by Sorted Map State [2]

          3) additional work is needed for allowedLateness to work
          correctly (and there are at least two ways to solve this);
          see the design doc [3]

          4) more tests (for batch and validatesRunner) are needed

        I have come across a few bugs in DirectRunner, which I tried
        to solve:

          a) timers seem to be broken in stateful ParDo with side inputs

          b) timers need to be sorted by timestamp; otherwise state
          might be cleared before it gets a chance to be flushed
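Bug b) can be illustrated with a small simulation. Assume, hypothetically, a stateful ParDo with an event-time flush timer at t=10 and a cleanup timer at t=20 that were registered cleanup-first. If the watermark jumps past both and timers fire in registration order, the buffer is cleared before it is flushed; firing from a PriorityQueue ordered by timestamp flushes first. Timer, fireInTimestampOrder and the "flush"/"cleanup" names are illustrative only, not DirectRunner internals.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical simulation of firing event-time timers once the watermark passes them.
final class TimerOrderingSketch {

  static final class Timer {
    final String name;
    final long timestamp;

    Timer(String name, long timestamp) {
      this.name = name;
      this.timestamp = timestamp;
    }
  }

  // Fires every timer with timestamp <= watermark, earliest first, and returns what was
  // flushed. "flush" emits the buffered elements; "cleanup" clears the buffer.
  static List<String> fireInTimestampOrder(
      List<Timer> registered, long watermark, List<String> buffer) {
    PriorityQueue<Timer> queue =
        new PriorityQueue<>(Comparator.comparingLong((Timer t) -> t.timestamp));
    queue.addAll(registered);
    List<String> output = new ArrayList<>();
    while (!queue.isEmpty() && queue.peek().timestamp <= watermark) {
      Timer t = queue.poll();
      if (t.name.equals("flush")) {
        output.addAll(buffer); // copy buffered elements to the output
      } else if (t.name.equals("cleanup")) {
        buffer.clear(); // garbage-collect the state
      }
    }
    return output;
  }
}
```

With the timers fired in timestamp order, both buffered elements reach the output and the buffer is empty afterwards; swapping the firing order would produce no output at all.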


        Thanks for any feedback,

          Jan


        [1] https://github.com/apache/beam/pull/8774

        [2] http://mail-archives.apache.org/mod_mbox/beam-dev/201905.mbox/%3ccalstk6+ldemtjmnuysn3vcufywjkhmgv1isfbdmxthoqh91...@mail.gmail.com%3e

        [3] https://docs.google.com/document/d/1ObLVUFsf1NcG8ZuIZE4aVy2RYKx2FfyMhkZYWPnI9-c/


        On 5/23/19 4:40 PM, Robert Bradshaw wrote:
        > Thanks for writing this up.
        >
        > I think the justification for adding this to the model needs
        > to be that it is useful (you have this covered, though some
        > examples would be nice) and that it's something that can't
        > easily be done by users themselves (specifically, though it
        > can be (relatively) cheaply done in streaming and batch, it's
        > done in very different ways, and also that it's hard to do
        > via composition).
        >
        > On Thu, May 23, 2019 at 4:10 PM Jan Lukavský wrote:
        >> Hi,
        >>
        >> I have written a very brief draft of how it might be possible
        >> to implement @RequireTimeSortedInput discussed in [1]. I see the
        >> document [2] as a starting point for a discussion. There are
        >> several open questions, which I believe can be resolved by this
        >> great community. :-)
        >>
        >> Jan
        >>
        >> [1] http://mail-archives.apache.org/mod_mbox/beam-dev/201905.mbox/browser
        >>
        >> [2] https://docs.google.com/document/d/1ObLVUFsf1NcG8ZuIZE4aVy2RYKx2FfyMhkZYWPnI9-c/
        >>
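As a rough illustration of what @RequireTimeSortedInput asks of a runner in streaming, the sketch below buffers one key's elements in a TreeMap and releases them in event-time order once the watermark passes their timestamps. TimeSortedBuffer and its methods are hypothetical; in a DoFn, advanceWatermark would correspond to an event-time timer firing. It is not the PoC implementation from the PR, and it does not handle late data, which is where the allowedLateness work mentioned in the thread comes in.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: buffer out-of-order elements for one key and release them in
// event-time order once the watermark has passed their timestamps.
// Late data (timestamps at or below an already-processed watermark) is not handled.
final class TimeSortedBuffer<V> {

  // Sorted by timestamp; each timestamp may hold several elements, kept in arrival order.
  private final TreeMap<Long, List<V>> buffer = new TreeMap<>();

  void add(long timestamp, V value) {
    buffer.computeIfAbsent(timestamp, t -> new ArrayList<>()).add(value);
  }

  // Called when the watermark advances (in a DoFn this would be an event-time timer):
  // removes and returns, in timestamp order, every element with timestamp <= watermark.
  List<V> advanceWatermark(long watermark) {
    List<V> ready = new ArrayList<>();
    Map<Long, List<V>> due = buffer.headMap(watermark, true);
    for (List<V> values : due.values()) {
      ready.addAll(values);
    }
    due.clear(); // clearing the head view removes those entries from the buffer itself
    return ready;
  }

  boolean isEmpty() {
    return buffer.isEmpty();
  }
}
```

In batch, a runner could instead sort each key's elements up front, which is one reason the streaming and batch strategies end up so different.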



--
    This email may be confidential and privileged. If you received
    this communication by mistake, please don't forward it to anyone
    else, please erase all copies and attachments, and please let me
    know that it has gone to the wrong person.

    The above terms reflect a potential business arrangement, are
    provided solely as a basis for further discussion, and are not
    intended to be and do not constitute a legally binding
    obligation. No legally binding obligations will be created,
    implied, or inferred until an agreement in final form is executed
    in writing by all parties involved.



