Hi,

On 5/3/19 12:20 PM, Maximilian Michels wrote:
Hi Jan,

A typical example could be a machine learning task, where you might have a lot of data cleansing and simple transformations, followed by some ML algorithm (e.g. SVD). One might want to use Spark MLlib for the ML task, but Beam for all the surrounding transformations. Then, porting to a different runner would only mean providing a different implementation of the SVD, while everything else would remain the same.

This is a fair point. Of course you could always split up the pipeline into two jobs, e.g. have a native Spark job and a Beam job running on Spark.

Something that came to my mind is "unsafe" in Rust, which allows you to leave the safe abstractions of Rust and, for example, work with raw pointers or call into C code. If Beam had something like that, which really emphasized the non-portable aspect of a transform, that could change things:

  // Hypothetical API, sketched for discussion only.
  Pipeline p = ...;
  p.getOptions().setAllowNonPortable(true);
  p.apply(
      NonPortable.of(new MyFlinkOperator(), FlinkRunner.class));

Again, I'm not sure we want to go down that road, but if there are really specific use cases, we could think about it.

Yes, this is exactly what I meant. I think that this doesn't threaten any of Beam's selling points, because this way you declare that you *want* your pipeline to be non-portable, so if you don't do it on purpose, your pipeline will still be portable. The key point here is that the underlying systems are likely to evolve quicker than Beam (in some directions or some ways - Beam might on the other hand bring features to these systems, that's for sure). Examples might be Spark's MLlib or Flink's iterative streams.
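
To make the opt-in idea concrete, here is a minimal, self-contained Java sketch. It deliberately does not use the Beam API: `SketchPipeline`, `SketchTransform`, `setAllowNonPortable` and `canRunOn` are hypothetical names that only mirror the shape of the proposal above. The point it illustrates is that a runner-specific transform is rejected unless the user explicitly opted in, so a pipeline cannot become non-portable by accident.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a transform; not a Beam class. */
final class SketchTransform {
    final String name;
    /** Null for a portable transform, else the only runner that can execute it. */
    final String requiredRunner;

    private SketchTransform(String name, String requiredRunner) {
        this.name = name;
        this.requiredRunner = requiredRunner;
    }

    static SketchTransform portable(String name) {
        return new SketchTransform(name, null);
    }

    static SketchTransform nonPortable(String name, String requiredRunner) {
        return new SketchTransform(name, requiredRunner);
    }
}

/** Hypothetical stand-in for a pipeline; not a Beam class. */
final class SketchPipeline {
    private final List<SketchTransform> transforms = new ArrayList<>();
    private boolean allowNonPortable = false;

    SketchPipeline setAllowNonPortable(boolean allow) {
        this.allowNonPortable = allow;
        return this;
    }

    /** Rejects runner-specific transforms unless the user opted in explicitly. */
    SketchPipeline apply(SketchTransform transform) {
        if (transform.requiredRunner != null && !allowNonPortable) {
            throw new IllegalStateException(
                transform.name + " is non-portable; call setAllowNonPortable(true) first");
        }
        transforms.add(transform);
        return this;
    }

    /** True if every applied transform can execute on the given runner. */
    boolean canRunOn(String runner) {
        for (SketchTransform t : transforms) {
            if (t.requiredRunner != null && !t.requiredRunner.equals(runner)) {
                return false;
            }
        }
        return true;
    }
}

final class NonPortableDemo {
    public static void main(String[] args) {
        SketchPipeline p = new SketchPipeline()
            .setAllowNonPortable(true)
            .apply(SketchTransform.portable("CleanData"))
            .apply(SketchTransform.nonPortable("MyFlinkOperator", "FlinkRunner"));
        System.out.println("Runs on Flink: " + p.canRunOn("FlinkRunner"));
        System.out.println("Runs on Spark: " + p.canRunOn("SparkRunner"));
    }
}
```

Without the call to `setAllowNonPortable(true)`, the second `apply` throws, which is the "you have to do it on purpose" property discussed above.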

Generally, there are optimizations that depend heavily on the particular pipeline. Only with knowledge of that pipeline might you have enough information to apply some very specific optimization.

If these patterns can be detected in DAGs, then we can build optimizations into the FlinkRunner. If that is not feasible, then you're out of luck. Could you describe an optimization that you miss in Beam?

I think that sometimes you cannot infer all possible optimizations from the DAG itself. If you read from a source (e.g. Kafka), information about how the data was partitioned when it was written to Kafka might help you avoid additional shuffling in some cases. That's probably something you could in theory do via some annotations on sources, but the fundamental question here is - do you really want to do that? Or just let the user do some hard-coding when they know that it might help in their particular case (possibly even a corner case)?
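
The Kafka point can be illustrated with a small plain-Java sketch (no Kafka or Beam dependency). Everything here is an assumption made for illustration: the `KeyedRecord` type, the hash-modulo `partitionFor` function standing in for the writer's partitioner, and the in-memory lists standing in for topic partitions. If the writer already routed records by key, every key lives in exactly one partition, so a per-key sum can be computed partition by partition and the partial results simply unioned, with no records exchanged between partitions.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** A keyed record; hypothetical, for illustration only. */
final class KeyedRecord {
    final String key;
    final int value;

    KeyedRecord(String key, int value) {
        this.key = key;
        this.value = value;
    }
}

final class PrePartitionedSum {
    /** Assumed writer-side partitioner: key hash modulo partition count. */
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    /** Simulates writing records to a topic partitioned by key. */
    static List<List<KeyedRecord>> writePartitioned(List<KeyedRecord> records, int numPartitions) {
        List<List<KeyedRecord>> partitions = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new ArrayList<>());
        }
        for (KeyedRecord r : records) {
            partitions.get(partitionFor(r.key, numPartitions)).add(r);
        }
        return partitions;
    }

    /** Sums per key, reading each partition independently (no shuffle). */
    static Map<String, Integer> sumWithoutShuffle(List<List<KeyedRecord>> partitions) {
        Map<String, Integer> result = new HashMap<>();
        for (List<KeyedRecord> partition : partitions) {
            Map<String, Integer> local = new HashMap<>();
            for (KeyedRecord r : partition) {
                local.merge(r.key, r.value, Integer::sum);
            }
            for (Map.Entry<String, Integer> e : local.entrySet()) {
                // A key seen in two partitions would mean the partitioning
                // assumption is violated and a real shuffle is required.
                if (result.putIfAbsent(e.getKey(), e.getValue()) != null) {
                    throw new IllegalStateException("key spans partitions: " + e.getKey());
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<KeyedRecord> records = new ArrayList<>();
        records.add(new KeyedRecord("a", 1));
        records.add(new KeyedRecord("b", 2));
        records.add(new KeyedRecord("a", 3));
        System.out.println(sumWithoutShuffle(writePartitioned(records, 3)));
    }
}
```

A runner cannot know from the DAG alone that the writer used this partitioner, which is exactly why the information would have to come from an annotation or from the user.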

Jan


Cheers,
Max

On 02.05.19 22:44, Jan Lukavský wrote:
Hi Max,

comments inline.

On 5/2/19 3:29 PM, Maximilian Michels wrote:
Couple of comments:

* Flink transforms

It wouldn't be hard to add a way to run arbitrary Flink operators through the Beam API. Like you said, once you go down that road, you lose the ability to run the pipeline on a different Runner. And that's precisely one of the selling points of Beam. I'm afraid once you even allow 1% non-portable pipelines, you have lost it all.

Absolutely true, but - the question here is "how much effort do I have to invest in order to port the pipeline to a different runner?". If this effort is low, I'd say the pipeline remains "nearly portable". A typical example could be a machine learning task, where you might have a lot of data cleansing and simple transformations, followed by some ML algorithm (e.g. SVD). One might want to use Spark MLlib for the ML task, but Beam for all the surrounding transformations. Then, porting to a different runner would only mean providing a different implementation of the SVD, while everything else would remain the same.

Now, it would be a different story if we had a runner-agnostic way of running Flink operators on top of Beam. For a subset of the Flink transformations that might actually be possible. I'm not sure if it's feasible for Beam to depend on the Flink API.

* Pipeline Tuning

There are fewer bells and whistles in the Beam API than there are in Flink's. I'd consider that a feature. As Robert pointed out, the Runner can make any optimizations that it wants to. If you have an idea for an optimization, we could build it into the FlinkRunner.

Generally, there are optimizations that depend heavily on the particular pipeline. Only with knowledge of that pipeline might you have enough information to apply some very specific optimization.

Jan


On 02.05.19 13:44, Robert Bradshaw wrote:
Correct, there's no out of the box way to do this. As mentioned, this
would also result in non-portable pipelines. However, even the
portability framework is set up such that runners can recognize
particular transforms and provide their own implementations thereof
(which is how translations are done for ParDo, GroupByKey, etc.) and
it is encouraged that runners do this for composite operations they
can do better on (e.g. I know Flink maps Reshard directly to
Redistribute rather than using the generic pair-with-random-key
implementation).

If you really want to do this for MyFancyFlinkOperator, the current
solution is to adapt/extend FlinkRunner (possibly forking code) to
understand this operation and its substitution.
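
One way to picture the mechanism described above is a lookup table inside the runner: if the runner recognizes a transform's identifier, it uses its own implementation, otherwise it falls back to the generic expansion. The following is a plain-Java sketch of that idea only; `RunnerSketch`, `registerNative`, and the identifier string are hypothetical and are not Beam or Flink APIs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

/** Hypothetical runner that may substitute its own transform implementations. */
final class RunnerSketch {
    private final String name;
    private final Map<String, Function<List<Integer>, List<Integer>>> nativeImpls =
        new HashMap<>();

    RunnerSketch(String name) {
        this.name = name;
    }

    /** Registers a runner-specific implementation for a transform identifier. */
    RunnerSketch registerNative(String transformId, Function<List<Integer>, List<Integer>> impl) {
        nativeImpls.put(transformId, impl);
        return this;
    }

    /** Reports which implementation would be chosen for the identifier. */
    String chosenImplementation(String transformId) {
        return nativeImpls.containsKey(transformId) ? name + "-native" : "generic";
    }

    /** Uses the native implementation if recognized, else the generic expansion. */
    List<Integer> run(
            String transformId,
            Function<List<Integer>, List<Integer>> genericExpansion,
            List<Integer> input) {
        return nativeImpls.getOrDefault(transformId, genericExpansion).apply(input);
    }
}

final class OverrideDemo {
    /** Hypothetical identifier for a "distinct" composite transform. */
    static final String DISTINCT = "sketch:distinct:v1";

    /** Generic expansion that any runner could execute. */
    static List<Integer> genericDistinct(List<Integer> in) {
        return in.stream().distinct().collect(Collectors.toList());
    }

    /** Stand-in for a runner-specific implementation with the same semantics. */
    static List<Integer> nativeDistinct(List<Integer> in) {
        return new ArrayList<>(new LinkedHashSet<>(in));
    }

    public static void main(String[] args) {
        RunnerSketch flink = new RunnerSketch("flink")
            .registerNative(DISTINCT, OverrideDemo::nativeDistinct);
        List<Integer> input = List.of(3, 1, 3, 2, 1);
        System.out.println(flink.chosenImplementation(DISTINCT));
        System.out.println(flink.run(DISTINCT, OverrideDemo::genericDistinct, input));
    }
}
```

The pipeline stays portable in this scheme because the substitution lives in the runner, and a runner without an entry in the table still produces the same result through the generic expansion.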


On Thu, May 2, 2019 at 11:09 AM Jan Lukavský <je...@seznam.cz> wrote:

Just to clarify - the code I posted is just a proposal, it is not
actually possible currently.

On 5/2/19 11:05 AM, Jan Lukavský wrote:
Hi,

I'd say that what Pankaj meant could be rephrased as "What if I want
to manually tune or tweak my Pipeline for a specific runner? Do I have
any options for that?". As I understand it, currently the answer is
no: PTransforms are somewhat hardwired into runners, and the way they
expand cannot be controlled or tuned. But that could be changed;
maybe something like this would make it possible:

PCollection<...> in = ...;
in.apply(new MyFancyFlinkOperator());

// ...

in.getPipeline()
    .getOptions()
    .as(FlinkPipelineOptions.class)
    .setTransformOverride(
        MyFancyFlinkOperator.class, new MyFancyFlinkOperatorExpander());


The `expander` could have access to the full Flink API (through the
runner), and that way any transform could be overridden or customized
for specific runtime conditions. Of course, this has the downside that
you end up with a non-portable pipeline (also, this is probably in
conflict with the general portability framework). But, as I see it,
usually you would need to override only a very small part of your
pipeline. So, say, 90% of the pipeline would be portable, and in order
to port it to a different runner, you would need to reimplement only a
small specific part.

Jan

On 5/2/19 9:45 AM, Robert Bradshaw wrote:
On Thu, May 2, 2019 at 12:13 AM Pankaj Chand
<pankajchanda...@gmail.com> wrote:
I thought by choosing Beam, I would get the benefits of both (Spark
and Flink), one at a time. Now, I'm understanding that I might not
get the full potential from either of the two.

You get the benefit of being able to choose, without rewriting your
pipeline, whether to run on Spark or Flink. Or the next new runner
that comes around. As well as the Beam model, API, etc.

Example: If I use Beam with Flink, and then a new feature is added
to Flink but I cannot access it via Beam, and that feature is not
important to the Beam community, then what is the suggested
workaround? If I really need that feature, I would not want to
re-write my pipeline in Flink from scratch.

How is this different from "If I used Spark, and a new feature is
added to Flink, I cannot access it from Spark. If that feature is not
important to the Spark community, then what is the suggested
workaround? If I really need that feature, I would not want to
re-write my pipeline in Flink from scratch." Or vice-versa. Or when a
new feature is added to Beam itself (that may not be present in any
of the underlying systems). Beam's feature set is neither the
intersection nor the union of the feature sets of those runners it has
available as execution engines. (Even the notion of what is meant by
"feature" is nebulous enough that it's hard to make this discussion
concrete.)

Is it possible that in the near future, most of Beam's capabilities
would favor Google's Dataflow API? That way, Beam could be used to
lure developers and organizations who would typically use
Spark/Flink, with the promise of portability. After they get
dependent on Beam and cannot afford to re-write their pipelines in
Spark/Flink from scratch, they would realize that Beam does not give
access to some of the capabilities of the free engines that they may
require. Then, they would be told that if they want all possible
capabilities and would want to use their code in Beam, they could
pay for the Dataflow engine instead.

Google is very upfront about the fact that they are selling a service
to run Beam pipelines in a completely managed way. But Google has
*also* invested very heavily in making sure that the portability story
is not just talk, for those who need or want to run their jobs on
premise or elsewhere (now or in the future). It is our goal that all
runners be able to run all pipelines, and this is a community effort.

Pankaj

On Tue, Apr 30, 2019 at 6:15 PM Kenneth Knowles <k...@apache.org>
wrote:
It is worth noting that Beam isn't solely a portability layer that
exposes underlying API features, but a feature-rich layer in its
own right, with carefully coherent abstractions. For example, quite
early on the SparkRunner supported streaming aspects of the Beam
model - watermarks, windowing, triggers - that were not really
available any other way. Beam's various features sometimes require
just a pass-through API and sometimes require a clever new
implementation. And everything is moving constantly. I don't see
Beam as following the features of any engine, but rather coming up
with new needed data processing abstractions and figuring out how
to efficiently implement them on top of various architectures.

Kenn

On Tue, Apr 30, 2019 at 8:37 AM kant kodali <kanth...@gmail.com>
wrote:
Staying behind doesn't imply one is better than the other and I
didn't mean that in any way but I fail to see how an abstraction
framework like Beam can stay ahead of the underlying execution
engines?

For example, if a new feature is added to the underlying execution
engine that doesn't fit the interface of Beam, or breaks it, then I
would think the interface would need to be changed. As another
example, say the underlying execution engines take different kinds
of parameters for the same feature; then it isn't so straightforward
to come up with an interface, since there might be very little in
common in the first place. So, in that sense, I fail to see how Beam
can stay ahead.

"Of course the API itself is Spark-specific, but it borrows
heavily (among other things) on ideas that Beam itself pioneered
long before Spark 2.0" Good to know.

"one of the things Beam has focused on was a language portability
framework" Sure, but how important is this for a typical user? Do
people stop using a particular tool because it is in an X
language? I personally would put features first over language
portability, and it's completely fine that this may not be in line
with Beam's priorities. All that said, I agree that Beam's focus on
language portability is great.

On Tue, Apr 30, 2019 at 2:48 AM Maximilian Michels
<m...@apache.org> wrote:
I wouldn't say one is, or will always be, in front of or behind
another.

That's a great way to phrase it. I think it is very common to jump to
the conclusion that one system is better than the other. In reality
it's often much more complicated.

For example, one of the things Beam has focused on was a language
portability framework. Do I get this with Flink? No. Does that mean
Beam is better than Flink? No. Maybe a better question would be, do I
want to be able to run Python pipelines?

This is just an example, there are many more factors to consider.

Cheers,
Max

On 30.04.19 10:59, Robert Bradshaw wrote:
Though we all certainly have our biases, I think it's fair to say that
all of these systems are constantly innovating, borrowing ideas from
one another, and have their strengths and weaknesses. I wouldn't say
one is, or will always be, in front of or behind another.

Take, as the given example, Spark Structured Streaming. Of course the
API itself is Spark-specific, but it borrows heavily (among other
things) on ideas that Beam itself pioneered long before Spark 2.0,
specifically the unification of batch and streaming processing into a
single API, and the event-time based windowing (triggering) model for
consistently and correctly handling distributed, out-of-order data
streams.
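
As a rough illustration of why event-time windowing handles out-of-order data consistently, here is a plain-Java sketch (not the Beam or Spark API). The `Event` type, the fixed window size, and the method names are assumptions made for this example, and triggers and watermarks are left out entirely. Each event is assigned to a window by the timestamp it carries rather than by when it arrives, so the per-window result does not depend on arrival order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** An event carrying its own timestamp; hypothetical, for illustration only. */
final class Event {
    final long eventTimeMillis;
    final int value;

    Event(long eventTimeMillis, int value) {
        this.eventTimeMillis = eventTimeMillis;
        this.value = value;
    }
}

final class EventTimeWindows {
    /** Start of the fixed window that contains the given event timestamp. */
    static long windowStart(long eventTimeMillis, long windowSizeMillis) {
        return Math.floorDiv(eventTimeMillis, windowSizeMillis) * windowSizeMillis;
    }

    /** Sums values per fixed window, keyed by window start, in arrival order. */
    static Map<Long, Integer> sumPerWindow(List<Event> arrivals, long windowSizeMillis) {
        Map<Long, Integer> sums = new TreeMap<>();
        for (Event e : arrivals) {
            sums.merge(windowStart(e.eventTimeMillis, windowSizeMillis), e.value, Integer::sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        List<Event> arrivals = new ArrayList<>();
        arrivals.add(new Event(61_000L, 10)); // arrives first, belongs to the second minute
        arrivals.add(new Event(1_000L, 1));   // arrives late, belongs to the first minute
        arrivals.add(new Event(59_000L, 2));
        System.out.println(sumPerWindow(arrivals, 60_000L));
    }
}
```

A real streaming engine additionally has to decide when a window's result may be emitted, which is what triggers and watermarks are for; the sketch only shows the assignment step.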

Of course there are also operational differences. Spark, for example,
is very tied to the micro-batch style of execution whereas Flink is
fundamentally very continuous, and Beam delegates to the underlying
runner.

It is certainly Beam's goal to keep overhead minimal, and one of the
primary selling points is the flexibility of portability (of both the
execution runtime and the SDK) as your needs change.

- Robert


On Tue, Apr 30, 2019 at 5:29 AM <kanth...@gmail.com> wrote:
Of course! I suspect Beam will always be one or two steps behind the
new functionality that is available or yet to come.

For example: Spark Structured Streaming is still not available,
no CEP APIs yet, and much more.


On Apr 30, 2019, at 12:11 AM, Pankaj Chand
<pankajchanda...@gmail.com> wrote:

Will Beam add any overhead or lack certain API/functions
available in Spark/Flink?
