Good point Luke: in that case, the hint will be ignored by the runner if the hint is not for him. The hint can be generic (not specific to a runner). It could be interesting for the schema support or IOs, not specific to a runner.

What do you mean by gathering PTransforms/PCollections and where ?

Thanks !
Regards
JB

On 30/01/2018 18:35, Lukasz Cwik wrote:
If the hint is required to run the persons pipeline well, how do you expect that the person we be able to migrate their pipeline to another runner?

A lot of hints like "spark.persist" are really the user trying to tell us something about the PCollection, like it is very small. I would prefer if we gathered this information about PTransforms and PCollections instead of runner specific knobs since then each runner can choose how best to map such a property on their internal representation.

On Tue, Jan 30, 2018 at 2:21 AM, Jean-Baptiste Onofré <[email protected] <mailto:[email protected]>> wrote:

    Hi,

    As part of the discussion about schema, Romain mentioned hint. I
    think it's
    worth to have an explanation about that and especially it could be
    wider than
    schema.

    Today, to give information to the runner, we use PipelineOptions.
    The runner can
    use these options, and apply for all inner representation of the
    PCollection in
    the runner.

    For instance, for the Spark runner, the persistence storage level
    (memory, disk,
    ...) can be defined via pipeline options.

    Then, the Spark runner automatically defines if RDDs have to be
    persisted (using
    the storage level defined in the pipeline options), for instance if
    the same
    POutput/PCollection is read several time.

    However, the user doesn't have any way to provide indication to the
    runner to
    deal with a specific PCollection.

    Imagine, the user has a pipeline like this:
    pipeline.apply().apply().apply(). We
    have three PCollections involved in this pipeline. It's not
    currently possible
    to give indications how the runner should "optimized" and deal with
    the second
    PCollection only.

    The idea is to add a method on the PCollection:

    PCollection.addHint(String key, Object value);

    For instance:

    collection.addHint("spark.persist", StorageLevel.MEMORY_ONLY);

    I see three direct usage of this:

    1. Related to schema: the schema definition could be a hint
    2. Related to the IO: add headers for the IO and the runner how to
    specifically
    process a collection. In Apache Camel, we have headers on the
    message and
    properties on the exchange similar to this. It allows to give some
    indication
    how to process some messages on the Camel component. We can imagine
    the same of
    the IO (using the PCollection hints to react accordingly).
    3. Related to runner optimization: I see for instance a way to use
    RDD or
    dataframe in Spark runner, or even specific optimization like
    persist. I had lot
    of questions from Spark users saying: "in my Spark job, I know where
    and how I
    should use persist (rdd.persist()), but I can't do such optimization
    using
    Beam". So it could be a good improvements.

    Thoughts ?

    Regards
    JB
    --
    Jean-Baptiste Onofré
    [email protected] <mailto:[email protected]>
    http://blog.nanthrax.net
    Talend - http://www.talend.com


Reply via email to