Re: Will Beam add any overhead or lack certain API/functions available in Spark/Flink?

2019-05-04 Thread kant kodali
I believe this comes down more to abstractions vs. execution engines, and
I am sure people can take either side. I think both are important; however,
it is worth noting that the execution frameworks themselves have a lot of
abstractions, though sure, more generic ones can be built on top. Are
abstractions always good?! I will just point to this book


I tend to lean more toward the execution-engine side because I can build
something on top. I am also not sure that Beam was the first to come up
with these ideas, since frameworks like Cascading existed long before.

Let's say Beam came up with the abstractions long before the other runners;
mapping things to the runners still takes time (that's where things are
today), so it's always a moving target.






On Tue, Apr 30, 2019 at 3:15 PM Kenneth Knowles  wrote:

> It is worth noting that Beam isn't solely a portability layer that exposes
> underlying API features, but a feature-rich layer in its own right, with
> carefully coherent abstractions. For example, quite early on the
> SparkRunner supported streaming aspects of the Beam model - watermarks,
> windowing, triggers - that were not really available any other way. Beam's
> various features sometimes require just a pass-through API and sometimes
> require a clever new implementation. And everything is moving constantly. I
> don't see Beam as following the features of any engine, but rather coming
> up with new needed data processing abstractions and figuring out how to
> efficiently implement them on top of various architectures.
>
> Kenn
>
> On Tue, Apr 30, 2019 at 8:37 AM kant kodali  wrote:
>
>> Staying behind doesn't imply one is better than the other, and I didn't
>> mean that in any way, but I fail to see how an abstraction framework like
>> Beam can stay ahead of the underlying execution engines.
>>
>> For example, if a new feature is added to the underlying execution
>> engine that doesn't fit Beam's interface, or breaks it, then I would think
>> the interface would need to be changed. Another example: say the
>> underlying execution engines take different kinds of parameters for the
>> same feature; then it isn't so straightforward to come up with an interface,
>> since there might be very little in common in the first place. So, in that
>> sense, I fail to see how Beam can stay ahead.
>>
>> "Of course the API itself is Spark-specific, but it borrows heavily
>> (among other things) on ideas that Beam itself pioneered long before Spark
>> 2.0" Good to know.
>>
>> "one of the things Beam has focused on was a language portability
>> framework"  Sure but how important is this for a typical user? Do people
>> stop using a particular tool because it is in an X language? I personally
>> would put features first over language portability and it's completely fine
>> that may not be in line with Beam's priorities. All said I can agree Beam
>> focus on language portability is great.
>>
>> On Tue, Apr 30, 2019 at 2:48 AM Maximilian Michels 
>> wrote:
>>
>>> > I wouldn't say one is, or will always be, in front of or behind
>>> another.
>>>
>>> That's a great way to phrase it. I think it is very common to jump to
>>> the conclusion that one system is better than the other. In reality it's
>>> often much more complicated.
>>>
>>> For example, one of the things Beam has focused on was a language
>>> portability framework. Do I get this with Flink? No. Does that mean Beam
>>> is better than Flink? No. Maybe a better question would be, do I want to
>>> be able to run Python pipelines?
>>>
>>> This is just an example, there are many more factors to consider.
>>>
>>> Cheers,
>>> Max
>>>
>>> On 30.04.19 10:59, Robert Bradshaw wrote:
>>> > Though we all certainly have our biases, I think it's fair to say that
>>> > all of these systems are constantly innovating, borrowing ideas from
>>> > one another, and have their strengths and weaknesses. I wouldn't say
>>> > one is, or will always be, in front of or behind another.
>>> >
>>> > Take, as the given example, Spark Structured Streaming. Of course the
>>> > API itself is Spark-specific, but it borrows heavily (among other
>>> > things) from ideas that Beam itself pioneered long before Spark 2.0,
>>> > specifically the unification of batch and streaming processing into a
>>> > single API, and the event-time based windowing (triggering) model for
>>> > consistently and correctly handling distributed, out-of-order data
>>> > streams.
>>> >
>>> > Of course there are also operational differences. Spark, for example,
>>> > is very tied to the micro-batch style of execution whereas Flink is
>>> > fundamentally very continuous, and Beam delegates to the underlying
>>> > runner.
>>> >
>>> > It is certainly Beam's goal to keep overhead minimal, and one of the
>>> > primary selling points is the flexibility of portability (of both the
>>> > 
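
The model features mentioned in this thread (event-time windowing, triggers,
watermarks) can be made concrete with a minimal sketch using the Beam Python
SDK. This is purely illustrative: the element data and parameter values below
are assumptions, not taken from the discussion.

import apache_beam as beam
from apache_beam.transforms import trigger, window

# Illustrative input: (user, event time in seconds).
events = [("alice", 10), ("bob", 45), ("alice", 130)]

with beam.Pipeline() as p:
    _ = (
        p
        | "Create" >> beam.Create(events)
        # Attach event-time timestamps, then count per user per 1-minute window.
        | "Stamp" >> beam.Map(lambda e: window.TimestampedValue((e[0], 1), e[1]))
        | "Window" >> beam.WindowInto(
            window.FixedWindows(60),
            trigger=trigger.AfterWatermark(late=trigger.AfterCount(1)),
            accumulation_mode=trigger.AccumulationMode.ACCUMULATING,
            allowed_lateness=600)
        | "CountPerUser" >> beam.CombinePerKey(sum)
        | "Print" >> beam.Map(print))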

Re: Will Beam add any overhead or lack certain API/functions available in Spark/Flink?

2019-05-04 Thread Pankaj Chand
Hi Matt,

My project is for my PhD. So, I am interested in those 0.1% of use cases.

--Pankaj

On Sat, May 4, 2019, 10:48 AM Matt Casters  wrote:

> Anything can be coded in any form or language on any platform.
> However, doing so takes time and effort.  Maintaining the code takes time,
> as does protecting the investments you made from changes in the
> ecosystem.
> This is obviously where APIs like Beam come into play quite heavily.  New
> technology seems to come around like fads these days, and that innovation is
> obviously not a bad thing. We would still be using Map/Reduce if it were.
> But for people trying to build solutions, changing platforms is a painful
> process that incurs massive costs.
> So with that in mind I would bounce this question back: why on Earth would
> you *want* to write for a specific platform?  Are you *really* interested
> in those 0.1% of use cases, and is it really helping your business move
> forward?  It's possible, but if not, I would strongly advise against it.
>
> Just my 2 cents.
>
> Cheers,
> Matt
> ---
> Matt Casters attcast...@gmail.com>
> Senior Solution Architect, Kettle Project Founder
>
>
>
>
> On Fri, May 3, 2019 at 22:42, Jan Lukavský  wrote:
>
>> Hi,
>>
>> On 5/3/19 12:20 PM, Maximilian Michels wrote:
>> > Hi Jan,
>> >
>> >> A typical example could be a machine learning task, where you might
>> >> have a lot of data cleansing and simple transformations, followed by
>> >> some ML algorithm (e.g. SVD). One might want to use Spark MLlib for
>> >> the ML task, but Beam for all the transformations around it. Then,
>> >> porting to a different runner would only mean providing a different
>> >> implementation of the SVD; everything else would remain the same.
>> >
>> > This is a fair point. Of course you could always split up the pipeline
>> > into two jobs, e.g. have a native Spark job and a Beam job running on
>> > Spark.
>> >
>> > Something that came to my mind is "unsafe" in Rust, which allows you to
>> > leave the safe abstractions of Rust and use raw C code. If Beam had
>> > something like that which really emphasized the non-portable aspect of
>> > a transform, that could change things:
>> >
>> >   Pipeline p = ..
>> >   p.getOptions().setAllowNonPortable(true);
>> >   p.apply(
>> >   NonPortable.of(new MyFlinkOperator(), FlinkRunner.class));
>> >
>> > Again, I'm not sure we want to go down that road, but if there are
>> > really specific use cases, we could think about it.
>> Yes, this is exactly what I meant. I think this doesn't threaten any
>> of Beam's selling points, because this way you declare that you *want* your
>> pipeline to be non-portable; if you don't do it on purpose, your
>> pipeline will still be portable. The key point here is that the
>> underlying systems are likely to evolve more quickly than Beam (in some
>> directions or in some ways - Beam might, on the other hand, bring features
>> to these systems, that's for sure). Examples might be Spark's MLlib or
>> Flink's iterative streams.
>> >
>> >> Generally, there are optimizations that can be heavily dependent on
>> >> the pipeline. Only then might you have enough information to enable
>> >> some very specific optimization.
>> >
>> > If these patterns can be detected in DAGs, then we can build
>> > optimizations into the FlinkRunner. If that is not feasible, then
>> > you're out of luck. Could you describe an optimization that you miss in
>> > Beam?
>>
>> I think that sometimes you cannot infer all possible optimizations from
>> the DAG itself. If you read from a source (e.g. Kafka), information
>> about how you partition data when writing to Kafka might help you
>> avoid additional shuffling in some cases. That's probably something you
>> could in theory do via some annotations on sources, but the
>> fundamental question here is: do you really want to do that? Or just
>> let the user do some hard-coding when he knows that it might help
>> in his particular case (possibly even a corner case)?
>>
>> Jan
>>
>> >
>> > Cheers,
>> > Max
>> >
>> > On 02.05.19 22:44, Jan Lukavský wrote:
>> >> Hi Max,
>> >>
>> >> comments inline.
>> >>
>> >> On 5/2/19 3:29 PM, Maximilian Michels wrote:
>> >>> Couple of comments:
>> >>>
>> >>> * Flink transforms
>> >>>
>> >>> It wouldn't be hard to add a way to run arbitrary Flink operators
>> >>> through the Beam API. Like you said, once you go down that road, you
>> >>> lose the ability to run the pipeline on a different Runner. And
>> >>> that's precisely one of the selling points of Beam. I'm afraid once
>> >>> you even allow 1% non-portable pipelines, you have lost it all.
>> >> Absolutely true, but the question here is "how much effort do I
>> >> have to invest in order to port the pipeline to a different runner?". If
>> >> this effort is low, I'd say the pipeline remains "nearly portable".
>> >> A typical example could be a machine learning task, where you might
>> >> have a lot of data cleansing and simple transformations, followed by
>> >> some ML 

Re: python3-avro with CombineGlobally(CombineFn)

2019-05-04 Thread Chengxuan Wang
Yes. The only thing I can’t control is this line:
https://github.com/apache/beam/pull/8130/files#diff-04fef9e0550df0b0c4e1cd0264406eb5L608

On Sat, May 4, 2019 at 04:46 Valentyn Tymofieiev 
wrote:

> Hi Chengxuan,
>
> We will try to include this change in the next release. As I said, you
> could also set use_fastavro=True in your pipeline code without having to
> wait for the change that makes this flag default to True.
>
> Thanks, Valentyn
>
>
> *From:*Chengxuan Wang 
> *Date:*Sat, May 4, 2019, 3:28 AM
> *To:* 
>
> Hi Valentyn,
>>
>> Thanks a lot. By following https://github.com/apache/beam/pull/8130 , I
>> made the changes in my Apache Beam package locally, and now my test passes.
>> This line is the important one:
>> https://github.com/apache/beam/pull/8130/files#diff-04fef9e0550df0b0c4e1cd0264406eb5L608
>>
>> Is there a way to accelerate the release of this change? The reason I ask
>> is that a couple of my team's customers only use Python 3. :( So we may lose
>> customers without this change.
>>
>> please let me know.
>>
>> Thanks,
>> Chengxuan
>>
>> Valentyn Tymofieiev  于2019年5月3日周五 下午7:45写道:
>>
>>> Correction: we are making fastavro a default option on Python 3 only for
>>> now. You can follow https://github.com/apache/beam/pull/8130 for
>>> updates on that.
>>>
>>> *From:*Valentyn Tymofieiev 
>>> *Date:*Fri, May 3, 2019, 10:41 PM
>>> *To:* 
>>>
>>> Hi,

 Unfortunately, the avro library currently doesn't work well with Python 3.
 Could you try using fastavro in your pipeline and report back whether that
 helped resolve your issue? We are also making fastavro the default option,
 likely starting from 2.13.0. You can use fastavro as follows

 (sent from Phone, apologies for possible errors):

 WriteToAvro(
   ...,
   use_fastavro=True
   )




 *From:*Chengxuan Wang 
 *Date:*Fri, May 3, 2019, 8:30 PM
 *To:* 

 Hi,
> I am trying to create a PTransform to combine Avro schemas, but I hit
> `json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)`.
> I think it is related to
> https://github.com/apache/avro/blob/master/lang/py3/avro/schema.py#L1058 .
> Because avro doesn't implement __ne__, the comparison falls back to __eq__.
>
> Then I saw there is a `without_defaults` method on CombineGlobally, so I
> added that. Now I get `Exception: Attempting to map key 'b' to value
>  in ImmutableDict {} [while
> running 'WriteToAvro/_WriteToAvroWithNoSchema/WriteBundles']`.
>
> Has anyone had this problem before? I am using Python 3 with
> apache-beam 2.12.0 (the latest one).
>
> It works fine under Python 2, but not under Python 3.
>
> Thanks,
> Chengxuan
>
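
For the schema-combining problem described above, a rough sketch of one way a
CombineFn could merge Avro record schemas is shown below. It is an illustrative
assumption, not the code from this thread: schemas are handled as plain
JSON-style dicts precisely to avoid comparing avro-library schema objects, and
the merge policy (union of fields, first definition wins) is made up.

import apache_beam as beam

class MergeAvroSchemas(beam.CombineFn):
    # Accumulator: dict mapping field name -> field definition (a dict).
    def create_accumulator(self):
        return {}

    def add_input(self, acc, schema_dict):
        for field in schema_dict.get("fields", []):
            acc.setdefault(field["name"], field)
        return acc

    def merge_accumulators(self, accumulators):
        merged = {}
        for acc in accumulators:
            for name, field in acc.items():
                merged.setdefault(name, field)
        return merged

    def extract_output(self, acc):
        return {"type": "record", "name": "merged", "fields": list(acc.values())}

# Usage, with the input schemas already parsed into dicts:
#   merged = schemas | beam.CombineGlobally(MergeAvroSchemas()).without_defaults()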



Is it safe to cache the value of a singleton view (with a global window) in a DoFn?

2019-05-04 Thread Steve Niemitz
I have a singleton view in a global window that is read from a DoFn.  I'm
curious whether it's "correct" to cache the value from the view, or whether I
need to read it every time.

As a (simplified) example, if I were to generate the view as such:

input.getPipeline
  .apply(Create.of(Collections.singleton[Void](null)))
  .apply(MapElements.via(new SimpleFunction[Void, JLong]() {
    override def apply(input: Void): JLong = {
      Instant.now().getMillis
    }
  })).apply(View.asSingleton[JLong]())

and then read it from a DoFn (using context.sideInput), is it guaranteed
that:
- Every instance of the DoFn will read the same value?
- The value will never change?

If so, it seems like it'd be safe to cache the value inside the DoFn.  It
seems like this would be the case, but I've also seen cases in Dataflow
where the UI indicates that the MapElements step above produced more than
one element, so I'm curious what people have to say.

Thanks!
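
For concreteness, the caching pattern being asked about might look roughly like
the sketch below, written with the Beam Python SDK rather than the Java/Scala
code above (an assumption for illustration only; names and values are made up,
and this does not answer whether the caching is actually safe):

import apache_beam as beam

class UseCachedSingleton(beam.DoFn):
    def __init__(self):
        self._cached = None  # memoized side-input value, per DoFn instance

    def process(self, element, singleton):
        if self._cached is None:
            self._cached = singleton  # read the side input only once
        yield (element, self._cached)

with beam.Pipeline() as p:
    view = p | "One" >> beam.Create([42])             # stand-in for the view
    main = p | "Main" >> beam.Create(["a", "b", "c"])
    _ = main | beam.ParDo(UseCachedSingleton(),
                          singleton=beam.pvalue.AsSingleton(view))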


Re: Will Beam add any overhead or lack certain API/functions available in Spark/Flink?

2019-05-04 Thread Matt Casters
Anything can be coded in any form or language on any platform.
However, doing so takes time and effort.  Maintaining the code takes time,
as does protecting the investments you made from changes in the
ecosystem.
This is obviously where APIs like Beam come into play quite heavily.  New
technology seems to come around like fads these days, and that innovation is
obviously not a bad thing. We would still be using Map/Reduce if it were.
But for people trying to build solutions, changing platforms is a painful
process that incurs massive costs.
So with that in mind I would bounce this question back: why on Earth would
you *want* to write for a specific platform?  Are you *really* interested
in those 0.1% of use cases, and is it really helping your business move
forward?  It's possible, but if not, I would strongly advise against it.

Just my 2 cents.

Cheers,
Matt
---
Matt Casters attcast...@gmail.com>
Senior Solution Architect, Kettle Project Founder




On Fri, May 3, 2019 at 22:42, Jan Lukavský  wrote:

> Hi,
>
> On 5/3/19 12:20 PM, Maximilian Michels wrote:
> > Hi Jan,
> >
> >> A typical example could be a machine learning task, where you might
> >> have a lot of data cleansing and simple transformations, followed by
> >> some ML algorithm (e.g. SVD). One might want to use Spark MLlib for
> >> the ML task, but Beam for all the transformations around it. Then,
> >> porting to a different runner would only mean providing a different
> >> implementation of the SVD; everything else would remain the same.
> >
> > This is a fair point. Of course you could always split up the pipeline
> > into two jobs, e.g. have a native Spark job and a Beam job running on
> > Spark.
> >
> > Something that came to my mind is "unsafe" in Rust, which allows you to
> > leave the safe abstractions of Rust and use raw C code. If Beam had
> > something like that which really emphasized the non-portable aspect of
> > a transform, that could change things:
> >
> >   Pipeline p = ..
> >   p.getOptions().setAllowNonPortable(true);
> >   p.apply(
> >   NonPortable.of(new MyFlinkOperator(), FlinkRunner.class));
> >
> > Again, I'm not sure we want to go down that road, but if there are
> > really specific use cases, we could think about it.
> Yes, this is exactly what I meant. I think this doesn't threaten any
> of Beam's selling points, because this way you declare that you *want* your
> pipeline to be non-portable; if you don't do it on purpose, your
> pipeline will still be portable. The key point here is that the
> underlying systems are likely to evolve more quickly than Beam (in some
> directions or in some ways - Beam might, on the other hand, bring features
> to these systems, that's for sure). Examples might be Spark's MLlib or
> Flink's iterative streams.
> >
> >> Generally, there are optimizations that can be heavily dependent on
> >> the pipeline. Only then might you have enough information to enable
> >> some very specific optimization.
> >
> > If these patterns can be detected in DAGs, then we can build
> > optimizations into the FlinkRunner. If that is not feasible, then
> > you're out of luck. Could you describe an optimization that you miss in
> > Beam?
>
> I think that sometimes you cannot infer all possible optimizations from
> the DAG itself. If you read from a source (e.g. Kafka), information
> about how you partition data when writing to Kafka might help you
> avoid additional shuffling in some cases. That's probably something you
> could in theory do via some annotations on sources, but the
> fundamental question here is: do you really want to do that? Or just
> let the user do some hard-coding when he knows that it might help
> in his particular case (possibly even a corner case)?
>
> Jan
>
> >
> > Cheers,
> > Max
> >
> > On 02.05.19 22:44, Jan Lukavský wrote:
> >> Hi Max,
> >>
> >> comments inline.
> >>
> >> On 5/2/19 3:29 PM, Maximilian Michels wrote:
> >>> Couple of comments:
> >>>
> >>> * Flink transforms
> >>>
> >>> It wouldn't be hard to add a way to run arbitrary Flink operators
> >>> through the Beam API. Like you said, once you go down that road, you
> >>> lose the ability to run the pipeline on a different Runner. And
> >>> that's precisely one of the selling points of Beam. I'm afraid once
> >>> you even allow 1% non-portable pipelines, you have lost it all.
> >> Absolutely true, but the question here is "how much effort do I
> >> have to invest in order to port the pipeline to a different runner?". If
> >> this effort is low, I'd say the pipeline remains "nearly portable".
> >> A typical example could be a machine learning task, where you might
> >> have a lot of data cleansing and simple transformations, followed by
> >> some ML algorithm (e.g. SVD). One might want to use Spark MLlib for
> >> the ML task, but Beam for all the transformations around it. Then,
> >> porting to a different runner would only mean providing a different
> >> implementation of the SVD; everything else would remain the same.
> 

Re: python3-avro with CombineGlobally(CombineFn)

2019-05-04 Thread Valentyn Tymofieiev
Hi Chengxuan,

We will try to include this change in the next release. As I said, you
could also set use_fastavro=True in your pipeline code without having to
wait for the change that makes this flag default to True.

Thanks, Valentyn

*From:*Chengxuan Wang 
*Date:*Sat, May 4, 2019, 3:28 AM
*To:* 

Hi Valentyn,
>
> Thanks a lot. By following https://github.com/apache/beam/pull/8130 , I
> made the changes in my Apache Beam package locally, and now my test passes.
> This line is the important one:
> https://github.com/apache/beam/pull/8130/files#diff-04fef9e0550df0b0c4e1cd0264406eb5L608
>
> Is there a way to accelerate the release of this change? The reason I ask
> is that a couple of my team's customers only use Python 3. :( So we may lose
> customers without this change.
>
> please let me know.
>
> Thanks,
> Chengxuan
>
> Valentyn Tymofieiev  于2019年5月3日周五 下午7:45写道:
>
>> Correction: we are making fastavro a default option on Python 3 only for
>> now. You can follow https://github.com/apache/beam/pull/8130 for updates
>> on that.
>>
>> *From:*Valentyn Tymofieiev 
>> *Date:*Fri, May 3, 2019, 10:41 PM
>> *To:* 
>>
>> Hi,
>>>
>>> Unfortunately, the avro library currently doesn't work well with Python 3.
>>> Could you try using fastavro in your pipeline and report back whether that
>>> helped resolve your issue? We are also making fastavro the default option,
>>> likely starting from 2.13.0. You can use fastavro as follows
>>>
>>> (sent from Phone, apologies for possible errors):
>>>
>>> WriteToAvro(
>>>   ...,
>>>   use_fastavro=True
>>>   )
>>>
>>>
>>>
>>>
>>> *From:*Chengxuan Wang 
>>> *Date:*Fri, May 3, 2019, 8:30 PM
>>> *To:* 
>>>
>>> Hi,
 I am trying to create a PTransform to combine Avro schemas, but I hit
 `json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)`.
 I think it is related to
 https://github.com/apache/avro/blob/master/lang/py3/avro/schema.py#L1058 .
 Because avro doesn't implement __ne__, the comparison falls back to __eq__.

 Then I saw there is a `without_defaults` method on CombineGlobally, so I
 added that. Now I get `Exception: Attempting to map key 'b' to value
  in ImmutableDict {} [while
 running 'WriteToAvro/_WriteToAvroWithNoSchema/WriteBundles']`.

 Has anyone had this problem before? I am using Python 3 with
 apache-beam 2.12.0 (the latest one).

 It works fine under Python 2, but not under Python 3.

 Thanks,
 Chengxuan

>>>


Re: python3-avro with CombineGlobally(CombineFn)

2019-05-04 Thread Chengxuan Wang
Hi Valentyn,

Thanks a lot. By following https://github.com/apache/beam/pull/8130 , I
made the changes in my Apache Beam package locally, and now my test passes.
This line is the important one:
https://github.com/apache/beam/pull/8130/files#diff-04fef9e0550df0b0c4e1cd0264406eb5L608

Is there a way to accelerate the release of this change? The reason I ask
is that a couple of my team's customers only use Python 3. :( So we may lose
customers without this change.

please let me know.

Thanks,
Chengxuan

Valentyn Tymofieiev  于2019年5月3日周五 下午7:45写道:

> Correction: we are making fastavro a default option on Python 3 only for
> now. You can follow https://github.com/apache/beam/pull/8130 for updates
> on that.
>
> *From:*Valentyn Tymofieiev 
> *Date:*Fri, May 3, 2019, 10:41 PM
> *To:* 
>
> Hi,
>>
>> Unfortunately, the avro library currently doesn't work well with Python 3.
>> Could you try using fastavro in your pipeline and report back whether that
>> helped resolve your issue? We are also making fastavro the default option,
>> likely starting from 2.13.0. You can use fastavro as follows
>>
>> (sent from Phone, apologies for possible errors):
>>
>> WriteToAvro(
>>   ...,
>>   use_fastavro=True
>>   )
>>
>>
>>
>>
>> *From:*Chengxuan Wang 
>> *Date:*Fri, May 3, 2019, 8:30 PM
>> *To:* 
>>
>> Hi,
>>> I am trying to create a PTransform to combine Avro schemas, but I hit
>>> `json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)`.
>>> I think it is related to
>>> https://github.com/apache/avro/blob/master/lang/py3/avro/schema.py#L1058 .
>>> Because avro doesn't implement __ne__, the comparison falls back to __eq__.
>>>
>>> Then I saw there is a `without_defaults` method on CombineGlobally, so I
>>> added that. Now I get `Exception: Attempting to map key 'b' to value
>>>  in ImmutableDict {} [while
>>> running 'WriteToAvro/_WriteToAvroWithNoSchema/WriteBundles']`.
>>>
>>> Has anyone had this problem before? I am using Python 3 with
>>> apache-beam 2.12.0 (the latest one).
>>>
>>> It works fine under Python 2, but not under Python 3.
>>>
>>> Thanks,
>>> Chengxuan
>>>
>>