Hi Akshay,

I think you're bringing up a very important point. Simplicity is something we strive for. In the case of the Write transform, the complexity was mainly added for the historical reasons Kenneth mentioned.

Note that some runners don't even benefit from it because they don't support incremental recovery. I believe we will do more work in the future to simplify the Write transform.

If you look at other pipelines which don't use that transform you will find that they are much simpler.

What would really help is not expanding the composite transforms in the UI. However, transforms need to be expanded during translation, and collapsing them again after translation to Spark/Flink can be tricky.

Generally speaking, we have realized this is an issue and have plans to fix it, e.g. https://issues.apache.org/jira/browse/BEAM-5859.

Thanks,
Max

On 28.11.18 16:52, Kenneth Knowles wrote:
In fact, that logic in FileIO is "required" to have consistent output even just for batch computation, because any step along the way may fail and retry.

I put "required" in quotes because there's a legacy concern at play here - FileIO is written using the assumption that shuffles are checkpointing. What is actually required is that the input to the last stage will be the same, even if the whole pipeline goes down and has to be brought back up. So the extra shuffle in those pipelines represents a necessary checkpoint prior to running the last stage. In the saveAsTextFile primitive (caveat: my understanding is vague and stale) this would be done in a checkpoint finalization callback, and you have to wait for that to complete before consuming the output if you want to ensure correctness.

Another relevant piece of information is that Beam has support for two things that would make it easier to decipher the UI:

1. Nodes can have meaningful names, which makes it obvious which part is doing what.
2. Transforms can be built as composites of other transforms, and this is encouraged. In some UIs, notably Cloud Dataflow, composites are shown as a single box, which makes the graph easier to understand.

I would not expect every engine to adopt all of Beam's features like these, but there might be a clever way to make the information available.
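
A minimal sketch in Java of both points (not code from this thread; the transform and step names are made up for illustration):

import java.util.Arrays;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.FlatMapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

// A composite transform: UIs that understand composites (e.g. Cloud Dataflow)
// can render everything inside expand() as a single "CountWords" box.
public class CountWords
    extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
  @Override
  public PCollection<KV<String, Long>> expand(PCollection<String> lines) {
    return lines
        // Each apply() is given a meaningful node name shown in the UI.
        .apply("SplitIntoWords", FlatMapElements
            .into(TypeDescriptors.strings())
            .via(line -> Arrays.asList(line.split("\\s+"))))
        .apply("CountPerWord", Count.perElement());
  }
}

// Usage: counts = lines.apply("CountWords", new CountWords());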

Kenn

On Wed, Nov 28, 2018 at 12:27 AM Tim Robertson <timrobertson...@gmail.com> wrote:

    Hi Akshay

    My understanding is that this all comes from the final FileIO
    write() stage.

    When writing, FileIO first puts data into temporary files, similar
    to the output formats for Hadoop MapReduce. Once ready to commit,
    it does something along the lines of a directory scan to determine
    which files need to be moved into the final output location. It is
    that scan-and-commit stage that causes the complex DAG, although
    it runs very quickly. While it looks complicated, I gather it is
    necessary to support the needs of batch/streaming and in
    particular the behaviour under failure scenarios.
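
    For context, here is a minimal sketch (not Akshay's code; the paths
    and names are illustrative) of the kind of write that expands into
    that final sub-graph:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.FileIO;
    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class FileWriteSketch {
      public static void main(String[] args) {
        Pipeline p =
            Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        p.apply("ReadLines", TextIO.read().from("hdfs:///tmp/input/*"))
         // This single write() call is what expands into the temp-file,
         // finalize and rename steps that show up as the final "job".
         .apply("WriteLines", FileIO.<String>write()
             .via(TextIO.sink())
             .to("hdfs:///tmp/output")
             .withSuffix(".txt"));

        p.run().waitUntilFinish();
      }
    }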

    I agree with you that from a developer perspective it is very
    difficult to understand. If you were to replace the FileIO write()
    with e.g. a push into a database (JdbcIO), ElasticsearchIO, SolrIO
    etc., you would see a much more familiar and simpler DAG - it
    might be worth trying that to see (a rough sketch follows). Over
    time I expect you will simply ignore that final job when looking
    at the DAG, as you know it is just the output committer stage.
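
    A rough sketch of such a swap (this assumes the beam-sdks-java-io-jdbc
    module and a Postgres driver on the classpath; the table and
    connection details are made up):

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.jdbc.JdbcIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.values.KV;

    public class JdbcSinkSketch {
      public static void main(String[] args) {
        Pipeline p =
            Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        p.apply(Create.of(KV.of("hello", 2L), KV.of("world", 1L)))
         // Writing to a database instead of files avoids the temp-file /
         // finalize sub-graph, so the translated DAG stays much simpler.
         .apply(JdbcIO.<KV<String, Long>>write()
             .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
                 "org.postgresql.Driver", "jdbc:postgresql://localhost/mydb"))
             .withStatement("INSERT INTO word_counts (word, cnt) VALUES (?, ?)")
             .withPreparedStatementSetter((element, statement) -> {
               statement.setString(1, element.getKey());
               statement.setLong(2, element.getValue());
             }));

        p.run().waitUntilFinish();
      }
    }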

    I don't know if you are using HDFS, but if so, please be aware of
    BEAM-5036 [1], which is fixed in 2.9.0-SNAPSHOT and will be
    released with 2.9.0 in the coming days. It relates to what I
    outline above: the files were actually copied into place rather
    than simply moved. On my jobs, I saw a very large increase in
    performance from this fix, and it brought Beam much closer to
    native Spark in terms of runtime performance.

    I hope this helps,
    Tim


    [1] https://issues.apache.org/jira/browse/BEAM-5036

    On Wed, Nov 28, 2018 at 7:34 AM Akshay Mendole
    <akshaymend...@gmail.com> wrote:

        Hi,
             We are in the process of evaluating different execution
        engines (mainly Apache Spark and Apache Flink) for our
        production batch and streaming pipelines. We thought of using
        Apache Beam as a unified programming model framework to write
        the pipelines. When we executed a simple wordcount pipeline
        using both the flink-runner and the spark-runner, we saw that
        the DAG for the pipeline in both Flink and Spark, when
        executed via Beam, had a lot of operators/nodes which could
        not be explained. When we wrote the same wordcount program
        using the APIs provided by the underlying execution engine,
        the DAGs were much simpler and could be easily explained.
        Below is an example of a wordcount program executed on Spark.

        This is the DAG when we executed this code
        <https://pastebin.com/3MZZPgJk>, developed using the Spark RDD
        APIs:
        [attached screenshot: Spark RDD wordcount DAG]




        This is the DAG when we executed this code
        <https://pastebin.com/ABtUDmvC>, developed using the Beam
        pipeline APIs:
        [attached screenshots: Beam wordcount DAG on Spark, two parts]
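
        (For reference, a minimal sketch of what such a Beam wordcount
        typically looks like - the actual code is in the pastebin link
        above; the file paths and step names here are illustrative.)

        import java.util.Arrays;
        import org.apache.beam.sdk.Pipeline;
        import org.apache.beam.sdk.io.TextIO;
        import org.apache.beam.sdk.options.PipelineOptionsFactory;
        import org.apache.beam.sdk.transforms.Count;
        import org.apache.beam.sdk.transforms.FlatMapElements;
        import org.apache.beam.sdk.transforms.MapElements;
        import org.apache.beam.sdk.values.KV;
        import org.apache.beam.sdk.values.TypeDescriptors;

        public class BeamWordCount {
          public static void main(String[] args) {
            Pipeline p =
                Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

            p.apply("ReadLines", TextIO.read().from("hdfs:///tmp/input.txt"))
             .apply("SplitWords", FlatMapElements
                 .into(TypeDescriptors.strings())
                 .via(line -> Arrays.asList(line.split("\\s+"))))
             .apply("CountWords", Count.perElement())
             .apply("FormatResults", MapElements
                 .into(TypeDescriptors.strings())
                 .via((KV<String, Long> kv) -> kv.getKey() + ": " + kv.getValue()))
             // The write step below is what expands into the extra
             // stages/jobs visible in the Spark and Flink UIs.
             .apply("WriteCounts", TextIO.write().to("hdfs:///tmp/counts"));

            p.run().waitUntilFinish();
          }
        }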



        We observed the *same behaviour* when we executed the pipeline
        using the flink-runner.
        While this is a simple word count, when we wrote our more
        complex pipelines in Beam and executed them, they led to DAGs
        which were almost impossible to explain :-( .

        We have the following concerns:
        1. Is the gigantic DAG expected?
        2. If so, why, and will it cause any performance impact?
        3. Since the generated DAG cannot be explained, are there
        better ways to understand it from a developer's point of view?

        It would be great if someone could help us in this regard.

        Thanks,
        Akshay



