There have been suggestions in the past for Dataflow 1.x to extend
PipelineOptions to be usable per PTransform. So when you apply a PTransform
you can also provide a set of options that apply to it and all
subtransforms contained within. This is the closest suggestion to what your
describing that I can think of but I don't think its a good fit.

I don't really have a concrete approach as to how hints would be passed
through since I believe with a sufficiently powerful runner, none of these
hints are actually useful since the system should adapt during execution.

Until something is designed, this can be something that is tried out within
a specific runner initially by adding methods to a specific runner like:
runner.addHint(PValue, String key, String value)
runner.addHint(PTransform, String, key, String value)



On Tue, Jan 30, 2018 at 9:42 AM, Jean-Baptiste Onofré <j...@nanthrax.net>
wrote:

> Good point Luke: in that case, the hint will be ignored by the runner if
> the hint is not for him. The hint can be generic (not specific to a
> runner). It could be interesting for the schema support or IOs, not
> specific to a runner.
>
> What do you mean by gathering PTransforms/PCollections and where ?
>
> Thanks !
> Regards
> JB
>
> On 30/01/2018 18:35, Lukasz Cwik wrote:
>
>> If the hint is required to run the persons pipeline well, how do you
>> expect that the person we be able to migrate their pipeline to another
>> runner?
>>
>> A lot of hints like "spark.persist" are really the user trying to tell us
>> something about the PCollection, like it is very small. I would prefer if
>> we gathered this information about PTransforms and PCollections instead of
>> runner specific knobs since then each runner can choose how best to map
>> such a property on their internal representation.
>>
>> On Tue, Jan 30, 2018 at 2:21 AM, Jean-Baptiste Onofré <j...@nanthrax.net
>> <mailto:j...@nanthrax.net>> wrote:
>>
>>     Hi,
>>
>>     As part of the discussion about schema, Romain mentioned hint. I
>>     think it's
>>     worth to have an explanation about that and especially it could be
>>     wider than
>>     schema.
>>
>>     Today, to give information to the runner, we use PipelineOptions.
>>     The runner can
>>     use these options, and apply for all inner representation of the
>>     PCollection in
>>     the runner.
>>
>>     For instance, for the Spark runner, the persistence storage level
>>     (memory, disk,
>>     ...) can be defined via pipeline options.
>>
>>     Then, the Spark runner automatically defines if RDDs have to be
>>     persisted (using
>>     the storage level defined in the pipeline options), for instance if
>>     the same
>>     POutput/PCollection is read several time.
>>
>>     However, the user doesn't have any way to provide indication to the
>>     runner to
>>     deal with a specific PCollection.
>>
>>     Imagine, the user has a pipeline like this:
>>     pipeline.apply().apply().apply(). We
>>     have three PCollections involved in this pipeline. It's not
>>     currently possible
>>     to give indications how the runner should "optimized" and deal with
>>     the second
>>     PCollection only.
>>
>>     The idea is to add a method on the PCollection:
>>
>>     PCollection.addHint(String key, Object value);
>>
>>     For instance:
>>
>>     collection.addHint("spark.persist", StorageLevel.MEMORY_ONLY);
>>
>>     I see three direct usage of this:
>>
>>     1. Related to schema: the schema definition could be a hint
>>     2. Related to the IO: add headers for the IO and the runner how to
>>     specifically
>>     process a collection. In Apache Camel, we have headers on the
>>     message and
>>     properties on the exchange similar to this. It allows to give some
>>     indication
>>     how to process some messages on the Camel component. We can imagine
>>     the same of
>>     the IO (using the PCollection hints to react accordingly).
>>     3. Related to runner optimization: I see for instance a way to use
>>     RDD or
>>     dataframe in Spark runner, or even specific optimization like
>>     persist. I had lot
>>     of questions from Spark users saying: "in my Spark job, I know where
>>     and how I
>>     should use persist (rdd.persist()), but I can't do such optimization
>>     using
>>     Beam". So it could be a good improvements.
>>
>>     Thoughts ?
>>
>>     Regards
>>     JB
>>     --
>>     Jean-Baptiste Onofré
>>     jbono...@apache.org <mailto:jbono...@apache.org>
>>     http://blog.nanthrax.net
>>     Talend - http://www.talend.com
>>
>>
>>

Reply via email to