On Wed, Jan 31, 2018 at 1:34 AM, Jean-Baptiste Onofré <[email protected]>
wrote:
> Hi Ismaël,
>
> I agree that hints should not change the output of PTransforms.
>
> However, let me illustrate why I think hints could be interesting:
>
> - I agree with what you are saying about the runners: they should be
> smart. However, to be smart enough, the runner could use some statements
> provided by the pipeline designer. Let's take the Spark runner. In the
> first version, we didn't do any caching of RDDs. Now, just for streaming,
> if a PCollection is read more than once, I added RDD caching. That's
> systematic, and the user doesn't have the choice. When you write the same
> process using Spark directly, the user has control of the way the RDDs are
> cached, and it can depend on the use case. So, I think users will be happy
> to give some hints to the runner at some points of the pipeline.
>
> - Hints will open up new features in the IOs. Let me take a concrete
> example. On a local branch, I created a RestIO. The RestIO Write
> PTransform calls a remote REST service for each element in the
> PCollection. It's configured this way:
>
> pipeline.apply(...) // PCollection<String> where String is JSON for now
> .apply(RestIO.write().withUrl("http://localhost:8181/rest/"))
>
> So all elements will use the same URL. Now, imagine the URL of the REST
> service depends on the element. There's no easy way to achieve that today:
> the only way is to change the IO to process a PCollection<RestRequest>,
> where the RestRequest POJO contains the message payload and the URL. It's
> OK to use such a declarative approach, it's just a bit painful to change
> the POJO each time we need to deal with additional data (let's say
> encoding, authentication, etc.).
>
However, we already solved this particular use case for FileIO: the user
can configure the file sinks (TextIO, AvroIO, etc.) with a lambda that
maps each record to the destination where its file should be written.
Can't we solve this similarly for RestIO? It would look like:

pipeline.apply(...) // PCollection<String> where String is JSON for now
    .apply(RestIO.write().withUrlFunction((String record) -> getUrl(record)));

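
To make the idea concrete, here is a rough sketch in plain Java, with no
Beam dependency so it can be run standalone. Everything in it is made up
for illustration: RestWriteSketch, withUrl/withUrlFunction, and the URLs
are assumptions (RestIO only exists on JB's branch), and a real Beam IO
would presumably take a SerializableFunction rather than
java.util.function.Function. The sink just records which URL it would
call instead of doing any HTTP.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Stand-in for a hypothetical RestIO.write(); not a real Beam class. */
public class RestWriteSketch {
    // Maps each record to the URL it should be sent to.
    private final Function<String, String> urlFn;
    // Records (url, payload) pairs instead of making real HTTP calls.
    private final List<String[]> sent = new ArrayList<>();

    private RestWriteSketch(Function<String, String> urlFn) {
        this.urlFn = urlFn;
    }

    /** Same URL for every element, like withUrl(...) in JB's example. */
    public static RestWriteSketch withUrl(String url) {
        return new RestWriteSketch(record -> url);
    }

    /** URL computed per element, like the proposed withUrlFunction(...). */
    public static RestWriteSketch withUrlFunction(Function<String, String> urlFn) {
        return new RestWriteSketch(urlFn);
    }

    /** What the sink would do for each element of the PCollection. */
    public void processElement(String record) {
        sent.add(new String[] {urlFn.apply(record), record});
    }

    /** The (url, payload) pairs seen so far, in order. */
    public List<String[]> sent() {
        return sent;
    }

    public static void main(String[] args) {
        // Hypothetical routing rule: orders go to one endpoint, the rest to another.
        RestWriteSketch write = RestWriteSketch.withUrlFunction(
            record -> record.contains("\"type\":\"order\"")
                ? "http://localhost:8181/rest/orders"
                : "http://localhost:8181/rest/other");
        write.processElement("{\"type\":\"order\",\"id\":1}");
        write.processElement("{\"type\":\"user\",\"id\":2}");
        for (String[] call : write.sent()) {
            System.out.println(call[0] + " <- " + call[1]);
        }
    }
}
```

The point is that the element type stays a plain String: the per-element
routing lives in the function, so nothing like a RestRequest POJO has to
change when the routing rule changes.
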
> In Camel, you can attach a header to a message. So basically, you have the
> data payload (the body of the message) corresponding to the element.
>
> That's why I was thinking about hints on PCollection first: it would allow
> us to implement EIPs (Enterprise Integration Patterns).
>
> Regards
> JB
>
> On 01/31/2018 09:31 AM, Ismaël Mejía wrote:
> > This is a subject we have already discussed in the past. It was part
> > of the discussion on ‘data locality’ for the runners on top of HDFS.
> > At that time the argument for ‘hints’ was that a transform could
> > send hints to the runners so they properly allocate the readers,
> > improving its execution. This is similar to the case of resource
> > allocation (GPU) mentioned by Reuven.
> > https://issues.apache.org/jira/browse/BEAM-2085
> >
> > What is a bit tricky about the design is the optional nature of
> > hints: we say that hints should not change the semantics of the
> > transforms (their output), but they can easily be abused to configure
> > how runners behave. We should limit hints to the use case of
> > resource allocation, cases where the runner can benefit from the hint
> > info by passing it to the resource allocator. Runner-specific
> > configuration must be part of the runner options only, or runners
> > should be smarter.
> >
> > This is to avoid potential misuse that harms portability and to limit
> > extra knobs. It also avoids the risky case of ending up with some sort
> > of runtime ‘map-like’ configuration with hundreds of options that change
> > behavior, like the ones that exist in Hadoop and Spark. We should avoid
> > adding another level of this kind of variable on top of Beam.
> >
> > On Wed, Jan 31, 2018 at 7:25 AM, Jean-Baptiste Onofré <[email protected]> wrote:
> >> Hi,
> >>
> >> yeah, it sounds good to me. I will create the Jira to track this and
> >> start a PoC on the Composite.
> >>
> >> Thanks !
> >> Regards
> >> JB
> >>
> >> On 01/30/2018 10:40 PM, Reuven Lax wrote:
> >>> Did we actually reach consensus here? :)
> >>>
> >>> On Tue, Jan 30, 2018 at 1:29 PM, Romain Manni-Bucau <[email protected]> wrote:
> >>>
> >>> Not sure how it fits in terms of API yet but +1 for the high level view.
> >>> Makes perfect sense.
> >>>
> >>> On Jan 30, 2018 at 21:41, "Jean-Baptiste Onofré" <[email protected]> wrote:
> >>>
> >>> Hi Robert,
> >>>
> >>> Good point and idea for the Composite transform. It would apply nicely
> >>> on all transforms based on composite.
> >>>
> >>> I also agree that the hint is more on the transform than the PCollection
> >>> itself.
> >>>
> >>> Thanks !
> >>> Regards
> >>> JB
> >>>
> >>> On 30/01/2018 21:26, Robert Bradshaw wrote:
> >>>
> >>> Many hints make more sense for PTransforms (the computation itself)
> >>> than for PCollections. In addition, when we want properties attached
> >>> to PCollections themselves, it often makes sense to let these be
> >>> provided by the producing PTransform (e.g. coders and schemas are
> >>> often functions of the input metadata and the operation itself, and
> >>> can't just be set arbitrarily).
> >>>
> >>> Also, we already have a perfectly standard way of nesting transforms
> >>> (or even sets of transforms), namely composite transforms. In terms of
> >>> API design I would propose writing a composite transform that applies
> >>> constraints/hints/requirements to all its inner transforms. This
> >>> translates nicely to the Fn API as well.
> >>>
> >>> On Tue, Jan 30, 2018 at 12:14 PM, Kenneth Knowles <[email protected]> wrote:
> >>>
> >>> It seems like most of these use cases are hints on a PTransform and
> >>> not a PCollection, no? CPU, memory, expected parallelism, etc. are.
> >>> Then you could just have:
> >>>
> >>> pc.apply(WithHints(myTransform, <hints>))
> >>>
> >>> For a PCollection, hints that might make sense are bits like total
> >>> size, element size, and throughput: all things that the Dataflow folks
> >>> have said should be measured instead of hinted. But I understand that
> >>> we shouldn't force runners to do infeasible things like build a whole
> >>> no-knobs service on top of a super-knobby engine.
> >>>
> >>> Incidentally, for portability, we have this "environment" object that
> >>> is basically the docker URL of an SDK harness that can execute a
> >>> function. We always intended that same area of the proto (exact fields
> >>> TBD) to have things like requirements for CPU, memory, GPUs, disk, etc.
> >>> It is likely a good place for hints.
> >>>
> >>> BTW good idea to ask users@ for their pain points and bring them back
> >>> to the dev list to motivate feature design discussions.
> >>>
> >>> Kenn
> >>>
> >>> On Tue, Jan 30, 2018 at 12:00 PM, Reuven Lax <[email protected]> wrote:
> >>>
> >>> I think the hints would logically be metadata in the PCollection, just
> >>> like coder and schema.
> >>>
> >>> On Jan 30, 2018 11:57 AM, "Jean-Baptiste Onofré" <[email protected]> wrote:
> >>>
> >>> Great idea for AddHints.of()!
> >>>
> >>> What would be the resulting PCollection? Just a PCollection of hints,
> >>> or the pc elements + hints?
> >>>
> >>> Regards
> >>> JB
> >>>
> >>> On 30/01/2018 20:52, Reuven Lax wrote:
> >>>
> >>>
> >>> I think adding hints for runners is reasonable, though hints should
> >>> always be assumed to be optional - they shouldn't change the semantics
> >>> of the program (otherwise you destroy the portability promise of Beam).
> >>> However, there are many types of hints that some runners might find
> >>> useful (e.g. this step needs more memory; this step runs ML algorithms
> >>> and should run on a machine with GPUs; etc.).
> >>>
> >>> Robert has mentioned in the past that we should try to keep PCollection
> >>> an immutable object, and not introduce new setters on it. We slightly
> >>> break this already today with PCollection.setCoder, and that has caused
> >>> some problems. Hints can be set on PTransforms though, and propagate to
> >>> that PTransform's output PCollections. This is nearly as easy to use,
> >>> however, as we can implement a helper PTransform that can be used to
> >>> set hints. I.e.
> >>>
> >>> pc.apply(AddHints.of(hint1, hint2, hint3))
> >>>
> >>> is no harder than calling pc.addHint().
> >>>
> >>> Reuven
> >>>
> >>> On Tue, Jan 30, 2018 at 11:39 AM, Jean-Baptiste Onofré <[email protected]> wrote:
> >>>
> >>> Maybe I should have started the discussion on the user mailing list:
> >>> it would be great to have user feedback on this, even if I got your
> >>> points.
> >>>
> >>> Sometimes, I have the feeling that whatever we are proposing and
> >>> discussing, it doesn't go anywhere. At some point, to attract more
> >>> people, we have to get ideas from different perspectives/standpoints.
> >>>
> >>> Thanks for the feedback anyway.
> >>>
> >>> Regards
> >>> JB
> >>>
> >>> On 30/01/2018 20:27, Romain Manni-Bucau wrote:
> >>>
> >>> 2018-01-30 19:52 GMT+01:00 Kenneth Knowles <[email protected]>:
> >>>
> >>> I generally like having certain "escape hatches" that are well
> >>> designed and limited in scope, and anything that turns out to be
> >>> important becomes first-class. But this one I don't really like
> >>> because the use cases belong elsewhere. Of course, they creep, so you
> >>> should assume they will be unbounded in how much gets stuffed into
> >>> them. And the definition of a "hint" is that deleting it does not
> >>> change semantics, just performance/monitoring/UI etc., but this does
> >>> not seem to be true.
> >>>
> >>> "spark.persist" for idempotent replay in a sink:
> >>> - this is already @RequiresStableInput
> >>> - it is not a hint because if you don't persist your results are
> >>> incorrect
> >>> - it is a property of a DoFn / transform not a PCollection
> >>>
> >>>
> >>> Let's put this last point aside since we'll manage to make it work
> >>> wherever we store it ;).
> >>>
> >>>
> >>> schema:
> >>> - should be first-class
> >>>
> >>>
> >>> Except it doesn't make sense everywhere. It is exactly like saying
> >>> "implement this" and 2 lines later "it doesn't do anything for you".
> >>> If you think more widely about schema you will want to do far more -
> >>> like getting them from the previous step etc... - which makes it not
> >>> an API thing. However, with some runners like Spark, being able to
> >>> specify it will enable optimizing the execution. There is a clear
> >>> mismatch between a consistent, user friendly, generic and portable
> >>> API, and a runtime, runner specific, implementation.
> >>>
> >>> This is all fine as an issue for a portable API, and it is why all EE
> >>> APIs have a map to pass properties somewhere, so I don't see why Beam
> >>> wouldn't fall in that exact same bucket, since it embraces the
> >>> drawback of portability and we have already been hitting it for
> >>> several releases.
> >>>
> >>>
> >>> step parallelism (you didn't mention but most runners need some
> >>> control):
> >>> - this is a property of the data and the pipeline together, not
> >>> just the pipeline
> >>>
> >>>
> >>> Good one but this can be configured from the pipeline or even a
> >>> transform. This doesn't mean the data is not important - and you are
> >>> more than right on that point - just that it is configurable without
> >>> referencing the data (using ranges is a trivial example even if not
> >>> the most efficient).
> >>>
> >>>
> >>> So I just don't actually see a use case for free-form hints that we
> >>> haven't already covered.
> >>>
> >>>
> >>> There are several cases, even in the direct runner, to be able to
> >>> industrialize it:
> >>> - use that particular executor instance
> >>> - debug these infos for that transform
> >>>
> >>> etc...
> >>>
> >>> As a high level design I think it is good to bring hints to Beam, to
> >>> avoid adding an ad-hoc solution each time and taking the risk of
> >>> losing the portability of the main API.
> >>>
> >>>
> >>> Kenn
> >>>
> >>> On Tue, Jan 30, 2018 at 9:55 AM, Romain Manni-Bucau <[email protected]> wrote:
> >>>
> >>> Lukasz, the point is that you have the choice to either bring all
> >>> specificities to the main API, which makes most of the API not usable
> >>> or implemented, or the opposite, not support anything. Introducing
> >>> hints will allow some runners to have some features eagerly - or just
> >>> some very specific things - and once mainstream they can find a place
> >>> in the main API. This is saner than the opposite since some
> >>> specificities can never find a good place.
> >>>
> >>> The little thing we need to take care of is to avoid introducing
> >>> feature flipping, i.e. supporting a feature that is not doable with
> >>> another runner. It should really be about tuning a runner execution
> >>> (like the schema in Spark).
> >>>
> >>>
> >>> Romain Manni-Bucau
> >>> @rmannibucau <https://twitter.com/rmannibucau> |
> >>> Blog <https://rmannibucau.metawerx.net/> |
> >>> Old Blog <http://rmannibucau.wordpress.com> |
> >>> Github <https://github.com/rmannibucau> |
> >>> LinkedIn <https://www.linkedin.com/in/rmannibucau>
> >>>
> >>>
> >>> 2018-01-30 18:42 GMT+01:00 Jean-Baptiste Onofré <[email protected]>:
> >>>
> >>> Good point Luke: in that case, the hint will be ignored by the runner
> >>> if the hint is not for it. The hint can be generic (not specific to a
> >>> runner). It could be interesting for the schema support or IOs, not
> >>> specific to a runner.
> >>>
> >>> What do you mean by gathering PTransforms/PCollections and where?
> >>>
> >>> Thanks !
> >>> Regards
> >>> JB
> >>>
> >>> On 30/01/2018 18:35, Lukasz Cwik wrote:
> >>>
> >>> If the hint is required to run the person's pipeline well, how do you
> >>> expect that the person will be able to migrate their pipeline to
> >>> another runner?
> >>>
> >>> A lot of hints like "spark.persist" are really the user trying to tell
> >>> us something about the PCollection, like it is very small. I would
> >>> prefer if we gathered this information about PTransforms and
> >>> PCollections instead of runner specific knobs, since then each runner
> >>> can choose how best to map such a property onto its internal
> >>> representation.
> >>>
> >>> On Tue, Jan 30, 2018 at 2:21 AM, Jean-Baptiste Onofré <[email protected]> wrote:
> >>>
> >>> Hi,
> >>>
> >>> As part of the discussion about schema, Romain mentioned hints. I
> >>> think it's worth having an explanation about that, especially since
> >>> it could be wider than schema.
> >>>
> >>> Today, to give information to the runner, we use PipelineOptions. The
> >>> runner can use these options, and apply them to all inner
> >>> representations of the PCollection in the runner.
> >>>
> >>> For instance, for the Spark runner, the persistence storage level
> >>> (memory, disk, ...) can be defined via pipeline options.
> >>>
> >>> Then, the Spark runner automatically decides if RDDs have to be
> >>> persisted (using the storage level defined in the pipeline options),
> >>> for instance if the same POutput/PCollection is read several times.
> >>>
> >>> However, the user doesn't have any way to give the runner an
> >>> indication of how to deal with a specific PCollection.
> >>>
> >>> Imagine the user has a pipeline like this:
> >>> pipeline.apply().apply().apply(). We have three PCollections involved
> >>> in this pipeline. It's not currently possible to indicate how the
> >>> runner should "optimize" and deal with the second PCollection only.
> >>>
> >>> The idea is to add a method on the PCollection:
> >>>
> >>> PCollection.addHint(String key, Object value);
> >>>
> >>> For instance:
> >>>
> >>> collection.addHint("spark.persist", StorageLevel.MEMORY_ONLY);
> >>>
> >>> I see three direct usages of this:
> >>>
> >>> 1. Related to schema: the schema definition could be a hint.
> >>> 2. Related to the IO: add headers for the IO and the runner on how to
> >>> specifically process a collection. In Apache Camel, we have headers on
> >>> the message and properties on the exchange similar to this. It allows
> >>> giving some indication of how to process some messages on the Camel
> >>> component. We can imagine the same for the IO (using the PCollection
> >>> hints to react accordingly).
> >>> 3. Related to runner optimization: I see for instance a way to use RDD
> >>> or dataframe in the Spark runner, or even specific optimizations like
> >>> persist. I had a lot of questions from Spark users saying: "in my
> >>> Spark job, I know where and how I should use persist (rdd.persist()),
> >>> but I can't do such optimization using Beam". So it could be a good
> >>> improvement.
> >>>
> >>> Thoughts?
> >>>
> >>> Regards
> >>> JB
> >>> --
> >>> Jean-Baptiste Onofré
> >>> [email protected]
> >>> http://blog.nanthrax.net
> >>> Talend - http://www.talend.com
> >>>
> >>
> >> --
> >> Jean-Baptiste Onofré
> >> [email protected]
> >> http://blog.nanthrax.net
> >> Talend - http://www.talend.com
>
> --
> Jean-Baptiste Onofré
> [email protected]
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>