Hi Akshay,
I think you're bringing up a very important point. Simplicity with
minimal complexity is something we strive for. In the case of the
Write transform, the complexity was mainly added for historical
reasons, which Kenneth mentioned.
Note that some runners don't even benefit from it because they
don't support incremental recovery. I believe we will do work in the
future to simplify the Write transform.
If you look at other pipelines which don't use that transform, you will
find that they are much simpler.
What would really help is to not expand the composite transforms.
However, transforms need to be expanded during translation, and
collapsing those transforms again after translation to Spark/Flink can
be tricky.
Generally speaking, we have realized this is an issue and have plans to
fix it, e.g. https://issues.apache.org/jira/browse/BEAM-5859.
Thanks,
Max
On 28.11.18 16:52, Kenneth Knowles wrote:
In fact, that logic in FileIO is "required" to have consistent output
even just for batch computation, because any step along the way may fail
and retry.
I put "required" in quotes because there's a legacy concern at play here
- FileIO is written using the assumption that shuffles are
checkpointing. What is actually required is that the input to the last
stage will be the same, even if the whole pipeline goes down and has to
be brought back up. So the extra shuffle in those pipelines represents a
necessary checkpoint prior to running the last stage. In the
saveAsTextFile primitive (caveat: my understanding is vague and stale)
this would be done in a checkpoint finalization callback, and you have
to wait for that to complete before consuming the output if you want to
ensure correctness.
Another relevant piece of information is that Beam has support for two
things that would make it easier to decipher the UI:
1. Nodes can have meaningful names, which makes it obvious which
part is doing what.
2. Transforms can be built as composites of other transforms, and this
is encouraged. In some UIs, notably Cloud Dataflow, the composites are
shown as a single box, so the graph is easier to understand.
I would not expect every engine to adopt all of Beam's features like
these, but there might be a clever way to make the information available.
Kenn
On Wed, Nov 28, 2018 at 12:27 AM Tim Robertson
<timrobertson...@gmail.com <mailto:timrobertson...@gmail.com>> wrote:
Hi Akshay
My understanding is that this all comes from the final FileIO
write() stage.
When writing, the FileIO puts data into temporary files similar to
the output formats for Hadoop MapReduce. Once ready to commit, it
does something along the lines of a directory scan to determine
which files need to be moved into the final output location. It is
that directory scan stage that causes the complex DAG and it runs
very quickly. While it looks complicated, I gather it is necessary
to support the needs of batch/streaming and in particular the
behaviour under failure scenarios.
I agree with you that from a developer perspective it is very
difficult to understand. If you were to replace the FileIO write()
with, e.g., a push into a database (JdbcIO), or ElasticsearchIO, or
SolrIO, etc., you would see a much more familiar and simpler DAG - it
might be worth trying that to see. Over time I expect you will simply
ignore that final job when looking at the DAG, knowing it is just the
output committer stage.
I don't know if you are using HDFS, but if so, please be aware of
BEAM-5036 [1], which is fixed in 2.9.0-SNAPSHOT and will be released
with 2.9.0 in the coming days. It relates to what I outlined above:
the files were actually copied into place rather than simply moved.
On my jobs I saw a very large increase in performance because of this
fix, which brought Beam much closer to native Spark in terms of
runtime performance.
I hope this helps,
Tim
[1] https://issues.apache.org/jira/browse/BEAM-5036
On Wed, Nov 28, 2018 at 7:34 AM Akshay Mendole
<akshaymend...@gmail.com <mailto:akshaymend...@gmail.com>> wrote:
Hi,
We are in the process of evaluating different execution
engines (mainly Apache Spark and Apache Flink) for our
production batch and streaming pipelines. We thought of using
Apache Beam as a unified programming model framework to write
the pipelines. When we executed a simple wordcount pipeline using
both the flink-runner and the spark-runner, we saw that the DAG for
the pipeline in both Flink and Spark, when executed using Beam code,
had a lot of operators/nodes that cannot be explained. When we
wrote the same wordcount program using the APIs provided by the
underlying execution engine, the DAGs were much simpler and
could be easily explained.
Below is an example of wordcount program executed in spark.
This is the DAG when we executed this
<https://pastebin.com/3MZZPgJk> code developed using spark RDD APIs.
[attachment: Screen Shot 2018-11-28 at 11.39.35 AM.png]
This is the DAG when we executed this
<https://pastebin.com/ABtUDmvC> code developed using beam
pipeline APIs.
[attachments: Screen Shot 2018-11-28 at 11.40.04 AM.png, Screen Shot
2018-11-28 at 11.40.11 AM.png]
We observed the *same behaviour* when we executed the pipeline
using the flink runner.
While this is a simple wordcount, when we wrote our more complex
pipelines in Beam and executed them, they led to DAGs which were
almost impossible to explain :-(.
We have the following concerns regarding this:
1. Is the gigantic DAG expected?
2. If so, why? And will it cause any performance impact?
3. Since the generated DAG cannot be explained, are there better
ways to understand it from a developer's point of view?
It would be great if someone helps us in this regard.
Thanks,
Akshay