Jenkins build is back to stable : beam_Release_NightlySnapshot #332

2017-02-17 Thread Apache Jenkins Server
See 



Performance Testing Next Steps

2017-02-17 Thread Jason Kuster
Hi all,

I've written up a doc on next steps for getting performance testing up and
running for Beam. I'd love to hear from people -- there's a fair amount of
work encapsulated in here, but the end result is that we have a performance
testing system which we can use for benchmarking all aspects of Beam, which
would be really exciting. Looking forward to your thoughts.

https://docs.google.com/document/d/1PsjGPSN6FuorEEPrKEP3u3m16tyOzph5FnL2DhaRDz0/edit?ts=58a78e73

Best,

Jason

-- 
---
Jason Kuster
Apache Beam / Google Cloud Dataflow


Re: Metrics for Beam IOs.

2017-02-17 Thread Amit Sela
@JB I think what you're suggesting is that Beam should provide a "Metrics
Reporting" API as well. I used to think like you, but the more I thought
about it, the more I came to disagree.

The SDK is for users to author pipelines, so Metrics are for user-defined
metrics (in contrast to runner metrics).
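
As a point of reference, a minimal sketch of what a user-defined metric
looks like on the SDK side (the namespace and counter name below are just
illustrative):

    import org.apache.beam.sdk.metrics.Counter;
    import org.apache.beam.sdk.metrics.Metrics;
    import org.apache.beam.sdk.transforms.DoFn;

    public class CountingFn extends DoFn<String, String> {
      // User-defined counter; the runner's Metrics support reports its value.
      private final Counter processed =
          Metrics.counter(CountingFn.class, "elements-processed");

      @ProcessElement
      public void processElement(ProcessContext c) {
        processed.inc();
        c.output(c.element());
      }
    }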

The Runner API is supposed to help different backends integrate with
Beam so that users can execute those pipelines on their favourite backend. I
believe the Runner API should impose restrictions/demands that are just
enough for a runner to execute a Beam pipeline as best it can, and I'm
afraid this would require runner authors to do unnecessary work.
This is also sort of "crossing the line" into the runner's domain and
"telling it how to do it" instead of what to do, and I don't think we want
that.

I do believe, however, that runners should integrate the Metrics into their
own metrics reporting system - but that's for the runner author to decide.
Stas did this for the Spark runner because Spark doesn't report back
user-defined Accumulators (Spark's Aggregators) to its Metrics system.

On a curious note though, did you use an OSGi service per event type, so
you can upgrade specific event handlers without taking down the entire
reporter? But that's really unrelated to this thread :-).



On Fri, Feb 17, 2017 at 8:36 PM Ben Chambers 
wrote:

> I don't think it is possible for there to be a general mechanism for
> pushing metrics out during the execution of a pipeline. The Metrics API
> suggests that metrics should be reported both as values across all attempts
> and as values across only successful attempts. The latter requires runner
> involvement to ensure that a given metric value is atomically incremented
> (or checkpointed) when the bundle it was reported in is committed.
>
> Aviem has already implemented Metrics support for the Spark runner. I am
> working on support for the Dataflow runner.
>
> On Fri, Feb 17, 2017 at 7:50 AM Jean-Baptiste Onofré 
> wrote:
>
> Hi guys,
>
> As I'm back from vacation, I'm back on this topic ;)
>
> It's a great discussion, and regarding the Metric IO coverage, I think it's
> good.
>
> However, there's a point that we discussed only briefly in the thread, and I
> think it's an important one (maybe even more important than the provided
> metrics, actually, in terms of roadmap ;)).
>
> Assuming we have pipelines, PTransforms, IOs, ... using the Metric API,
> how do we expose the metrics to the end users?
>
> A first approach would be to bind a JMX MBean server per pipeline and
> expose the metrics via MBeans. I don't think it's a good idea for the
> following reasons:
> 1. It's not easy to know where the pipeline is actually executed, and
> so, not easy to find the MBean server URI.
> 2. For the same reason, we can have port binding errors.
> 3. While it could work for unbounded/streaming pipelines (as they are
> always "running"), it's not really applicable to bounded/batch
> pipelines as their lifetime is "limited" ;)
>
> So, I think the "push" approach is better: during the execution, a
> pipeline "internally" collects and pushes the metric to a backend.
> The "push" could a kind of sink. For instance, the metric "records" can
> be sent to a Kafka topic, or directly to Elasticsearch or whatever.
> The metric backend can deal with alerting, reporting, etc.
>
> Basically, we have to define two things:
> 1. The "appender" where the metrics have to be sent (and the
> corresponding configuration to connect, like the Kafka or Elasticsearch
> location)
> 2. The format of the metric data (for instance, JSON format).
>
> In Apache Karaf, I created something similar named Decanter:
>
>
> http://blog.nanthrax.net/2015/07/monitoring-and-alerting-with-apache-karaf-decanter/
>
> http://karaf.apache.org/manual/decanter/latest-1/
>
> Decanter provides collectors that harvest the metrics (like JMX MBean
> attributes, log messages, ...). Basically, for Beam, it would directly be
> the Metric API used by pipeline parts.
> Then, the metric records are sent to a dispatcher, which sends them
> to an appender. The appenders store or send the metric records
> to a backend (Elasticsearch, Cassandra, Kafka, JMS, Redis, ...).
>
> I think it would make sense to provide the configuration and Metric
> "appender" via the pipeline options.
> As it's not really runner-specific, it could be part of the metric API
> (or an SPI in that case).
>
> WDYT?
>
> Regards
> JB
>
> On 02/15/2017 09:22 AM, Stas Levin wrote:
> > +1 to making the IO metrics (e.g. producers, consumers) available as part
> > of the Beam pipeline metrics tree for debugging and visibility.
> >
> > As it has already been mentioned, many IO clients have a metrics
> mechanism
> > in place, so in these cases I think it could be beneficial to mirror
> their
> > metrics under the relevant subtree of the Beam metrics tree.
> >
> > On Wed, Feb 15, 2017 at 12:04 AM Amit Sela  wrote:
> >
> >> I think this is a great discussion and I'd like to relate

Re: Metrics for Beam IOs.

2017-02-17 Thread Ben Chambers
I don't think it is possible for there to be a general mechanism for
pushing metrics out during the execution of a pipeline. The Metrics API
suggests that metrics should be reported both as values across all attempts
and as values across only successful attempts. The latter requires runner
involvement to ensure that a given metric value is atomically incremented
(or checkpointed) when the bundle it was reported in is committed.
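
For completeness, a sketch of how the two flavours surface when querying a
pipeline result (method names follow a recent Beam release and may differ
from the API at the time of this thread; committed values may also be
unsupported on some runners):

    import org.apache.beam.sdk.PipelineResult;
    import org.apache.beam.sdk.metrics.MetricNameFilter;
    import org.apache.beam.sdk.metrics.MetricQueryResults;
    import org.apache.beam.sdk.metrics.MetricResult;
    import org.apache.beam.sdk.metrics.MetricsFilter;

    public class MetricsQueryExample {
      static void printCounters(PipelineResult result) {
        MetricQueryResults metrics = result.metrics().queryMetrics(
            MetricsFilter.builder()
                .addNameFilter(
                    MetricNameFilter.named("my-namespace", "elements-processed"))
                .build());
        for (MetricResult<Long> counter : metrics.getCounters()) {
          // Attempted counts every attempt; committed counts only bundles that committed.
          System.out.println(counter.getName() + " attempted=" + counter.getAttempted());
          System.out.println(counter.getName() + " committed=" + counter.getCommitted());
        }
      }
    }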

Aviem has already implemented Metrics support for the Spark runner. I am
working on support for the Dataflow runner.

On Fri, Feb 17, 2017 at 7:50 AM Jean-Baptiste Onofré 
wrote:

Hi guys,

As I'm back from vacation, I'm back on this topic ;)

It's a great discussion, and regarding the Metric IO coverage, I think it's
good.

However, there's a point that we discussed only briefly in the thread, and I
think it's an important one (maybe even more important than the provided
metrics, actually, in terms of roadmap ;)).

Assuming we have pipelines, PTransforms, IOs, ... using the Metric API,
how do we expose the metrics to the end users?

A first approach would be to bind a JMX MBean server per pipeline and
expose the metrics via MBeans. I don't think it's a good idea for the
following reasons:
1. It's not easy to know where the pipeline is actually executed, and
so, not easy to find the MBean server URI.
2. For the same reason, we can have port binding errors.
3. While it could work for unbounded/streaming pipelines (as they are
always "running"), it's not really applicable to bounded/batch
pipelines as their lifetime is "limited" ;)

So, I think the "push" approach is better: during the execution, a
pipeline "internally" collects and pushes the metric to a backend.
The "push" could a kind of sink. For instance, the metric "records" can
be sent to a Kafka topic, or directly to Elasticsearch or whatever.
The metric backend can deal with alerting, reporting, etc.

Basically, we have to define two things:
1. The "appender" where the metrics have to be sent (and the
corresponding configuration to connect, like the Kafka or Elasticsearch
location)
2. The format of the metric data (for instance, JSON format).

In Apache Karaf, I created something similar named Decanter:

http://blog.nanthrax.net/2015/07/monitoring-and-alerting-with-apache-karaf-decanter/

http://karaf.apache.org/manual/decanter/latest-1/

Decanter provides collectors that harvest the metrics (like JMX MBean
attributes, log messages, ...). Basically, for Beam, it would directly be
the Metric API used by pipeline parts.
Then, the metric records are sent to a dispatcher, which sends them
to an appender. The appenders store or send the metric records
to a backend (Elasticsearch, Cassandra, Kafka, JMS, Redis, ...).

I think it would make sense to provide the configuration and Metric
"appender" via the pipeline options.
As it's not really runner-specific, it could be part of the metric API
(or an SPI in that case).

WDYT?

Regards
JB

On 02/15/2017 09:22 AM, Stas Levin wrote:
> +1 to making the IO metrics (e.g. producers, consumers) available as part
> of the Beam pipeline metrics tree for debugging and visibility.
>
> As it has already been mentioned, many IO clients have a metrics mechanism
> in place, so in these cases I think it could be beneficial to mirror their
> metrics under the relevant subtree of the Beam metrics tree.
>
> On Wed, Feb 15, 2017 at 12:04 AM Amit Sela  wrote:
>
>> I think this is a great discussion and I'd like to relate to some of the
>> points raised here, and raise some of my own.
>>
> >> First of all I think we should be careful here not to cross boundaries.
> >> IOs naturally have many metrics, and Beam should avoid "taking over"
> >> those. IO metrics should focus on what's relevant to the Pipeline:
> >> input/output rate, backlog (for UnboundedSources, which exists in bytes
> >> but for monitoring purposes we might want to consider #messages).
>>
> >> I don't agree that we should not invest in doing this in Sources/Sinks
> >> and go directly to SplittableDoFn, because the IO API is familiar and
> >> known, and as long as we keep it, it should be treated as a first-class
> >> citizen.
>>
>> As for enable/disable - if IOs consider focusing on pipeline-related
>> metrics I think we should be fine, though this could also change between
>> runners as well.
>>
> >> Finally, considering "split-metrics" is interesting because on one hand
> >> it affects the pipeline directly (unbalanced partitions in Kafka that may
> >> cause backlog), but this is on that fine line of responsibilities (Kafka
> >> monitoring would probably be able to tell you that partitions are not
> >> balanced).
>>
>> My 2 cents, cheers!
>>
>> On Tue, Feb 14, 2017 at 8:46 PM Raghu Angadi 
>> wrote:
>>
>>> On Tue, Feb 14, 2017 at 9:21 AM, Ben Chambers
>> >>>
>>> wrote:
>>>

> > * I also think there are data source specific metrics that a given IO
> > will want to expose (i.e., things like Kafka backlog for a topic.)
>>>
>>>
>>> UnboundedSource has API for backlog. It is better for

Re: Jenkins build is unstable: beam_Release_NightlySnapshot #331

2017-02-17 Thread Kenneth Knowles
From a ~5-second look, the failure looks like infrastructure related to
ElasticSearchIO. I couldn't find a known issue or source of flakiness for
ElasticSearchIO in JIRA, and I didn't have time to investigate whether it
is a one-time issue or something actionable. Thoughts?

On Thu, Feb 16, 2017 at 11:44 PM, Apache Jenkins Server <
jenk...@builds.apache.org> wrote:

> See  NightlySnapshot/331/changes>
>
>


RE: Merge HadoopInputFormatIO and HDFSIO in a single module

2017-02-17 Thread Dipti Kulkarni
Thank you  all for your inputs! 


-Original Message-
From: Dan Halperin [mailto:dhalp...@google.com.INVALID] 
Sent: Friday, February 17, 2017 12:17 PM
To: dev@beam.apache.org
Subject: Re: Merge HadoopInputFormatIO and HDFSIO in a single module

Raghu, Amit -- +1 to your expertise :)

On Thu, Feb 16, 2017 at 3:39 PM, Amit Sela  wrote:

> I agree with Dan on everything regarding HdfsFileSystem - it's super
> convenient for users to use TextIO with HdfsFileSystem rather than
> replacing the IO and also specifying the InputFormat type.
>
> I disagree on "HadoopIO" - I think that people who work with Hadoop 
> would find this name intuitive, and that's whats important.
> Even more, and joining Raghu's comment, it is also recognized as 
> "compatible with Hadoop", so for example someone running a Beam 
> pipeline using the Spark runner on Amazon's S3 and wants to read/write 
> Hadoop sequence files would simply use HadoopIO and provide the 
> appropriate runtime dependencies (actually true for GS as well).
>
> On Thu, Feb 16, 2017 at 9:08 PM Raghu Angadi 
> 
> wrote:
>
> > FileInputFormat is extremely widely used; pretty much all the file-based
> > input formats extend it. All of them call into it to list the input
> > files and to split them (with some tweaks on top of that). The special
> > API ( *FileInputFormat.setMinInputSplitSize(job, desiredBundleSizeBytes)* )
> > is how the split size is normally communicated.
> > New IO can use the API directly.
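
A rough sketch of typical usage of that Hadoop API (the split size here is
illustrative, and this snippet is not taken from the Beam code base):

    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    public class SplitSizeExample {
      public static void main(String[] args) throws Exception {
        Job job = Job.getInstance();
        // Ask FileInputFormat-based formats for splits of at least ~64 MB,
        // roughly matching a desired bundle size.
        long desiredBundleSizeBytes = 64L * 1024 * 1024;
        FileInputFormat.setMinInputSplitSize(job, desiredBundleSizeBytes);
      }
    }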
> >
> > HdfsIO as implemented in Beam is not HDFS-specific at all. There are
> > no hdfs imports, and the HDFS name does not appear anywhere other than
> > in HdfsIO's own class and method names. AvroHdfsFileSource etc. would
> > work just as well with the new IO.
> >
> > On Thu, Feb 16, 2017 at 8:17 AM, Dan Halperin
>  > >
> > wrote:
> >
> > > (And I think renaming to HadoopIO doesn't make sense. 
> > > "InputFormat" is
> > the
> > > key component of the name -- it reads things that implement the
> > InputFormat
> > > interface. "Hadoop" means a lot more than that.)
> > >
> >
> > Often 'IO' in Beam implies both sources and sinks. It might not be
> > long before we support Hadoop OutputFormat as well. In addition,
> > HadoopInputFormatIO is quite a mouthful. Agreed, Hadoop can mean a lot
> > of things depending on the context. In the 'IO' context it might not be
> > too broad.
> > Normally it implies 'any FileSystem supported in Hadoop, e.g. S3'.
> >
> > Either way, I am quite confident once HadoopInputFormatIO is 
> > written, it can easily replace HdfsIO. That decision could be made later.
> >
> > Raghu.
> >
>




Re: Pipeline Surgery and an interception-free future

2017-02-17 Thread Thomas Groh
That depends on how you determine which view you're trying to use. All of
the nodes that are visited in Pipeline#traverseTopologically are wired up
in such a way that their outputs correspond one-to-one with the
pipeline-independent user graph. However, that means that the contents of any
override don't necessarily match up with the graph they're present in -
which usually means that outputs have to be determined from the graph
nodes, and shouldn't be obtained from the view itself.

I think you can swap from `PCollectionView.getView()` to
`context.getOutput()` in FlinkStreamingTransformTranslators and everything
should work out. PR #2035 is what I think should be sufficient (pending
test runs, of course)

On Fri, Feb 17, 2017 at 3:29 AM, Aljoscha Krettek 
wrote:

> Thomas, how were you planning to get the StreamingViewAs* PTransforms out
> of the apply() in the Dataflow runner?
>
> I'm asking because the Flink Runner works basically the same way and I've
> run into a problem. My problem is that the TupleTag of a PCollectionView is
> generated when the user applies, for example, the View.AsIterable
> PTransform. When the override is in the runner apply() this is not a
> problem because the tag of the PCollectionView that the user has matches
> the tag that we internally have in the Pipeline. Now, if I later change the
> View.AsIterable using Pipeline surgery a new PCollectionView will be
> created that will have a different internal tag from the one that the user
> PCollectionView has. Then, when the user creates a ParDo with side inputs, I
> cannot properly match the replaced View against the user View and I can't
> properly stitch together an execution graph.
>
> On Fri, 17 Feb 2017 at 00:45 Amit Sela  wrote:
>
> > Awesome!
> > First thing I'm gonna do:
> >
> >1. traverse the pipeline to determine if streaming.
> >2. If streaming, replace Read.Bounded with an adapted Read.Unbounded.
> >
> > The current implementation forces bounded reads to be translated by the
> > unbounded translator, which feels awkward; this makes it right again.
> >
> > Thanks Thomas!
> >
> > On Thu, Feb 16, 2017 at 4:12 PM Aljoscha Krettek 
> > wrote:
> >
> > > I might just try and do that. ;-)
> > >
> > > On Thu, 16 Feb 2017 at 03:55 Thomas Groh 
> > wrote:
> > >
> > > > As of Github PR #1998 (https://github.com/apache/beam/pull/1998),
> the
> > > new
> > > > Pipeline Surgery API is ready and available. There are a couple of
> > > > refinements coming in PR #2006, but in general pipelines can now,
> post
> > > > construction, have PTransforms swapped out to whatever the runner
> > desires
> > > > (standard "behavior-maintaining" caveats apply).
> > > >
> > > > Moving forwards, this will enable pipelines to be run on multiple
> > runners
> > > > without having to reconstruct the graph via repeated applications of
> > > > PTransforms to the pipeline (this also includes being able to, for
> > > example,
> > > > read a pipeline from a serialized representation, and executing the
> > > result
> > > > on an arbitrary runner).
> > > >
> > > > Those of you who are runner authors (at least, those who I can easily
> > > > identify as such) should expect a Pull Request from me sometime next
> > week
> > > > porting you off of intercepting calls to apply and to the new surgery
> > > API.
> > > > You are, of course, welcome to beat me to the punch.
> > > >
> > > > Thanks,
> > > >
> > > > Thomas
> > > >
> > >
> >
>


Re: Metrics for Beam IOs.

2017-02-17 Thread Jean-Baptiste Onofré

Hi guys,

As I'm back from vacation, I'm back on this topic ;)

It's a great discussion, and regarding the Metric IO coverage, I think it's
good.


However, there's a point that we discussed only briefly in the thread, and I
think it's an important one (maybe even more important than the provided
metrics, actually, in terms of roadmap ;)).


Assuming we have pipelines, PTransforms, IOs, ... using the Metric API,
how do we expose the metrics to the end users?


A first approach would be to bind a JMX MBean server per pipeline and
expose the metrics via MBeans. I don't think it's a good idea for the
following reasons:
1. It's not easy to know where the pipeline is actually executed, and
so, not easy to find the MBean server URI.
2. For the same reason, we can have port binding errors.
3. While it could work for unbounded/streaming pipelines (as they are
always "running"), it's not really applicable to bounded/batch
pipelines as their lifetime is "limited" ;)


So, I think the "push" approach is better: during the execution, a 
pipeline "internally" collects and pushes the metric to a backend.
The "push" could a kind of sink. For instance, the metric "records" can 
be sent to a Kafka topic, or directly to Elasticsearch or whatever.

The metric backend can deal with alerting, reporting, etc.

Basically, we have to define two things:
1. The "appender" where the metrics have to be sent (and the
corresponding configuration to connect, like the Kafka or Elasticsearch
location)
2. The format of the metric data (for instance, JSON format).

In Apache Karaf, I created something similar named Decanter:

http://blog.nanthrax.net/2015/07/monitoring-and-alerting-with-apache-karaf-decanter/

http://karaf.apache.org/manual/decanter/latest-1/

Decanter provides collectors that harvest the metrics (like JMX MBean
attributes, log messages, ...). Basically, for Beam, it would directly be
the Metric API used by pipeline parts.
Then, the metric records are sent to a dispatcher, which sends them
to an appender. The appenders store or send the metric records
to a backend (Elasticsearch, Cassandra, Kafka, JMS, Redis, ...).


I think it would make sense to provide the configuration and Metric
"appender" via the pipeline options.
As it's not really runner-specific, it could be part of the metric API
(or an SPI in that case).
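
To make the idea concrete, here is a purely hypothetical sketch of such an
SPI (none of these types exist in Beam; the names are made up to illustrate
the proposal):

    import org.apache.beam.sdk.options.Description;
    import org.apache.beam.sdk.options.PipelineOptions;

    // Hypothetical SPI: an "appender" the pipeline pushes metric records to.
    interface MetricsAppender {
      void open(PipelineOptions options);   // e.g. Kafka or Elasticsearch location
      void append(String metricRecordJson); // one metric record, e.g. in JSON format
      void close();
    }

    // Hypothetical options used to select and configure the appender.
    interface MetricsReportingOptions extends PipelineOptions {
      @Description("Fully qualified class name of the MetricsAppender to use")
      String getMetricsAppender();
      void setMetricsAppender(String appenderClassName);
    }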


WDYT?

Regards
JB

On 02/15/2017 09:22 AM, Stas Levin wrote:

+1 to making the IO metrics (e.g. producers, consumers) available as part
of the Beam pipeline metrics tree for debugging and visibility.

As it has already been mentioned, many IO clients have a metrics mechanism
in place, so in these cases I think it could be beneficial to mirror their
metrics under the relevant subtree of the Beam metrics tree.
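
As a rough illustration of what such mirroring could look like from inside a
DoFn or reader (the namespace, metric name and lag lookup below are made up;
only the Beam Metrics calls are real API):

    import org.apache.beam.sdk.metrics.Distribution;
    import org.apache.beam.sdk.metrics.Metrics;
    import org.apache.beam.sdk.transforms.DoFn;

    public class MirrorClientMetricFn extends DoFn<String, String> {
      // Mirrors a client-reported value (e.g. consumer lag) under a Beam metrics namespace.
      private final Distribution recordsLag = Metrics.distribution("KafkaIO", "records-lag");

      @ProcessElement
      public void processElement(ProcessContext c) {
        recordsLag.update(lookupLagFromClient());
        c.output(c.element());
      }

      // Hypothetical helper; a real implementation would read the IO client's own metrics.
      private long lookupLagFromClient() {
        return 0L;
      }
    }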

On Wed, Feb 15, 2017 at 12:04 AM Amit Sela  wrote:


I think this is a great discussion and I'd like to relate to some of the
points raised here, and raise some of my own.

First of all I think we should be careful here not to cross boundaries. IOs
naturally have many metrics, and Beam should avoid "taking over" those. IO
metrics should focus on what's relevant to the Pipeline: input/output rate,
backlog (for UnboundedSources, which exists in bytes but for monitoring
purposes we might want to consider #messages).

I don't agree that we should not invest in doing this in Sources/Sinks and
go directly to SplittableDoFn, because the IO API is familiar and known,
and as long as we keep it, it should be treated as a first-class citizen.

As for enable/disable - if IOs consider focusing on pipeline-related
metrics I think we should be fine, though this could also change between
runners as well.

Finally, considering "split-metrics" is interesting because on one hand it
affects the pipeline directly (unbalanced partitions in Kafka that may
cause backlog), but this is on that fine line of responsibilities (Kafka
monitoring would probably be able to tell you that partitions are not
balanced).

My 2 cents, cheers!

On Tue, Feb 14, 2017 at 8:46 PM Raghu Angadi 
wrote:


On Tue, Feb 14, 2017 at 9:21 AM, Ben Chambers wrote:

* I also think there are data source specific metrics that a given IO will
want to expose (i.e., things like Kafka backlog for a topic.)

UnboundedSource has API for backlog. It is better for beam/runners to
handle backlog as well.
Of course there will be some source specific metrics too (errors, i/o ops
etc).
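
For reference, the backlog hook mentioned above lives on
UnboundedSource.UnboundedReader; a reader can override it roughly like this
(the estimation helper is a placeholder, not real API):

    import org.apache.beam.sdk.io.UnboundedSource;

    public abstract class BacklogAwareReader
        extends UnboundedSource.UnboundedReader<byte[]> {
      @Override
      public long getSplitBacklogBytes() {
        // Bytes remaining behind the current position for this split,
        // or BACKLOG_UNKNOWN if the source cannot estimate it.
        long estimate = estimateRemainingBytes();
        return estimate >= 0 ? estimate : BACKLOG_UNKNOWN;
      }

      // Placeholder for a source-specific estimate.
      private long estimateRemainingBytes() {
        return BACKLOG_UNKNOWN;
      }
    }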







--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com


Re: Pipeline Surgery and an interception-free future

2017-02-17 Thread Aljoscha Krettek
Thomas, how were you planning to get the StreamingViewAs* PTransforms out
of the apply() in the Dataflow runner?

I'm asking because the Flink Runner works basically the same way and I've
run into a problem. My problem is that the TupleTag of a PCollectionView is
generated when the user applies, for example, the View.AsIterable
PTransform. When the override is in the runner apply() this is not a
problem because the tag of the PCollectionView that the user has matches
the tag that we internally have in the Pipeline. Now, if I later change the
View.AsIterable using Pipeline surgery a new PCollectionView will be
created that will have a different internal tag from the one that the user
PCollectionView has. Then, when the user creates a ParDo with side inputs, I
cannot properly match the replaced View against the user View and I can't
properly stitch together an execution graph.

On Fri, 17 Feb 2017 at 00:45 Amit Sela  wrote:

> Awesome!
> First thing I'm gonna do:
>
>1. traverse the pipeline to determine if streaming.
>2. If streaming, replace Read.Bounded with an adapted Read.Unbounded.
>
> The current implementation forces bounded reads to be translated by the
> unbounded translator, which feels awkward; this makes it right again.
>
> Thanks Thomas!
>
> On Thu, Feb 16, 2017 at 4:12 PM Aljoscha Krettek 
> wrote:
>
> > I might just try and do that. ;-)
> >
> > On Thu, 16 Feb 2017 at 03:55 Thomas Groh 
> wrote:
> >
> > > As of Github PR #1998 (https://github.com/apache/beam/pull/1998), the
> > new
> > > Pipeline Surgery API is ready and available. There are a couple of
> > > refinements coming in PR #2006, but in general pipelines can now, post
> > > construction, have PTransforms swapped out to whatever the runner
> desires
> > > (standard "behavior-maintaining" caveats apply).
> > >
> > > Moving forwards, this will enable pipelines to be run on multiple
> runners
> > > without having to reconstruct the graph via repeated applications of
> > > PTransforms to the pipeline (this also includes being able to, for
> > example,
> > > read a pipeline from a serialized representation, and executing the
> > result
> > > on an arbitrary runner).
> > >
> > > Those of you who are runner authors (at least, those who I can easily
> > > identify as such) should expect a Pull Request from me sometime next
> week
> > > porting you off of intercepting calls to apply and to the new surgery
> > API.
> > > You are, of course, welcome to beat me to the punch.
> > >
> > > Thanks,
> > >
> > > Thomas
> > >
> >
>