It seems like most of these use cases are hints on a PTransform and not a
PCollection, no? CPU, memory, expected parallelism, etc are. Then you could
just have:
    pc.apply(WithHints(myTransform, <hints>))

For a PCollection hints that might make sense are bits like total size,
element size, and throughput. All things that the Dataflow folks have said
should be measured instead of hinted. But I understand that we shouldn't
force runners to do infeasible things like build a whole no-knobs service
on top of a super-knobby engine.

Incidentally for portability, we have this "environment" object that is
basically the docker URL of an SDK harness that can execute a function. We
always intended that same area of the proto (exact fields TBD) to have
things like requirements for CPU, memory, GPUs, disk, etc. It is likely a
good place for hints.

BTW good idea to ask users@ for their pain points and bring them back to
the dev list to motivate feature design discussions.

Kenn

On Tue, Jan 30, 2018 at 12:00 PM, Reuven Lax <[email protected]> wrote:

> I think the hints would logically be metadata in the pcollection, just
> like coder and schema.
>
> On Jan 30, 2018 11:57 AM, "Jean-Baptiste Onofré" <[email protected]> wrote:
>
>> Great idea for AddHints.of() !
>>
>> What would be the resulting PCollection ? Just a PCollection of hints or
>> the pc elements + hints ?
>>
>> Regards
>> JB
>>
>> On 30/01/2018 20:52, Reuven Lax wrote:
>>
>>> I think adding hints for runners is reasonable, though hints should
>>> always be assumed to be optional - they shouldn't change semantics of the
>>> program (otherwise you destroy the portability promise of Beam). However
>>> there are many types of hints that some runners might find useful (e.g.
>>> this step needs more memory. this step runs ML algorithms, and should run
>>> on a machine with GPUs. etc.)
>>>
>>> Robert has mentioned in the past that we should try and keep PCollection
>>> an immutable object, and not introduce new setters on it. We slightly break
>>> this already today with PCollection.setCoder, and that has caused some
>>> problems. Hints can be set on PTransforms though, and propagate to that
>>> PTransform's output PCollections. This is nearly as easy to use however, as
>>> we can implement a helper PTransform that can be used to set hints. I.e.
>>>
>>> pc.apply(AddHints.of(hint1, hint2, hint3))
>>>
>>> Is no harder than called pc.addHint()
>>>
>>> Reuven
>>>
>>> On Tue, Jan 30, 2018 at 11:39 AM, Jean-Baptiste Onofré <[email protected]
>>> <mailto:[email protected]>> wrote:
>>>
>>>     Maybe I should have started the discussion on the user mailing list:
>>>     it would be great to have user feedback on this, even if I got your
>>>     points.
>>>
>>>     Sometime, I have the feeling that whatever we are proposing and
>>>     discussing, it doesn't go anywhere. At some point, to attract more
>>>     people, we have to get ideas from different perspective/standpoint.
>>>
>>>     Thanks for the feedback anyway.
>>>
>>>     Regards
>>>     JB
>>>
>>>     On 30/01/2018 20:27, Romain Manni-Bucau wrote:
>>>
>>>
>>>
>>>         2018-01-30 19:52 GMT+01:00 Kenneth Knowles <[email protected]
>>>         <mailto:[email protected]> <mailto:[email protected]
>>>         <mailto:[email protected]>>>:
>>>
>>>
>>>              I generally like having certain "escape hatches" that are
>>> well
>>>              designed and limited in scope, and anything that turns out
>>>         to be
>>>              important becomes first-class. But this one I don't really
>>> like
>>>              because the use cases belong elsewhere. Of course, they
>>>         creep so you
>>>              should assume they will be unbounded in how much gets
>>>         stuffed into
>>>              them. And the definition of a "hint" is that deleting it
>>>         does not
>>>              change semantics, just performance/monitor/UI etc but this
>>>         does not
>>>              seem to be true.
>>>
>>>              "spark.persist" for idempotent replay in a sink:
>>>                - this is already @RequiresStableInput
>>>                - it is not a hint because if you don't persist your
>>>         results are
>>>              incorrect
>>>                - it is a property of a DoFn / transform not a PCollection
>>>
>>>
>>>         Let's put this last point aside since we'll manage to make it
>>>         working wherever we store it ;).
>>>
>>>
>>>              schema:
>>>                - should be first-class
>>>
>>>
>>>         Except it doesn't make sense everywhere. It is exactly like
>>>         saying "implement this" and 2 lines later "it doesn't do
>>>         anything for you". If you think wider on schema you will want to
>>>         do far more - like getting them from the previous step etc... -
>>>         which makes it not an API thing. However, with some runner like
>>>         spark, being able to specifiy it will enable to optimize the
>>>         execution. There is a clear mismatch between a consistent and
>>>         user friendly generic and portable API, and a runtime, runner
>>>         specific, implementation.
>>>
>>>         This is all fine as an issue for a portable API and why all EE
>>>         API have a map to pass properties somewhere so I don't see why
>>>         beam wouldn't fall in that exact same bucket since it embraces
>>>         the drawback of the portability and we already hit it since
>>>         several releases.
>>>
>>>
>>>              step parallelism (you didn't mention but most runners need
>>> some
>>>              control):
>>>                - this is a property of the data and the pipeline
>>>         together, not
>>>              just the pipeline
>>>
>>>
>>>         Good one but this can be configured from the pipeline or even a
>>>         transform. This doesn't mean the data is not important - and you
>>>         are more than right on that point, just that it is configurable
>>>         without referencing the data (using ranges is a trivial example
>>>         even if not the most efficient).
>>>
>>>
>>>              So I just don't actually see a use case for free-form hints
>>>         that we
>>>              haven't already covered.
>>>
>>>
>>>         There are several cases, even in the direct runner to be able to
>>>         industrialize it:
>>>         - use that particular executor instance
>>>         - debug these infos for that transform
>>>
>>>         etc...
>>>
>>>         As a high level design I think it is good to bring hints to beam
>>>         to avoid to add ad-hoc solution each time and take the risk to
>>>         loose the portability of the main API.
>>>
>>>
>>>              Kenn
>>>
>>>              On Tue, Jan 30, 2018 at 9:55 AM, Romain Manni-Bucau
>>>              <[email protected] <mailto:[email protected]>
>>>         <mailto:[email protected] <mailto:[email protected]>>>
>>>         wrote:
>>>
>>>                  Lukasz, the point is that you have to choice to either
>>>         bring all
>>>                  specificities to the main API which makes most of the
>>>         API not
>>>                  usable or implemented or the opposite, not support
>>>         anything.
>>>                  Introducing hints will allow to have eagerly for some
>>>         runners
>>>                  some features - or just some very specific things - and
>>>         once
>>>                  mainstream it can find a place in the main API. This is
>>>         saner
>>>                  than the opposite since some specificities can never
>>>         find a good
>>>                  place.
>>>
>>>                  The little thing we need to take care with that is to
>>>         avoid to
>>>                  introduce some feature flipping as support some feature
>>> not
>>>                  doable with another runner. It should really be about
>>>         runing a
>>>                  runner execution (like the schema in spark).
>>>
>>>
>>>                  Romain Manni-Bucau
>>>                  @rmannibucau <https://twitter.com/rmannibucau
>>>         <https://twitter.com/rmannibucau>> | Blog
>>>                  <https://rmannibucau.metawerx.net/
>>>         <https://rmannibucau.metawerx.net/>> | Old Blog
>>>                  <http://rmannibucau.wordpress.com
>>>         <http://rmannibucau.wordpress.com>> | Github
>>>                  <https://github.com/rmannibucau
>>>         <https://github.com/rmannibucau>> | LinkedIn
>>>                  <https://www.linkedin.com/in/rmannibucau
>>>         <https://www.linkedin.com/in/rmannibucau>>
>>>
>>>                  2018-01-30 18:42 GMT+01:00 Jean-Baptiste Onofré
>>>         <[email protected] <mailto:[email protected]>
>>>                  <mailto:[email protected] <mailto:[email protected]>>>:
>>>
>>>                      Good point Luke: in that case, the hint will be
>>>         ignored by
>>>                      the runner if the hint is not for him. The hint can
>>> be
>>>                      generic (not specific to a runner). It could be
>>>         interesting
>>>                      for the schema support or IOs, not specific to a
>>>         runner.
>>>
>>>                      What do you mean by gathering
>>>         PTransforms/PCollections and
>>>                      where ?
>>>
>>>                      Thanks !
>>>                      Regards
>>>                      JB
>>>
>>>                      On 30/01/2018 18:35, Lukasz Cwik wrote:
>>>
>>>                          If the hint is required to run the persons
>>> pipeline
>>>                          well, how do you expect that the person we be
>>>         able to
>>>                          migrate their pipeline to another runner?
>>>
>>>                          A lot of hints like "spark.persist" are really
>>>         the user
>>>                          trying to tell us something about the
>>>         PCollection, like
>>>                          it is very small. I would prefer if we gathered
>>>         this
>>>                          information about PTransforms and PCollections
>>>         instead
>>>                          of runner specific knobs since then each runner
>>> can
>>>                          choose how best to map such a property on their
>>>         internal
>>>                          representation.
>>>
>>>                          On Tue, Jan 30, 2018 at 2:21 AM, Jean-Baptiste
>>>         Onofré
>>>                          <[email protected] <mailto:[email protected]>
>>>         <mailto:[email protected] <mailto:[email protected]>>
>>>                          <mailto:[email protected]
>>>         <mailto:[email protected]> <mailto:[email protected]
>>>         <mailto:[email protected]>>>> wrote:
>>>
>>>                               Hi,
>>>
>>>                               As part of the discussion about schema,
>>> Romain
>>>                          mentioned hint. I
>>>                               think it's
>>>                               worth to have an explanation about that and
>>>                          especially it could be
>>>                               wider than
>>>                               schema.
>>>
>>>                               Today, to give information to the runner,
>>>         we use
>>>                          PipelineOptions.
>>>                               The runner can
>>>                               use these options, and apply for all inner
>>>                          representation of the
>>>                               PCollection in
>>>                               the runner.
>>>
>>>                               For instance, for the Spark runner, the
>>>         persistence
>>>                          storage level
>>>                               (memory, disk,
>>>                               ...) can be defined via pipeline options.
>>>
>>>                               Then, the Spark runner automatically
>>>         defines if
>>>                          RDDs have to be
>>>                               persisted (using
>>>                               the storage level defined in the pipeline
>>>         options),
>>>                          for instance if
>>>                               the same
>>>                               POutput/PCollection is read several time.
>>>
>>>                               However, the user doesn't have any way to
>>>         provide
>>>                          indication to the
>>>                               runner to
>>>                               deal with a specific PCollection.
>>>
>>>                               Imagine, the user has a pipeline like this:
>>>                               pipeline.apply().apply().apply(). We
>>>                               have three PCollections involved in this
>>>         pipeline.
>>>                          It's not
>>>                               currently possible
>>>                               to give indications how the runner should
>>>                          "optimized" and deal with
>>>                               the second
>>>                               PCollection only.
>>>
>>>                               The idea is to add a method on the
>>>         PCollection:
>>>
>>>                               PCollection.addHint(String key, Object
>>> value);
>>>
>>>                               For instance:
>>>
>>>                               collection.addHint("spark.persist",
>>>                          StorageLevel.MEMORY_ONLY);
>>>
>>>                               I see three direct usage of this:
>>>
>>>                               1. Related to schema: the schema
>>>         definition could
>>>                          be a hint
>>>                               2. Related to the IO: add headers for the
>>>         IO and
>>>                          the runner how to
>>>                               specifically
>>>                               process a collection. In Apache Camel, we
>>> have
>>>                          headers on the
>>>                               message and
>>>                               properties on the exchange similar to
>>> this. It
>>>                          allows to give some
>>>                               indication
>>>                               how to process some messages on the Camel
>>>                          component. We can imagine
>>>                               the same of
>>>                               the IO (using the PCollection hints to
>>> react
>>>                          accordingly).
>>>                               3. Related to runner optimization: I see
>>> for
>>>                          instance a way to use
>>>                               RDD or
>>>                               dataframe in Spark runner, or even specific
>>>                          optimization like
>>>                               persist. I had lot
>>>                               of questions from Spark users saying: "in
>>>         my Spark
>>>                          job, I know where
>>>                               and how I
>>>                               should use persist (rdd.persist()), but I
>>>         can't do
>>>                          such optimization
>>>                               using
>>>                               Beam". So it could be a good improvements.
>>>
>>>                               Thoughts ?
>>>
>>>                               Regards
>>>                               JB
>>>                               --
>>>                               Jean-Baptiste Onofré
>>>         [email protected] <mailto:[email protected]>
>>>         <mailto:[email protected] <mailto:[email protected]>>
>>>                          <mailto:[email protected]
>>>         <mailto:[email protected]> <mailto:[email protected]
>>>         <mailto:[email protected]>>>
>>>         http://blog.nanthrax.net
>>>                               Talend - http://www.talend.com
>>>
>>>
>>>
>>>
>>>
>>>
>>>

Reply via email to