Great idea for AddHints.of() !

What would be the resulting PCollection ? Just a PCollection of hints or the pc elements + hints ?

Regards
JB

On 30/01/2018 20:52, Reuven Lax wrote:
I think adding hints for runners is reasonable, though hints should always be assumed to be optional - they shouldn't change semantics of the program (otherwise you destroy the portability promise of Beam). However there are many types of hints that some runners might find useful (e.g. this step needs more memory. this step runs ML algorithms, and should run on a machine with GPUs. etc.)

Robert has mentioned in the past that we should try and keep PCollection an immutable object, and not introduce new setters on it. We slightly break this already today with PCollection.setCoder, and that has caused some problems. Hints can be set on PTransforms though, and propagate to that PTransform's output PCollections. This is nearly as easy to use however, as we can implement a helper PTransform that can be used to set hints. I.e.

pc.apply(AddHints.of(hint1, hint2, hint3))

Is no harder than called pc.addHint()

Reuven

On Tue, Jan 30, 2018 at 11:39 AM, Jean-Baptiste Onofré <[email protected] <mailto:[email protected]>> wrote:

    Maybe I should have started the discussion on the user mailing list:
    it would be great to have user feedback on this, even if I got your
    points.

    Sometime, I have the feeling that whatever we are proposing and
    discussing, it doesn't go anywhere. At some point, to attract more
    people, we have to get ideas from different perspective/standpoint.

    Thanks for the feedback anyway.

    Regards
    JB

    On 30/01/2018 20:27, Romain Manni-Bucau wrote:



        2018-01-30 19:52 GMT+01:00 Kenneth Knowles <[email protected]
        <mailto:[email protected]> <mailto:[email protected]
        <mailto:[email protected]>>>:


             I generally like having certain "escape hatches" that are well
             designed and limited in scope, and anything that turns out
        to be
             important becomes first-class. But this one I don't really like
             because the use cases belong elsewhere. Of course, they
        creep so you
             should assume they will be unbounded in how much gets
        stuffed into
             them. And the definition of a "hint" is that deleting it
        does not
             change semantics, just performance/monitor/UI etc but this
        does not
             seem to be true.

             "spark.persist" for idempotent replay in a sink:
               - this is already @RequiresStableInput
               - it is not a hint because if you don't persist your
        results are
             incorrect
               - it is a property of a DoFn / transform not a PCollection


        Let's put this last point aside since we'll manage to make it
        working wherever we store it ;).


             schema:
               - should be first-class


        Except it doesn't make sense everywhere. It is exactly like
        saying "implement this" and 2 lines later "it doesn't do
        anything for you". If you think wider on schema you will want to
        do far more - like getting them from the previous step etc... -
        which makes it not an API thing. However, with some runner like
        spark, being able to specifiy it will enable to optimize the
        execution. There is a clear mismatch between a consistent and
        user friendly generic and portable API, and a runtime, runner
        specific, implementation.

        This is all fine as an issue for a portable API and why all EE
        API have a map to pass properties somewhere so I don't see why
        beam wouldn't fall in that exact same bucket since it embraces
        the drawback of the portability and we already hit it since
        several releases.


             step parallelism (you didn't mention but most runners need some
             control):
               - this is a property of the data and the pipeline
        together, not
             just the pipeline


        Good one but this can be configured from the pipeline or even a
        transform. This doesn't mean the data is not important - and you
        are more than right on that point, just that it is configurable
        without referencing the data (using ranges is a trivial example
        even if not the most efficient).


             So I just don't actually see a use case for free-form hints
        that we
             haven't already covered.


        There are several cases, even in the direct runner to be able to
        industrialize it:
        - use that particular executor instance
        - debug these infos for that transform

        etc...

        As a high level design I think it is good to bring hints to beam
        to avoid to add ad-hoc solution each time and take the risk to
        loose the portability of the main API.


             Kenn

             On Tue, Jan 30, 2018 at 9:55 AM, Romain Manni-Bucau
             <[email protected] <mailto:[email protected]>
        <mailto:[email protected] <mailto:[email protected]>>>
        wrote:

                 Lukasz, the point is that you have to choice to either
        bring all
                 specificities to the main API which makes most of the
        API not
                 usable or implemented or the opposite, not support
        anything.
                 Introducing hints will allow to have eagerly for some
        runners
                 some features - or just some very specific things - and
        once
                 mainstream it can find a place in the main API. This is
        saner
                 than the opposite since some specificities can never
        find a good
                 place.

                 The little thing we need to take care with that is to
        avoid to
                 introduce some feature flipping as support some feature not
                 doable with another runner. It should really be about
        runing a
                 runner execution (like the schema in spark).


                 Romain Manni-Bucau
                 @rmannibucau <https://twitter.com/rmannibucau
        <https://twitter.com/rmannibucau>> | Blog
                 <https://rmannibucau.metawerx.net/
        <https://rmannibucau.metawerx.net/>> | Old Blog
                 <http://rmannibucau.wordpress.com
        <http://rmannibucau.wordpress.com>> | Github
                 <https://github.com/rmannibucau
        <https://github.com/rmannibucau>> | LinkedIn
                 <https://www.linkedin.com/in/rmannibucau
        <https://www.linkedin.com/in/rmannibucau>>

                 2018-01-30 18:42 GMT+01:00 Jean-Baptiste Onofré
        <[email protected] <mailto:[email protected]>
                 <mailto:[email protected] <mailto:[email protected]>>>:

                     Good point Luke: in that case, the hint will be
        ignored by
                     the runner if the hint is not for him. The hint can be
                     generic (not specific to a runner). It could be
        interesting
                     for the schema support or IOs, not specific to a
        runner.

                     What do you mean by gathering
        PTransforms/PCollections and
                     where ?

                     Thanks !
                     Regards
                     JB

                     On 30/01/2018 18:35, Lukasz Cwik wrote:

                         If the hint is required to run the persons pipeline
                         well, how do you expect that the person we be
        able to
                         migrate their pipeline to another runner?

                         A lot of hints like "spark.persist" are really
        the user
                         trying to tell us something about the
        PCollection, like
                         it is very small. I would prefer if we gathered
        this
                         information about PTransforms and PCollections
        instead
                         of runner specific knobs since then each runner can
                         choose how best to map such a property on their
        internal
                         representation.

                         On Tue, Jan 30, 2018 at 2:21 AM, Jean-Baptiste
        Onofré
                         <[email protected] <mailto:[email protected]>
        <mailto:[email protected] <mailto:[email protected]>>
                         <mailto:[email protected]
        <mailto:[email protected]> <mailto:[email protected]
        <mailto:[email protected]>>>> wrote:

                              Hi,

                              As part of the discussion about schema, Romain
                         mentioned hint. I
                              think it's
                              worth to have an explanation about that and
                         especially it could be
                              wider than
                              schema.

                              Today, to give information to the runner,
        we use
                         PipelineOptions.
                              The runner can
                              use these options, and apply for all inner
                         representation of the
                              PCollection in
                              the runner.

                              For instance, for the Spark runner, the
        persistence
                         storage level
                              (memory, disk,
                              ...) can be defined via pipeline options.

                              Then, the Spark runner automatically
        defines if
                         RDDs have to be
                              persisted (using
                              the storage level defined in the pipeline
        options),
                         for instance if
                              the same
                              POutput/PCollection is read several time.

                              However, the user doesn't have any way to
        provide
                         indication to the
                              runner to
                              deal with a specific PCollection.

                              Imagine, the user has a pipeline like this:
                              pipeline.apply().apply().apply(). We
                              have three PCollections involved in this
        pipeline.
                         It's not
                              currently possible
                              to give indications how the runner should
                         "optimized" and deal with
                              the second
                              PCollection only.

                              The idea is to add a method on the
        PCollection:

                              PCollection.addHint(String key, Object value);

                              For instance:

                              collection.addHint("spark.persist",
                         StorageLevel.MEMORY_ONLY);

                              I see three direct usage of this:

                              1. Related to schema: the schema
        definition could
                         be a hint
                              2. Related to the IO: add headers for the
        IO and
                         the runner how to
                              specifically
                              process a collection. In Apache Camel, we have
                         headers on the
                              message and
                              properties on the exchange similar to this. It
                         allows to give some
                              indication
                              how to process some messages on the Camel
                         component. We can imagine
                              the same of
                              the IO (using the PCollection hints to react
                         accordingly).
                              3. Related to runner optimization: I see for
                         instance a way to use
                              RDD or
                              dataframe in Spark runner, or even specific
                         optimization like
                              persist. I had lot
                              of questions from Spark users saying: "in
        my Spark
                         job, I know where
                              and how I
                              should use persist (rdd.persist()), but I
        can't do
                         such optimization
                              using
                              Beam". So it could be a good improvements.

                              Thoughts ?

                              Regards
                              JB
                              --
                              Jean-Baptiste Onofré
        [email protected] <mailto:[email protected]>
        <mailto:[email protected] <mailto:[email protected]>>
                         <mailto:[email protected]
        <mailto:[email protected]> <mailto:[email protected]
        <mailto:[email protected]>>>
        http://blog.nanthrax.net
                              Talend - http://www.talend.com






Reply via email to