It seems like most of these use cases are hints on a PTransform and not a
PCollection, no? CPU, memory, expected parallelism, etc. are. Then you could
just have:
pc.apply(WithHints(myTransform, <hints>))
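
To make the shape concrete, a rough sketch of what that wrapper could look
like (every name below is illustrative; nothing like this exists in the SDK
today):

  import java.util.Map;
  import org.apache.beam.sdk.transforms.PTransform;
  import org.apache.beam.sdk.values.PCollection;
  import org.apache.beam.sdk.values.POutput;

  // Hypothetical wrapper: carries hints for the wrapped transform without
  // changing its semantics. A runner that understands the hints can read them
  // off this composite; any other runner just expands the inner transform.
  public class WithHints<T, OutT extends POutput> extends PTransform<PCollection<T>, OutT> {
    private final PTransform<PCollection<T>, OutT> inner;
    private final Map<String, Object> hints;

    public static <T, OutT extends POutput> WithHints<T, OutT> of(
        PTransform<PCollection<T>, OutT> inner, Map<String, Object> hints) {
      return new WithHints<>(inner, hints);
    }

    private WithHints(PTransform<PCollection<T>, OutT> inner, Map<String, Object> hints) {
      this.inner = inner;
      this.hints = hints;
    }

    public Map<String, Object> getHints() {
      return hints;
    }

    @Override
    public OutT expand(PCollection<T> input) {
      // Identity wrapping: deleting the hints never changes what the pipeline computes.
      return input.apply(inner);
    }
  }

Usage would then be something like pc.apply(WithHints.of(myTransform, hints)),
and a runner decides what, if anything, to do with the map.
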
For a PCollection, hints that might make sense are bits like total size,
element size, and throughput. All things that the Dataflow folks have said
should be measured instead of hinted. But I understand that we shouldn't
force runners to do infeasible things like build a whole no-knobs service
on top of a super-knobby engine.
Incidentally for portability, we have this "environment" object that is
basically the docker URL of an SDK harness that can execute a function. We
always intended that same area of the proto (exact fields TBD) to have
things like requirements for CPU, memory, GPUs, disk, etc. It is likely a
good place for hints.
BTW good idea to ask users@ for their pain points and bring them back to
the dev list to motivate feature design discussions.
Kenn
On Tue, Jan 30, 2018 at 12:00 PM, Reuven Lax <[email protected]> wrote:
> I think the hints would logically be metadata in the PCollection, just
> like coder and schema.
>
> On Jan 30, 2018 11:57 AM, "Jean-Baptiste Onofré" <[email protected]> wrote:
>
>> Great idea for AddHints.of()!
>>
>> What would be the resulting PCollection? Just a PCollection of hints, or
>> the pc elements + hints?
>>
>> Regards
>> JB
>>
>> On 30/01/2018 20:52, Reuven Lax wrote:
>>
>>> I think adding hints for runners is reasonable, though hints should
>>> always be assumed to be optional - they shouldn't change the semantics of the
>>> program (otherwise you destroy the portability promise of Beam). However,
>>> there are many types of hints that some runners might find useful (e.g.,
>>> this step needs more memory; this step runs ML algorithms and should run
>>> on a machine with GPUs; etc.).
>>>
>>> Robert has mentioned in the past that we should try and keep PCollection
>>> an immutable object, and not introduce new setters on it. We slightly break
>>> this already today with PCollection.setCoder, and that has caused some
>>> problems. Hints can be set on PTransforms though, and propagate to that
>>> PTransform's output PCollections. This is nearly as easy to use, however,
>>> since we can implement a helper PTransform that sets hints. I.e.
>>>
>>> pc.apply(AddHints.of(hint1, hint2, hint3))
>>>
>>> is no harder than calling pc.addHint().
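>>>
>>> A rough sketch of what such a helper could look like (the hint plumbing is
>>> made up; only the PTransform/ParDo machinery is real Beam API):
>>>
>>> import java.util.Arrays;
>>> import java.util.List;
>>> import org.apache.beam.sdk.transforms.DoFn;
>>> import org.apache.beam.sdk.transforms.PTransform;
>>> import org.apache.beam.sdk.transforms.ParDo;
>>> import org.apache.beam.sdk.values.PCollection;
>>>
>>> // Hypothetical helper: expands to an identity ParDo, so ignoring the hints
>>> // never changes the result; a runner that recognizes this composite can
>>> // read the hints and apply them to the output PCollection.
>>> public class AddHints<T> extends PTransform<PCollection<T>, PCollection<T>> {
>>>   private final List<Object> hints; // a real design would use a structured Hint type
>>>
>>>   public static <T> AddHints<T> of(Object... hints) {
>>>     return new AddHints<>(Arrays.asList(hints));
>>>   }
>>>
>>>   private AddHints(List<Object> hints) {
>>>     this.hints = hints;
>>>   }
>>>
>>>   public List<Object> getHints() {
>>>     return hints;
>>>   }
>>>
>>>   @Override
>>>   public PCollection<T> expand(PCollection<T> input) {
>>>     return input
>>>         .apply(ParDo.of(new IdentityFn<T>()))
>>>         .setCoder(input.getCoder());
>>>   }
>>>
>>>   // Static identity DoFn so nothing non-serializable gets captured.
>>>   private static class IdentityFn<T> extends DoFn<T, T> {
>>>     @ProcessElement
>>>     public void processElement(ProcessContext c) {
>>>       c.output(c.element());
>>>     }
>>>   }
>>> }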
>>>
>>> Reuven
>>>
>>> On Tue, Jan 30, 2018 at 11:39 AM, Jean-Baptiste Onofré <[email protected]> wrote:
>>>
>>> Maybe I should have started the discussion on the user mailing list:
>>> it would be great to have user feedback on this, even if I got your
>>> points.
>>>
>>> Sometimes, I have the feeling that whatever we are proposing and
>>> discussing, it doesn't go anywhere. At some point, to attract more
>>> people, we have to get ideas from different perspectives/standpoints.
>>>
>>> Thanks for the feedback anyway.
>>>
>>> Regards
>>> JB
>>>
>>> On 30/01/2018 20:27, Romain Manni-Bucau wrote:
>>>
>>>
>>>
>>> 2018-01-30 19:52 GMT+01:00 Kenneth Knowles <[email protected]>:
>>>
>>>
>>> I generally like having certain "escape hatches" that are well designed
>>> and limited in scope, and anything that turns out to be important becomes
>>> first-class. But this one I don't really like because the use cases belong
>>> elsewhere. Of course, they creep, so you should assume they will be
>>> unbounded in how much gets stuffed into them. And the definition of a
>>> "hint" is that deleting it does not change semantics, just
>>> performance/monitoring/UI etc., but this does not seem to be true.
>>>
>>> "spark.persist" for idempotent replay in a sink:
>>> - this is already @RequiresStableInput
>>> - it is not a hint, because if you don't persist your results are incorrect
>>> - it is a property of a DoFn / transform, not a PCollection
>>>
>>>
>>> Let's put this last point aside, since we'll manage to make it work
>>> wherever we store it ;).
>>>
>>>
>>> schema:
>>> - should be first-class
>>>
>>>
>>> Except it doesn't make sense everywhere. It is exactly like saying
>>> "implement this" and 2 lines later "it doesn't do anything for you". If
>>> you think wider on schema you will want to do far more - like getting them
>>> from the previous step etc. - which makes it not an API thing. However,
>>> with some runners like Spark, being able to specify it will enable
>>> optimizing the execution. There is a clear mismatch between a consistent,
>>> user-friendly, generic and portable API, and a runtime, runner-specific
>>> implementation.
>>>
>>> This is all a known issue for a portable API, and it is why all EE APIs
>>> have a map to pass properties somewhere. I don't see why Beam wouldn't
>>> fall into that exact same bucket, since it embraces the drawbacks of
>>> portability and we have already hit them over several releases.
>>>
>>>
>>> step parallelism (you didn't mention but most runners need some control):
>>> - this is a property of the data and the pipeline together, not just
>>> the pipeline
>>>
>>>
>>> Good one but this can be configured from the pipeline or even a
>>> transform. This doesn't mean the data is not important - and you are more
>>> than right on that point - just that it is configurable without
>>> referencing the data (using ranges is a trivial example even if not the
>>> most efficient).
>>>
>>>
>>> So I just don't actually see a use case for free-form hints that we
>>> haven't already covered.
>>>
>>>
>>> There are several cases, even in the direct runner, where it would help
>>> to industrialize it:
>>> - use that particular executor instance
>>> - debug this info for that transform
>>>
>>> etc.
>>>
>>> As a high-level design I think it is good to bring hints to Beam, to
>>> avoid adding an ad-hoc solution each time and risking the loss of the
>>> portability of the main API.
>>>
>>>
>>> Kenn
>>>
>>> On Tue, Jan 30, 2018 at 9:55 AM, Romain Manni-Bucau <[email protected]> wrote:
>>>
>>> Lukasz, the point is that you have the choice to either bring all
>>> specificities into the main API - which makes most of the API unusable or
>>> unimplemented - or the opposite, not support anything. Introducing hints
>>> will allow some runners to have some features eagerly - or just some very
>>> specific things - and once mainstream they can find a place in the main
>>> API. This is saner than the opposite, since some specificities can never
>>> find a good place.
>>>
>>> The one thing we need to take care of is to avoid introducing feature
>>> flipping, i.e. supporting some feature not doable with another runner. It
>>> should really be about tuning a runner execution (like the schema in
>>> Spark).
>>>
>>>
>>> Romain Manni-Bucau
>>> @rmannibucau <https://twitter.com/rmannibucau> | Blog
>>> <https://rmannibucau.metawerx.net/> | Old Blog
>>> <http://rmannibucau.wordpress.com> | Github
>>> <https://github.com/rmannibucau> | LinkedIn
>>> <https://www.linkedin.com/in/rmannibucau>
>>>
>>> 2018-01-30 18:42 GMT+01:00 Jean-Baptiste Onofré <[email protected]>:
>>>
>>> Good point Luke: in that case, the hint will be ignored by the runner if
>>> the hint is not for it. The hint can be generic (not specific to a
>>> runner). It could be interesting for the schema support or IOs, not
>>> specific to a runner.
>>>
>>> What do you mean by gathering information about PTransforms/PCollections,
>>> and where?
>>>
>>> Thanks!
>>> Regards
>>> JB
>>>
>>> On 30/01/2018 18:35, Lukasz Cwik wrote:
>>>
>>> If the hint is required to run the person's pipeline well, how do you
>>> expect that the person will be able to migrate their pipeline to another
>>> runner?
>>>
>>> A lot of hints like "spark.persist" are really the user trying to tell us
>>> something about the PCollection, like it is very small. I would prefer if
>>> we gathered this information about PTransforms and PCollections instead
>>> of runner-specific knobs, since then each runner can choose how best to
>>> map such a property onto its internal representation.
>>>
>>> On Tue, Jan 30, 2018 at 2:21 AM, Jean-Baptiste Onofré <[email protected]> wrote:
>>>
>>> Hi,
>>>
>>> As part of the discussion about schema, Romain mentioned hints. I think
>>> it's worth having an explanation about that, especially since it could be
>>> wider than schema.
>>>
>>> Today, to give information to the runner, we use PipelineOptions. The
>>> runner can use these options and apply them to all inner representations
>>> of the PCollections in the runner.
>>>
>>> For instance, for the Spark runner, the persistence storage level
>>> (memory, disk, ...) can be defined via pipeline options.
>>>
>>> Then, the Spark runner automatically decides if RDDs have to be persisted
>>> (using the storage level defined in the pipeline options), for instance
>>> if the same POutput/PCollection is read several times.
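>>>
>>> (Roughly, that currently looks like the following on the options side; the
>>> exact option name should be checked against SparkPipelineOptions, so treat
>>> this as a sketch:)
>>>
>>> // org.apache.beam.runners.spark.SparkPipelineOptions
>>> // org.apache.beam.sdk.options.PipelineOptionsFactory
>>> SparkPipelineOptions options =
>>>     PipelineOptionsFactory.fromArgs(args).as(SparkPipelineOptions.class);
>>> options.setStorageLevel("MEMORY_ONLY"); // used by the runner whenever it persists an RDD
>>> Pipeline pipeline = Pipeline.create(options);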
>>>
>>> However, the user doesn't have any way to provide an indication to the
>>> runner about how to deal with a specific PCollection.
>>>
>>> Imagine the user has a pipeline like this:
>>> pipeline.apply().apply().apply(). We have three PCollections involved in
>>> this pipeline. It's not currently possible to give indications on how the
>>> runner should optimize and deal with the second PCollection only.
>>>
>>> The idea is to add a method on the PCollection:
>>>
>>> PCollection.addHint(String key, Object value);
>>>
>>> For instance:
>>>
>>> collection.addHint("spark.persist", StorageLevel.MEMORY_ONLY);
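>>>
>>> (On the runner side, the Spark translation could then consult the hint,
>>> roughly like this; the getHint accessor and the context lookup are
>>> hypothetical, only rdd.persist(...) is the real Spark API:)
>>>
>>> Object level = pcollection.getHint("spark.persist"); // hypothetical accessor
>>> JavaRDD<WindowedValue<T>> rdd = context.getRdd(pcollection); // hypothetical runner context
>>> if (level instanceof StorageLevel) {
>>>   rdd = rdd.persist((StorageLevel) level); // real Spark call
>>> }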
>>>
>>> I see three direct usages of this:
>>>
>>> 1. Related to schema: the schema definition could be a hint.
>>> 2. Related to the IOs: add headers telling the IO and the runner how to
>>> specifically process a collection. In Apache Camel, we have headers on
>>> the message and properties on the exchange, similar to this. It allows
>>> giving the Camel component some indication of how to process some
>>> messages. We can imagine the same for the IOs (using the PCollection
>>> hints to react accordingly).
>>> 3. Related to runner optimization: I see for instance a way to use RDDs
>>> or dataframes in the Spark runner, or even specific optimizations like
>>> persist. I had a lot of questions from Spark users saying: "in my Spark
>>> job, I know where and how I should use persist (rdd.persist()), but I
>>> can't do such an optimization using Beam". So it could be a good
>>> improvement.
>>>
>>> Thoughts?
>>>
>>> Regards
>>> JB
>>> --
>>> Jean-Baptiste Onofré
>>> [email protected]
>>> http://blog.nanthrax.net
>>> Talend - http://www.talend.com