Re: [Proposal] Kaskada DSL and FnHarness for Temporal Queries

2023-06-12 Thread Ben Chambers
Hey Daniel -- Great question!

Kaskada was designed to be similar to SQL but with a few differences.
The most significant is the assumption of both ordering and grouping.
Kaskada uses this to automatically merge multiple input collections,
and to allow data-dependent windows that identify a range of time. For
instance, the query `Purchases.amount | sum(window = since(Login))`
sums the amount spent since the last login. In user studies, we've
heard that these make it much easier to compose queries analyzing the
entire "journey" or "funnel" for each user.
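To make the data-dependent window concrete, here is a rough plain-Java sketch (names and types are ours for illustration, not Kaskada's) of what `sum(window = since(Login))` computes: events are processed in time order, and the aggregation resets at each login, so the result is the spend since the most recent login.

```java
import java.util.List;

// Plain-Java sketch of `Purchases.amount | sum(window = since(Login))`:
// relies on the ordering assumption -- events arrive sorted by time.
public class SinceWindowSketch {
  public static class Event {
    final String kind;   // "login" or "purchase"
    final double amount;
    public Event(String kind, double amount) { this.kind = kind; this.amount = amount; }
  }

  // Returns the purchase total since the most recent login event.
  public static double sumSinceLastLogin(List<Event> orderedEvents) {
    double sum = 0.0;
    for (Event e : orderedEvents) {
      if (e.kind.equals("login")) {
        sum = 0.0;                 // since(Login): the window restarts here
      } else if (e.kind.equals("purchase")) {
        sum += e.amount;
      }
    }
    return sum;
  }
}
```

The ordering assumption is what makes the single forward pass possible; without it, the engine would first have to sort or buffer the merged inputs.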

There are also cases where the ordering assumption *isn't* a good fit
-- queries that aren't as sensitive to time. Having both options
readily available would allow a user to choose what is most natural to
them and their use case.

-- Ben

On Mon, Jun 12, 2023 at 12:14 PM Daniel Collins  wrote:
>
> How does this mechanism differ from Beam SQL, which already offers windowing 
> via SQL over PCollections?
>
> https://beam.apache.org/documentation/dsls/sql/extensions/windowing-and-triggering/
>
> -Daniel
>
> On Mon, Jun 12, 2023 at 3:11 PM Ryan Michael  wrote:
>>
>> Hello, Beam (also)!
>>
>> Just introducing myself - I'm Ryan and I've been working with Ben on the 
>> Kaskada project for the past few years. As Ben mentioned, I think there's a 
>> great opportunity to bring together some of the work we've done to make 
>> time-based computation easier to reason about with the Beam community's work 
>> on scalable streaming computation.
>>
>> I'll be at the Beam Summit in NYC starting Wednesday and presenting a short 
>> overview of how we see Kaskada fitting into the Generative AI world at the 
>> "Generative AI Meetup" Wednesday afternoon - if the doc Ben linked to (or 
>> GenAI) is interesting to you and you'll be at the conference I'd love to 
>> touch base in person!
>>
>> -Ryan
>>
>> On Mon, Jun 12, 2023 at 2:51 PM Ben Chambers  wrote:
>>>
>>> Hello Beam!
>>>
>>> Kaskada has created a query language for expressing temporal queries,
>>> making it easy to work with multiple streams and perform temporally
>>> correct joins. We’re looking at taking our native, columnar execution
>>> engine and making it available as a PTransform and FnHarness for use
>>> with Apache Beam.
>>>
>>> We’ve drafted a [short document][proposal] outlining our planned
>>> approach and the potential benefits to Kaskada and Beam users. It
>>> would be super helpful to get some feedback on this approach and any
>>> ways that it could be improved / better integrated with Beam to
>>> provide more value!
>>>
>>> Could you see yourself using (or contributing to) this work? Let us know!
>>>
>>> Thanks!
>>>
>>> Ben
>>>
>>> [proposal]: 
>>> https://docs.google.com/document/d/1w6DYpYCi1c521AOh83JN3CB3C9pZwBPruUbawqH-NsA/edit
>>
>>
>>
>> --
>> Ryan Michael
>> keri...@gmail.com | 512.466.3662 | github | linkedin


Re: Java SDK Extensions

2018-10-03 Thread Ben Chambers
On Wed, Oct 3, 2018 at 12:16 PM Jean-Baptiste Onofré 
wrote:

> Hi Anton,
>
> jackson is the JSON extension, like the one we have for XML. Agree that it
> should be documented.
>
> Agree about join-library.
>
> sketching is a set of statistics extensions providing ready-to-use stats
> CombineFns.
>
> Regards
> JB
>
> On 03/10/2018 20:25, Anton Kedin wrote:
> > Hi dev@,
> >
> > *TL;DR:* `sdks/java/extensions` is hard to discover, navigate and
> > understand.
> >
> > *Current State:*
> > I was looking at `sdks/java/extensions`[1] and realized that I don't
> > know what half of those things are. Only `join library` and `sorter`
> > seem to be documented and discoverable on Beam website, under SDKs
> > section [2].
> >
> > Here's the list of all extensions with my questions/comments:
> >   - /google-cloud-platform-core/. What is this? Is this used in GCP IOs?
> > If so, is `extensions` the right place for it? If it is, then why is it
> > a `-core` extension? It feels like it's a utility package, not an
> extension;
> >   - /jackson/. I can guess what it is but we should document it
> somewhere;
> >   - /join-library/. It is documented, but I think we should add more
> > documentation to explain how it works, maybe some caveats, and link
> > to/from the `CoGBK` section of the doc;
>

We should also probably indicate that using the join-library twice to join
3 input collections is less efficient than a single CoGBK over those 3
input collections.
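To illustrate the point, here is a plain-Java sketch (not Beam's actual join-library or CoGroupByKey API) of co-grouping three keyed inputs in a single pass: every element is bucketed by key once, whereas two chained pairwise joins materialize and re-shuffle an intermediate joined collection.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// One-pass co-group over N keyed inputs, standing in for a CoGBK.
public class CoGroupSketch {
  // Groups values from each input list under their shared key; slot i of
  // the result holds the values contributed by input i.
  public static Map<String, List<List<String>>> coGroup(
      List<List<Map.Entry<String, String>>> inputs) {
    Map<String, List<List<String>>> out = new TreeMap<>();
    for (int i = 0; i < inputs.size(); i++) {
      for (Map.Entry<String, String> kv : inputs.get(i)) {
        List<List<String>> slots = out.get(kv.getKey());
        if (slots == null) {
          slots = new ArrayList<>();
          for (int j = 0; j < inputs.size(); j++) slots.add(new ArrayList<>());
          out.put(kv.getKey(), slots);
        }
        slots.get(i).add(kv.getValue());
      }
    }
    return out;
  }
}
```

Chaining the join-library instead would run this kind of grouping twice, with the first join's output shuffled again as input to the second.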


> >   - /protobuf/. I can probably guess what it is. Is 'extensions' the
> > right place for it though? We use this library in IOs
> > (`PubsubIO.readProtos()`), should we move it to IO then? Same as with
> > GCP extension, feels like a utility library, not an extension;
> >   - /sketching/. No idea what to expect from this without reading the
> code;
> >   - /sorter/. Documented on the website;
> >   - /sql/. This looks familiar :) It is documented but not linked from
> > the extensions section, it's unclear whether it's the whole SQL or just
> > some related components;
> >
> > [1]: https://github.com/apache/beam/tree/master/sdks/java/extensions
> > [2]: https://beam.apache.org/documentation/sdks/java-extensions/
> >
> > *Questions:*
> >
> >   - should we minimally document (at least describe) all extensions and
> > add at least short readme.md's with the links to the Beam website?
> >   - is it a right thing to depend on `extensions` in other components
> > like IOs?
> >   - would it make sense to move some things out of 'extensions'? E.g. IO
> > components to IO or utility package, SQL into new DSLs package;
> >
> > *Opinion:*
> > Maybe I am misunderstanding the intent and meaning of 'extensions', but
> > from my perspective:
> >   - I think that extensions should be more or less isolated from the
> > Beam SDK itself, so that if you delete or modify them, no Beam-internal
> > changes will be required (changes to something that's not an extension).
> > And my feeling is that they should provide value by themselves to users
> > other than SDK authors. They are called 'extensions', not 'critical
> > components' or 'sdk utilities';
> >
> >   - I don't think that IOs should depend on 'extensions'. Otherwise the
> > question is, is it ok for other components, like runners, to do the
> > same? Or even core?
> >
> >   - I think there are few distinguishable classes of things in
> > 'extensions' right now:
> >   - collections of `PTransforms` with some business logic (Sorter,
> > Join, Sketch);
> >   - collections of `PTransforms` with a focus on parsing (Jackson,
> Protobuf);
> >   - DSLs; the SQL DSL has more than just a few `PTransforms` and can be
> > used almost as a standalone SDK. Things like Euphoria will probably end
> > up in the same class;
> >   - utility libraries shared by some parts of the SDK and unclear if
> > they are valuable by themselves to external users (Protobuf, GCP core);
> > To me the business logic and parsing libraries do make sense to stay
> > in extensions, but probably under different subdirectories. I think it
> > will make sense to split others out of extensions into separate parts of
> > the SDK.
> >
> >   - I think we should add readme.md's with short descriptions and links
> > to Beam website;
> >
> > Thoughts, comments?
> >
> >
> > [1]: https://github.com/apache/beam/tree/master/sdks/java/extensions
> > [2]: https://beam.apache.org/documentation/sdks/java-extensions/
>


Re: [portablility] metrics interrogations

2018-09-12 Thread Ben Chambers
I think there is a confusion of terminology here. Let me attempt to clarify
as a (mostly) outsider.

I think that Etienne is correct, in that the SDK harness only reports the
difference associated with a bundle. So, if we have a metric that measures
execution time, the SDK harness reports the time spent executing *that
bundle*.

It does not need to report the total execution time which would require
knowing the total execution time without that bundle. This would in fact be
nearly impossible -- since multiple bundles may be executed simultaneously
there is no "total execution time without this bundle" available within the
system.

Robert is also correct, in that the SDK harness may send a partial diff
based on the current state, when asked to do so. This is important -- if a
bundle were to take 1 hour to complete, do we want the execution time to be
5 hours (for the entire hour) and then jump to 6 hours, or do we want it to
be able to increase somewhat smoothly during the hour? Supporting a smooth
increase requires being able to send a partial result back -- "if this
bundle were to complete now, here is the diff that would be committed".

Of course, if a runner uses these partial updates it may need logic to deal
with the case of a partial update from a bundle that has failed, but that I
think that is outside the scope of the original question. Needless to say,
it requires something like a committed value and "tentative" value
reflecting bundles that are in progress.

Does that match what both sides have in mind?
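To pin down the "partial diff" idea, here is a plain-Java sketch (illustrative names, not the actual Fn API classes) of dirty-metric reporting: each progress snapshot carries full current values, but only for counters that changed since the last report.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Sketch of "dirty" metric reporting: report() returns full values, but
// only for counters touched since the previous report.
public class DirtyMetricsSketch {
  private final Map<String, Long> values = new HashMap<>();
  private final Set<String> dirty = new HashSet<>();

  public void inc(String name, long amount) {
    values.merge(name, amount, Long::sum);
    dirty.add(name);  // mark changed since last report
  }

  // Unreported counters keep their previous values on the runner side.
  public Map<String, Long> report() {
    Map<String, Long> update = new HashMap<>();
    for (String name : dirty) update.put(name, values.get(name));
    dirty.clear();
    return update;
  }
}
```

A first report after counting five red and seven blue marbles yields {red=5, blue=7}; after two more blue, the next report is just {blue=9}, yet the runner's view stays complete.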

On Wed, Sep 12, 2018 at 8:05 AM Robert Bradshaw  wrote:

> On Wed, Sep 12, 2018 at 4:20 PM Etienne Chauchot 
> wrote:
>
>> Thanks Robert for the details,
>>
>> I did not know that the runner harness periodically asked the SDK harness
>> for updates. I thought it was only communicating at the beginning of the
>> bundle and at the end. Something like the simplified sequence diagram
>> below
>>
>>
>>
> The runner may periodically send a
> https://github.com/apache/beam/blob/9b68f926628d727e917b6a33ccdafcfe693eef6a/model/fn-execution/src/main/proto/beam_fn_api.proto#L263
> and the response may contain metrics.
>
>
>> but if the metrics are not a regular diff but more the not-yet-committed
>> dirty value, that means that the runner sends the metrics value to the sdk
>> before the bundle starts processing.
>> So the sequence diagram becomes something more like:
>>
>>
>> WDYT ?
>>
>
> The runner never sends any metrics to the SDK, it just listens. It may
> aggregate them or send them upstream. An SDK can choose to publish all,
> none, or some strategic subset of metrics in its progress responses, as
> "latest values." On work output, it publishes everything. (Regarding your
> diagram, there may not even be any "metric cells" in the Java Runner
> Harness.)
>
>
>> Le mardi 11 septembre 2018 à 17:53 +0200, Robert Bradshaw a écrit :
>> On Mon, Sep 10, 2018 at 11:07 AM Etienne Chauchot 
>> wrote:
>>
>> Hi all,
>>
>> @Luke, @Alex I have a general question related to metrics in the Fn API:
>> as the communication between runner harness and SDK harness is done on a
>> bundle basis. When the runner harness sends data to the sdk harness to
>> execute a transform that contains metrics, does it:
>>
>>1. send metrics values (for the ones defined in the transform)
>>alongside with data and receive an updated value of the metrics from the
>>sdk harness when the bundle is finished processing?
>>2. or does it send only the data and the sdk harness responds with a
>>diff value of the metrics so that the runner can update them in its side?
>>
>> My bet is option 2. But can you confirm?
>>
>>
>> The runner harness periodically asks for the status of a bundle to which
>> the runner harness may respond with a current snapshot of metrics. These
>> metrics are deltas in the sense that only "dirty" metrics need to be
>> reported (i.e. unreported metrics can be assumed to have their previous
>> values) but are *not* deltas with respect to values, i.e. the full value is
>> reported each time. As an example, suppose one were counting red and blue
>> marbles. The first update may be something like
>>
>> { red: 5, blue: 7}
>>
>> and if two more blue ones were found, a valid update would be
>>
>> { blue: 9 }
>>
>> On bundle completion, the full set of metrics is reported as part of the
>> same message that declares the bundle complete.
>>
>>
>>
>> On Tue, Sep 11, 2018 at 11:43 AM Etienne Chauchot 
>> wrote:
>>
>> Le lundi 10 septembre 2018 à 09:42 -0700, Lukasz Cwik a écrit :
>>
>> Alex is out on vacation for the next 3 weeks.
>>
>> Alex had proposed the types of metrics[1] but not the exact protocol as
>> to what the SDK and runner do. I could envision Alex proposing that the SDK
>> harness only sends diffs or dirty metrics in intermediate updates and all
>> metrics values in the final update.
>> Robert is referring to an integration that happened to an older set of
> messages[2] that preceded Alex's propo

Re: Existing transactionality inconsistency in the Beam Java State API

2018-05-24 Thread Ben Chambers
I think Kenn's second option accurately reflects my memory of the original
intentions:

1. I remember we we considered either using the Future interface or calling
the ReadableState interface a future, and explicitly said "no, future
implies asynchrony and that the value returned by `get` won't change over
multiple calls, but we want the latest value each time". So, I remember us
explicitly considering and rejecting Future, thus the name "ReadableState".

2. The intuition behind the implementation was analogous to a
mutable-reference cell in languages like ML / Scheme / etc. The
ReadableState is just a pointer to the reference cell. Calling read
returns the value currently in the cell. If we have 100 ReadableStates
pointing at the same cell, they all get the same value regardless of when
they were created. This avoids needing to duplicate/snapshot values at any
point in time.
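The reference-cell intuition can be sketched in plain Java (illustrative names, not the Beam SDK's): every handle is a view on one mutable cell, so every read() sees the latest write regardless of when the handle was created, and nothing is snapshotted.

```java
import java.util.concurrent.atomic.AtomicReference;

// Sketch of ReadableState as a view on a mutable reference cell.
public class RefCellSketch {
  public interface ReadableState<T> { T read(); }

  public static class ValueState<T> {
    private final AtomicReference<T> cell = new AtomicReference<>();
    public void write(T value) { cell.set(value); }
    // Each handle just dereferences the shared cell at read() time.
    public ReadableState<T> handle() { return cell::get; }
  }
}
```

A hundred handles to the same cell all observe the same latest value, which is exactly why no duplication or snapshotting is needed.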

3. ReadLater was added, as noted by Charles, to suggest prefetching the
associated value. This was added after benchmarks showed 10x (if I remember
correctly) performance improvements in things like GroupAlsoByWindows by
minimizing round-trips asking for more state. The intuition being -- if we
need to make an RPC to load one state value, we are better off making an
RPC to load all the values we need.

Overall, I too lean towards maintaining the second interpretation since it
seems to be consistent and I believe we had additional reasons for
preferring it over futures.

Given the confusion, I think strengthening the class documentation makes
sense -- I note the only hint of the current behavior is that ReadableState
indicates it gets the *current* value (emphasis mine). We should emphasize
that and perhaps even mention that the ReadableState should be understood
as just a reference or handle to the underlying state, and thus its value
will reflect the latest write.

Charles, if it helps, the plan I remember regarding prefetching was
something like:

interface ReadableMapState<K, V> {
   ReadableState<V> get(K key);
   ReadableState<Iterable<V>> getIterable();
   ReadableState<Map<K, V>> get();
   // ... more things ...
}

Then prefetching a value is `mapState.get(key).readLater()` and prefetching
the entire map is `mapState.get().readLater()`, etc.
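A minimal sketch of readLater() as a pure prefetch hint (illustrative, not the Beam SDK implementation): calling it may start fetching in the background, but it only affects performance, not the value observed.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// readLater() kicks off the fetch early; read() uses the in-flight
// result when present, otherwise fetches inline.
public class ReadLaterSketch<T> {
  private final Supplier<T> fetch;          // e.g. an RPC to a state backend
  private CompletableFuture<T> prefetched;  // in-flight fetch, if any

  public ReadLaterSketch(Supplier<T> fetch) { this.fetch = fetch; }

  // Hint: start fetching now so a later read() may avoid a round trip.
  public ReadLaterSketch<T> readLater() {
    if (prefetched == null) prefetched = CompletableFuture.supplyAsync(fetch);
    return this;
  }

  public T read() {
    return prefetched != null ? prefetched.join() : fetch.get();
  }
}
```

This is the shape behind the 10x win mentioned above: if one state value needs an RPC, batch-prefetching all the values the bundle will need turns many round trips into one.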

On Wed, May 23, 2018 at 7:13 PM Charles Chen  wrote:

> Thanks Kenn.  I think there are two issues to highlight: (1) the API
> should allow for some sort of prefetching / batching / background I/O for
> state; and (2) it should be clear what the semantics are for reading (e.g.
> so we don't have confusing read after write behavior).
>
> The approach I'm leaning towards for (1) is to allow a state.prefetch()
> method (to prefetch a value, iterable or [entire] map state) and maybe
> something like state.prefetch_key(key) to prefetch a specific KV in the
> map.  Issue (2) seems to be okay in either of Kenn's positions.
>
> On Wed, May 23, 2018 at 5:33 PM Robert Bradshaw 
> wrote:
>
>> Thanks for laying this out so well, Kenn. I'm also leaning towards the
>> second option, despite its drawbacks. (In particular, readLater should
>> not influence what's returned at read(), it's just a hint.)
>>
>> On Wed, May 23, 2018 at 4:43 PM Kenneth Knowles  wrote:
>>
>>> Great idea to bring it to dev@. I think it is better to focus here than
>>> long doc comment threads.
>>>
>>> I had strong opinions that I think were a bit confused and wrong. Sorry
>>> for that. I stated this position:
>>>
>>>  - XYZState class is a handle to a mutable location
>>>  - its methods like isEmpty() or contents() should return immutable
>>> future values (implicitly means their contents are semantically frozen when
>>> they are created)
>>>  - the fact that you created the future is a hint that all necessary
>>> fetching/computation should be kicked off
>>>  - later forced with get()
>>>  - when it was designed, pure async style was not a viable option
>>>
>>> I see now that the actual position of some of its original designers is:
>>>
>>>  - XYZState class is a view on a mutable location
>>>  - its methods return new views on that mutable location
>>>  - calling readLater() is a hint that some fetching/computation should
>>> be kicked off
>>>  - later read() will combine whatever readLater() did with additional
>>> local info to give the current value
>>>  - async style not applicable nor desirable as per Beam's focus on naive
>>> straight-line coding + autoscaling
>>>
>>> These are both internally consistent I think. In fact, I like the second
>>> perspective better than the one I have been promoting. There are some
>>> weaknesses: readLater() is pretty tightly coupled to a particular
>>> implementation style, and futures are decades old so you can get good APIs
>>> and performance without inventing anything. But I still like the non-future
>>> version a little better.
>>>
>>> Kenn
>>>
>>> On Wed, May 23, 2018 at 4:05 PM Charles Chen  wrote:
>>>
 During the design of the Beam Python State API, we noticed some
 transactionality inconsistencies 

Re: What is the future of Reshuffle?

2018-05-21 Thread Ben Chambers
+1 to introducing alternative transforms even if they wrap Reshuffle

The benefits of making them distinct is that we can put appropriate Javadoc
in place and runners can figure out what the user is intending and whether
Reshuffle or some other implementation is appropriate. We can also see
which of these use cases is most common by inspecting the transform usage.
And, when better options are available, we can just introduce the
appropriate transform, to get the appropriate special behavior.

I think there were three use cases that I can remember:

1. Splitting up retry domains (e.g., if one transform is expensive,
preventing failures in nearby transforms from causing it to be retried)
2. Redistributing elements to improve parallelism
3. Checkpointing/snapshotting a PCollection before side-effecting the
outside world (e.g., RequiresStableInput)
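For use case 2, what a Reshuffle does under the hood can be sketched in plain Java (not Beam APIs): pair each element with an arbitrary key, group by that key, then discard the key, so downstream work can be parallelized per shard.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Redistribution via random keys, standing in for pair-with-random-key + GBK.
public class ReshuffleSketch {
  public static <T> Map<Integer, List<T>> redistribute(
      List<T> elements, int numShards, long seed) {
    Random random = new Random(seed);
    Map<Integer, List<T>> shards = new HashMap<>();
    for (T element : elements) {
      int key = random.nextInt(numShards);   // arbitrary key, not element-derived
      shards.computeIfAbsent(key, k -> new ArrayList<>()).add(element);
    }
    return shards;
  }
}
```

Note this sketch says nothing about use cases 1 and 3; whether the grouping also acts as a checkpoint with stable output is exactly the runner-dependent guarantee debated below.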

-- Ben

On Mon, May 21, 2018 at 9:56 AM Robert Bradshaw  wrote:

> We should probably keep the warning and all the caveats until we introduce
> the alternative (and migrate to it for the non-parallelism uses of
> reshuffle). I was just proposing we do this via a separate transform that
> just calls Reshuffle until we have the new story fully fleshed out (I don't
> know if any runner supports RequiresStableInput, and it isn't translated
> in the Fn API) to avoid being in this intermediate state for yet another
> year.
>
> On Sun, May 20, 2018 at 6:38 PM Raghu Angadi  wrote:
>
>>
>>
>> On Sat, May 19, 2018 at 10:55 PM Robert Bradshaw 
>> wrote:
>>
>>> On Sat, May 19, 2018 at 6:27 PM Raghu Angadi  wrote:
>>>
 [...]

>>> I think it would be much more user friendly to un-deprecate it to add a
 warning for advanced users about non-portability of durability/replay
 guarantees/stable input assumptions.

>
>>> Yes, I think everyone in this thread is in agreement here. We should
>>> provide a *different* transform that provides the durability guarantees
>>> (with caveats). In the meantime, this delegating to a reshuffle would be
>>> better than using a reshuffle directly.
>>>
>>
>> Great. Sent a PR to undeprecate Reshuffle :
>> https://github.com/apache/beam/pull/5432
>> The wording there for the Javadoc is just a proposal...
>>
>> Raghu.
>>
>>
>>> We tend to put in reshuffles in order to "commit" these random values
> and make them stable for the next stage, to be used to provide the 
> needed
> idempotency for sinks.
>

 In such cases, I think the author should error out on runners
 that don't provide that guarantee. That is what ExactlyOnceSink in 
 KafkaIO
 does [1].

 [1]
 https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1049

>>>
>>> We're moving to a world where the runner may not be known at
>>> pipeline construction time. However, explicitly using a (distinct)
>>> make-input-stable transform when that's the intent (which could be a
>>> primitive that runners should implement, possibly by swapping in 
>>> Reshuffle,
>>> or reject) would allow for this. That being said, the exact semantics of
>>> this transform is a bit of a rabbit hole which is why we never finished 
>>> the
>>> job of deprecating Reshuffle. This is a case where doing something is
>>> better than doing nothing, and our use of URNs for this kind of thing is
>>> flexible enough that we can deprecate old ones if/when we have time to
>>> pound out the right solution.
>>>
>>>

> Kenn
>
> On Fri, May 18, 2018 at 4:05 PM Raghu Angadi 
> wrote:
>
>>
>> On Fri, May 18, 2018 at 12:21 PM Robert Bradshaw <
>> rober...@google.com> wrote:
>>
>>> On Fri, May 18, 2018 at 11:46 AM Raghu Angadi <
>>> rang...@google.com> wrote:
>>>
 Thanks Kenn.

 On Fri, May 18, 2018 at 11:02 AM Kenneth Knowles <
 k...@google.com> wrote:

> The fact that its usage has grown probably indicates that we
> have a large number of transforms that can easily cause data loss 
> /
> duplication.
>

 Is this specific to Reshuffle or is it true for any GroupByKey?
 I see Reshuffle as just a wrapper around GBK.

>>> The issue is when it's used in such a way that data corruption
>>> can occur when the underlying GBK output is not stable.
>>>
>>
>> Could you describe this breakage bit more in detail or give a
>> example? Apologies in advance, I know this came up in multiple 
>> contexts in
>> the past, but I haven't grokked the issue well. It is the window 
>> rewrite
>> that Reshuffle does that causes misuse of GBK?
>>
>> Thanks.
>>>

Re: Beam parent SNAPSHOTs are not being built

2018-04-25 Thread Ben Chambers
It looks like the problem is that your project uses beam-examples-parent as
its parent. This was the suggested way of creating a Maven project because
we published a POM and this provided an easy way to make sure that derived
projects inherited the same options and versions as Beam used.

With the move to Gradle, it seems that the parent is not published, but it
is also not depended on by any of the published Beam artifacts. So the only
cases it appears as a dependency are cases like this, where the user's
project explicitly depends on it.

So we either need to (1) publish the beam-parent and/or
beam-examples-parent with version numbers suitable for user projects or (2)
have every user change their pom to not depend on a Beam-provided parent.

On Wed, Apr 25, 2018 at 4:37 PM Eric Beach  wrote:

> If I run $ mvn clean compile exec:java ... on my project, I get the
> following error stack trace:
>
> java.lang.reflect.InvocationTargetException
>
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
> at java.lang.reflect.Method.invoke(Method.java:498)
>
> at
> org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:294)
>
> at java.lang.Thread.run(Thread.java:748)
>
> Caused by: java.lang.NoClassDefFoundError:
> com/google/api/services/clouddebugger/v2/CloudDebugger
>
> at java.lang.Class.getDeclaredMethods0(Native Method)
>
> at java.lang.Class.privateGetDeclaredMethods(Class.java:2703)
>
> at java.lang.Class.getDeclaredMethod(Class.java:2130)
>
> at
> org.apache.beam.sdk.util.InstanceBuilder.buildFromMethod(InstanceBuilder.java:206)
>
> at
> org.apache.beam.sdk.util.InstanceBuilder.build(InstanceBuilder.java:162)
>
> at
> org.apache.beam.sdk.PipelineRunner.fromOptions(PipelineRunner.java:55)
>
> at org.apache.beam.sdk.Pipeline.create(Pipeline.java:150)
>
> at
> com.google.cloud.pontem.CloudSpannerDatabaseBackup.main(CloudSpannerDatabaseBackup.java:359)
>
> ... 6 more
>
> Caused by: java.lang.ClassNotFoundException:
> com.google.api.services.clouddebugger.v2.CloudDebugger
>
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>
>
> When I dig deeper, the root problem seems to be that my project is pulling
> in v2-rev8-1.22.0 instead of v2-rev233-1.23.0 (changing commit here
> 
> ).
>
> In fact, all the version changes in this commit
> 
>  are missing.
>
> My project's pom.xml contains the following:
>
> <parent>
>   <artifactId>beam-examples-parent</artifactId>
>   <groupId>org.apache.beam</groupId>
>   <version>2.5.0-SNAPSHOT</version>
> </parent>
>
> <modelVersion>4.0.0</modelVersion>
>
> <groupId>com.google.cloud</groupId>
> <artifactId>xyz</artifactId>
>
> <repositories>
>   <repository>
>     <id>apache.snapshots</id>
>     <name>Apache Development Snapshot Repository</name>
>     <url>https://repository.apache.org/content/repositories/snapshots/</url>
>     <releases>
>       <enabled>false</enabled>
>     </releases>
>     <snapshots>
>       <enabled>true</enabled>
>     </snapshots>
>   </repository>
> </repositories>
>
> My first round of attempts to debug included removing ~/.m2/ and a number
> of other techniques.
>
> When I look at
> https://repository.apache.org/content/repositories/snapshots, I notice
> that beam-parent with 2.5.0-SNAPSHOT
> 
>  is
> very outdated and so all the version numbers inherited from the POM are
> wrong (see
> https://repository.apache.org/content/repositories/snapshots/org/apache/beam/beam-parent/2.5.0-SNAPSHOT/).
> If you look at
> https://repository.apache.org/content/repositories/snapshots/org/apache/beam/
>  you will see that the beam-parent is significantly old.
>
> So, it seems imperative that the snapshot be rebuilt before the finalized
> 2.5.0 version is set or any projects depending on the Beam parent will
> break. (I depend upon the Beam parent because trying to keep the versions
> of all the different Google projects in sync is a total nightmare).
>
>
> On Wed, Apr 25, 2018 at 6:58 PM Chamikara Jayalath 
> wrote:
>
>> Ccing Eric for providing more context.
>>
>> Thanks,
>> Cham
>>
>> On Wed, Apr 25, 2018 at 3:38 PM Scott Wegner  wrote:
>>
>>> Do you have any more context on how they were using the parent pom? In
>>> the Gradle build, instead of using a parent pom hierarchy we are embedding
>>> the complete set of dependencies and metadata in each generated pom. They
>>> should be functionally equiv

Re: Updated [Proposal] Apache Beam Fn API : Defining and adding SDK Metrics

2018-04-17 Thread Ben Chambers
That sounds like a very reasonable choice -- given the discussion seemed to
be focusing on the differences between these two categories, separating
them will allow the proposal (and implementation) to address each category
in the best way possible without needing to make compromises.

Looking forward to the updated proposal.

On Tue, Apr 17, 2018 at 10:53 AM Alex Amato  wrote:

> Hello,
>
> I just wanted to give an update .
>
> After some discussion, I've realized that its best to break up the two
> concepts, with two separate way of reporting monitoring data. These two
> categories are:
>
>1. Metrics - Counters, Gauges, Distributions. These are well defined
>concepts for monitoring information and need to integrate with existing
>metrics collection systems such as Dropwizard and Stackdriver. Most metrics
>will go through this model, which will allow runners to process new metrics
>without adding extra code to support them, forwarding them to metric
>collection systems.
>2. Monitoring State - This supports general monitoring data which may
>not fit into the standard model for Metrics. For example an I/O source may
>provide a table of filenames+metadata, for files which are old and blocking
>the system. I will propose a general approach, similar to the URN+payload
>approach used in the doc right now.
>
One thing to keep in mind -- even though it makes sense to allow each I/O
source to define their own monitoring state, this then shifts
responsibility for collecting that information to each runner and
displaying that information to every consumer. It would be reasonable to
see if there could be a set of 10 or so that covered most of the cases that
could become the "standard" set (eg., watermark information, performance
information, etc.).


> I will rewrite most of the doc and propose separating these two very
> different use cases, one which optimizes for integration with existing
> monitoring systems. The other which optimizes for flexibility, allowing
> more complex and custom metrics formats for other debugging scenarios.
>
> I just wanted to give a brief update on the direction of this change,
> before writing it up in full detail.
>
>
> On Mon, Apr 16, 2018 at 10:36 AM Robert Bradshaw 
> wrote:
>
>> I agree that the user/system dichotomy is false, the real question of how
>> counters can be scoped to avoid accidental (or even intentional)
>> interference. A system that entirely controls the interaction between the
>> "user" (from its perspective) and the underlying system can do this by
>> prefixing all requested "user" counters with a prefix it will not use
>> itself. Of course this breaks down whenever the wrapping isn't complete
>> (either on the production or consumption side), but may be worth doing for
>> some components (like the SDKs that value being able to provide this
>> isolation for better behavior). Actual (human) end users are likely to be
>> much less careful about avoiding conflicts than library authors who in turn
>> are generally less careful than authors of the system itself.
>>
>> We could alternatively allow for specifying fully qualified URNs for
>> counter names in the SDK APIs, and letting "normal" user counters be in the
>> empty namespace rather than something like beam:metrics:{user,other,...},
>> perhaps with SDKs prohibiting certain conflicting prefixes (which is less
>> than ideal). A layer above the SDK that has similar absolute control over
>> its "users" would have a similar decision to make.
>>
>>
>> On Sat, Apr 14, 2018 at 4:00 PM Kenneth Knowles  wrote:
>>
>>> One reason I resist the user/system distinction is that Beam is a
>>> multi-party system with at least SDK, runner, and pipeline. Often there may
>>> be a DSL like SQL or Scio, or similarly someone may be building a platform
>>> for their company where there is no user authoring the pipeline. Should
>>> Scio, SQL, or MyCompanyFramework metrics end up in "user"? Who decides to
>>> tack on the prefix? It looks like it is the SDK harness? Are there just
>>> three namespaces "runner", "sdk", and "user"?  Most of what you'd think
>>> of as "user" version "system" should simply be the different between
>>> dynamically defined & typed metrics and fields in control plane protos. If
>>> that layer of the namespaces is not finite and limited, who can extend make
>>> a valid extension? Just some questions that I think would flesh out the
>>> meaning of the "user" prefix.
>>>
>>> Kenn
>>>
>>> On Fri, Apr 13, 2018 at 5:26 PM Andrea Foegler 
>>> wrote:
>>>


 On Fri, Apr 13, 2018 at 5:00 PM Robert Bradshaw 
 wrote:

> On Fri, Apr 13, 2018 at 3:28 PM Andrea Foegler 
> wrote:
>
>> Thanks, Robert!
>>
>> I think my lack of clarity is around the MetricSpec.  Maybe what's in
>> my head and what's being proposed are the same thing.  When I read that the
>> MetricSpec describes the proto structure, that sounds kind of complicated

Re: Updated [Proposal] Apache Beam Fn API : Defining and adding SDK Metrics

2018-04-12 Thread Ben Chambers
Sounds perfect. Just wanted to make sure that "custom metrics of supported
type" didn't include new ways of aggregating ints. As long as that means we
have a fixed set of aggregations (that align with what users want and what
metrics backends support) it seems like we are doing user metrics right.
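One way to picture "custom metrics of standard type" is a spec that pairs a user-chosen metric URN with a type URN drawn from a fixed set of standard aggregations. This is only a sketch; all of the URN strings and class names here are hypothetical, not taken from the Beam protos:

```java
import java.util.Set;

public class MetricSpecSketch {
    // Hypothetical type URNs for a fixed set of standard aggregations.
    static final Set<String> STANDARD_TYPES = Set.of(
            "beam:metric:type:sum_int64",
            "beam:metric:type:max_int64",
            "beam:metric:type:latest_int64",
            "beam:metric:type:distribution_int64");

    // Minimal stand-in for the proposed MetricSpec: the metric's own URN plus
    // a "type" URN telling the runner how to aggregate the payload.
    static class MetricSpec {
        final String urn;
        final String typeUrn;
        MetricSpec(String urn, String typeUrn) { this.urn = urn; this.typeUrn = typeUrn; }
    }

    // A runner can aggregate any metric whose type URN it recognizes, even if
    // the metric URN itself carries no semantic meaning for it.
    static boolean canAggregate(MetricSpec spec) {
        return STANDARD_TYPES.contains(spec.typeUrn);
    }

    public static void main(String[] args) {
        MetricSpec user = new MetricSpec("myorg:metrics:rpc_errors", "beam:metric:type:sum_int64");
        MetricSpec custom = new MetricSpec("myorg:metrics:custom", "myorg:metric:type:custom");
        System.out.println(canAggregate(user) + " " + canAggregate(custom)); // true false
    }
}
```

The fixed set keeps the "new ways of aggregating ints" door closed while still letting users define arbitrarily named metrics.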

- Ben

On Wed, Apr 11, 2018, 11:30 PM Romain Manni-Bucau 
wrote:

> Maybe leave it out until proven it is needed. ATM counters are used a lot
> but others are less mainstream so being too fine from the start can just
> add complexity and bugs in impls IMHO.
>
> Le 12 avr. 2018 08:06, "Robert Bradshaw"  a écrit :
>
>> By "type" of metric, I mean both the data types (including their
>> encoding) and accumulator strategy. So sumint would be a type, as would
>> double-distribution.
>>
>> On Wed, Apr 11, 2018 at 10:39 PM Ben Chambers 
>> wrote:
>>
>>> When you say type do you mean accumulator type, result type, or
>>> accumulator strategy? Specifically, what is the "type" of sumint, sumlong,
>>> meanlong, etc?
>>>
>>> On Wed, Apr 11, 2018, 9:38 PM Robert Bradshaw 
>>> wrote:
>>>
>>>> Fully custom metric types is the "more speculative and difficult"
>>>> feature that I was proposing we kick down the road (and may never get to).
>>>> What I'm suggesting is that we support custom metrics of standard type.
>>>>
>>>> On Wed, Apr 11, 2018 at 5:52 PM Ben Chambers 
>>>> wrote:
>>>>
>>>>> The metric api is designed to prevent user defined metric types based
>>>>> on the fact they just weren't used enough to justify support.
>>>>>
>>>>> Is there a reason we are bringing that complexity back? Shouldn't we
>>>>> just need the ability for the standard set plus any special system 
>>>>> metrics?
>>>>> On Wed, Apr 11, 2018, 5:43 PM Robert Bradshaw 
>>>>> wrote:
>>>>>
>>>>>> Thanks. I think this has simplified things.
>>>>>>
>>>>>> One thing that has occurred to me is that we're conflating the idea
>>>>>> of custom metrics and custom metric types. I would propose the MetricSpec
>>>>>> field be augmented with an additional field "type" which is a urn
>>>>>> specifying the type of metric it is (i.e. the contents of its payload, as
>>>>>> well as the form of aggregation). Summing or maxing over ints would be a
>>>>>> typical example. Though we could pursue making this opaque to the runner
>>>>>> in the long run, that's a more speculative (and difficult) feature to
>>>>>> tackle. This would allow the runner to at least aggregate and
>>>>>> report/return to the SDK metrics that it did not itself understand the
>>>>>> semantic meaning of. (It would probably simplify much of the
>>>>>> specialization in the runner itself for metrics that it *did* understand
>>>>>> as well.)
>>>>>>
>>>>>> In addition, rather than having UserMetricOfTypeX for every type X
>>>>>> one would have a single URN for UserMetric and its spec would designate
>>>>>> the type and its payload the (qualified) name.
>>>>>>
>>>>>> - Robert
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Wed, Apr 11, 2018 at 5:12 PM Alex Amato 
>>>>>> wrote:
>>>>>>
>>>>>>> Thank you everyone for your feedback so far.
>>>>>>> I have made a revision today which is to make all metrics refer to a
>>>>>>> primary entity, so I have restructured some of the protos a little bit.
>>>>>>>
>>>>>>> The point of this change was to futureproof the possibility of
>>>>>>> allowing custom user metrics, with custom aggregation functions for their
>>>>>>> metric updates.
>>>>>>> Now that each metric has an aggregation_entity associated with it
>>>>>>> (e.g. PCollection, PTransform), we can design an approach which forwards
>>>>>>> the opaque bytes metric updates, without deserializing them. These are
>>>>>>> forwarded to user provided code which then would deserialize the metric
&

Re: Updated [Proposal] Apache Beam Fn API : Defining and adding SDK Metrics

2018-04-11 Thread Ben Chambers
When you say type do you mean accumulator type, result type, or accumulator
strategy? Specifically, what is the "type" of sumint, sumlong, meanlong,
etc?

On Wed, Apr 11, 2018, 9:38 PM Robert Bradshaw  wrote:

> Fully custom metric types is the "more speculative and difficult" feature
> that I was proposing we kick down the road (and may never get to). What I'm
> suggesting is that we support custom metrics of standard type.
>
> On Wed, Apr 11, 2018 at 5:52 PM Ben Chambers  wrote:
>
>> The metric api is designed to prevent user defined metric types based on
>> the fact they just weren't used enough to justify support.
>>
>> Is there a reason we are bringing that complexity back? Shouldn't we just
>> need the ability for the standard set plus any special system metrics?
>> On Wed, Apr 11, 2018, 5:43 PM Robert Bradshaw 
>> wrote:
>>
>>> Thanks. I think this has simplified things.
>>>
>>> One thing that has occurred to me is that we're conflating the idea of
>>> custom metrics and custom metric types. I would propose the MetricSpec
>>> field be augmented with an additional field "type" which is a urn
>>> specifying the type of metric it is (i.e. the contents of its payload, as
>>> well as the form of aggregation). Summing or maxing over ints would be a
>>> typical example. Though we could pursue making this opaque to the runner in
>>> the long run, that's a more speculative (and difficult) feature to tackle.
>>> This would allow the runner to at least aggregate and report/return to the
>>> SDK metrics that it did not itself understand the semantic meaning of. (It
>>> would probably simplify much of the specialization in the runner itself for
>>> metrics that it *did* understand as well.)
>>>
>>> In addition, rather than having UserMetricOfTypeX for every type X one
>>> would have a single URN for UserMetric and its spec would designate the type
>>> and its payload the (qualified) name.
>>>
>>> - Robert
>>>
>>>
>>>
>>> On Wed, Apr 11, 2018 at 5:12 PM Alex Amato  wrote:
>>>
>>>> Thank you everyone for your feedback so far.
>>>> I have made a revision today which is to make all metrics refer to a
>>>> primary entity, so I have restructured some of the protos a little bit.
>>>>
>>>> The point of this change was to futureproof the possibility of allowing
>>>> custom user metrics, with custom aggregation functions for their metric
>>>> updates.
>>>> Now that each metric has an aggregation_entity associated with it (e.g.
>>>> PCollection, PTransform), we can design an approach which forwards the
>>>> opaque bytes metric updates, without deserializing them. These are
>>>> forwarded to user provided code which then would deserialize the metric
>>>> update payloads and perform the custom aggregations.
>>>>
>>>> I think it has also simplified some of the URN metric protos, as they
>>>> do not need to keep track of ptransform names inside themselves now. The
>>>> result is simpler structures for the metrics, as the entities are pulled
>>>> outside of the metric.
>>>>
>>>> I have mentioned this in the doc now, and wanted to draw attention to
>>>> this particular revision.
>>>>
>>>>
>>>>
>>>> On Tue, Apr 10, 2018 at 9:53 AM Alex Amato  wrote:
>>>>
>>>>> I've gathered a lot of feedback so far and want to make a decision by
>>>>> Friday, and begin working on related PRs next week.
>>>>>
>>>>> Please make sure that you provide your feedback before then and I will
>>>>> post the final decisions made to this thread Friday afternoon.
>>>>>
>>>>>
>>>>> On Thu, Apr 5, 2018 at 1:38 AM Ismaël Mejía  wrote:
>>>>>
>>>>>> Nice, I created a short link so people can refer to it easily in
>>>>>> future discussions, website, etc.
>>>>>>
>>>>>> https://s.apache.org/beam-fn-api-metrics
>>>>>>
>>>>>> Thanks for sharing.
>>>>>>
>>>>>>
>>>>>> On Wed, Apr 4, 2018 at 11:28 PM, Robert Bradshaw 
>>>>>> wrote:
>>>>>> > Thanks for the nice writeup. I added some comments.
>>>>>> >
>>>>>> > On Wed, Apr 4, 2018 at 1:53 PM Alex Amato 
>>>>>> wrote:
>>>>>> >>
>>>>>> >> Hello beam community,
>>>>>> >>
>>>>>> >> Thank you everyone for your initial feedback on this proposal so far.
>>>>>> >> I have made some revisions based on the feedback. There were some
>>>>>> >> larger questions asking about alternatives. For each of these I have
>>>>>> >> added a section tagged with [Alternatives] and discussed my
>>>>>> >> recommendation as well as a few other choices we considered.
>>>>>> >>
>>>>>> >> I would appreciate more feedback on the revised proposal. Please take
>>>>>> >> another look and let me know
>>>>>> >>
>>>>>> >> https://docs.google.com/document/d/1MtBZYV7NAcfbwyy9Op8STeFNBxtljxgy69FkHMvhTMA/edit
>>>>>> >>
>>>>>> >> Etienne, I would appreciate it if you could please take another look
>>>>>> >> after the revisions I have made as well.
>>>>>> >>
>>>>>> >> Thanks again,
>>>>>> >> Alex
>>>>>> >>
>>>>>> >
>>>>>>
>>>>>


Re: Updated [Proposal] Apache Beam Fn API : Defining and adding SDK Metrics

2018-04-11 Thread Ben Chambers
The metric api is designed to prevent user defined metric types based on
the fact they just weren't used enough to justify support.

Is there a reason we are bringing that complexity back? Shouldn't we just
need the ability for the standard set plus any special system metrics?

On Wed, Apr 11, 2018, 5:43 PM Robert Bradshaw  wrote:

> Thanks. I think this has simplified things.
>
> One thing that has occurred to me is that we're conflating the idea of
> custom metrics and custom metric types. I would propose the MetricSpec
> field be augmented with an additional field "type" which is a urn
> specifying the type of metric it is (i.e. the contents of its payload, as
> well as the form of aggregation). Summing or maxing over ints would be a
> typical example. Though we could pursue making this opaque to the runner in
> the long run, that's a more speculative (and difficult) feature to tackle.
> This would allow the runner to at least aggregate and report/return to the
> SDK metrics that it did not itself understand the semantic meaning of. (It
> would probably simplify much of the specialization in the runner itself for
> metrics that it *did* understand as well.)
>
> In addition, rather than having UserMetricOfTypeX for every type X one
> would have a single URN for UserMetric and its spec would designate the type
> and its payload the (qualified) name.
>
> - Robert
>
>
>
> On Wed, Apr 11, 2018 at 5:12 PM Alex Amato  wrote:
>
>> Thank you everyone for your feedback so far.
>> I have made a revision today which is to make all metrics refer to a
>> primary entity, so I have restructured some of the protos a little bit.
>>
>> The point of this change was to futureproof the possibility of allowing
>> custom user metrics, with custom aggregation functions for their metric
>> updates.
>> Now that each metric has an aggregation_entity associated with it (e.g.
>> PCollection, PTransform), we can design an approach which forwards the
>> opaque bytes metric updates, without deserializing them. These are
>> forwarded to user provided code which then would deserialize the metric
>> update payloads and perform the custom aggregations.
>>
>> I think it has also simplified some of the URN metric protos, as they do
>> not need to keep track of ptransform names inside themselves now. The
>> result is simpler structures for the metrics, as the entities are pulled
>> outside of the metric.
>>
>> I have mentioned this in the doc now, and wanted to draw attention to
>> this particular revision.
>>
>>
>>
>> On Tue, Apr 10, 2018 at 9:53 AM Alex Amato  wrote:
>>
>>> I've gathered a lot of feedback so far and want to make a decision by
>>> Friday, and begin working on related PRs next week.
>>>
>>> Please make sure that you provide your feedback before then and I will
>>> post the final decisions made to this thread Friday afternoon.
>>>
>>>
>>> On Thu, Apr 5, 2018 at 1:38 AM Ismaël Mejía  wrote:
>>>
 Nice, I created a short link so people can refer to it easily in
 future discussions, website, etc.

 https://s.apache.org/beam-fn-api-metrics

 Thanks for sharing.


 On Wed, Apr 4, 2018 at 11:28 PM, Robert Bradshaw 
 wrote:
 > Thanks for the nice writeup. I added some comments.
 >
 > On Wed, Apr 4, 2018 at 1:53 PM Alex Amato  wrote:
 >>
 >> Hello beam community,
 >>
 >> Thank you everyone for your initial feedback on this proposal so far. I
 >> have made some revisions based on the feedback. There were some larger
 >> questions asking about alternatives. For each of these I have added a
 >> section tagged with [Alternatives] and discussed my recommendation as well
 >> as a few other choices we considered.
 >>
 >> I would appreciate more feedback on the revised proposal. Please take
 >> another look and let me know
 >>
 >>
 https://docs.google.com/document/d/1MtBZYV7NAcfbwyy9Op8STeFNBxtljxgy69FkHMvhTMA/edit
 >>
 >> Etienne, I would appreciate it if you could please take another look after
 >> the revisions I have made as well.
 >>
 >> Thanks again,
 >> Alex
 >>
 >

>>>


Re: org.apache.beam.sdk.values.TupleTag#genId and stacktraces?

2018-04-10 Thread Ben Chambers
I believe it doesn't need to be stable across refactoring, only across all
workers executing a specific version of the code. Specifically, it is used
as follows:

1. Create a pipeline on the user's machine. It walks the stack until the
static initializer block, which provides an ID.
2. Send the pipeline to many worker machines.
3. Each worker machine walks the stack until the static initializer block
(on the same version of the code), receiving the same ID.

This ensures that the tupletag is the same on all the workers, as well as
on the user's machine, which is critical since it is used as an identifier
across these machines.

Assigning a UUID would work if all of the machines agreed on the same tuple
ID, which could be accomplished with serialization. Serialization, however,
doesn't work well with static initializers, since those will have been
called to initialize the class at load time.
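The mechanism can be sketched roughly like this (an illustrative toy, not Beam's actual genId implementation): derive an id from the first stack frame outside the generator class, so every JVM running the same version of the code from the same call site computes the same id.

```java
public class StableIdSketch {
    static class IdGen {
        // Walk the stack to the first frame outside this class and the JDK;
        // the caller's class, method, and line number form the id. Stable
        // across JVMs running the same code, but not across refactorings.
        static String genId() {
            for (StackTraceElement f : Thread.currentThread().getStackTrace()) {
                String cls = f.getClassName();
                if (!cls.startsWith("java.") && !cls.equals(IdGen.class.getName())) {
                    return cls + "#" + f.getMethodName() + ":" + f.getLineNumber();
                }
            }
            return "unknown";
        }
    }

    public static void main(String[] args) {
        String a = IdGen.genId(); String b = IdGen.genId(); // same line -> same id
        if (!a.equals(b)) throw new AssertionError(a + " != " + b);
        String c = IdGen.genId(); // different line -> different id
        if (a.equals(c)) throw new AssertionError("ids should differ across call sites");
        System.out.println(a);
    }
}
```

This is also why a random UUID only works with serialization: it would give each JVM a different id for the same tag unless the value itself is shipped along.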

On Tue, Apr 10, 2018 at 10:27 AM Romain Manni-Bucau 
wrote:

> Well issue is more about all the existing tests currently.
>
> Out of curiosity: how is walking the stack stable, given that the stack can
> change? The stop condition is the static block of a class, which can move
> under refactoring and is therefore not stable. Should it be deprecated?
>
>
> Le 10 avr. 2018 19:17, "Robert Bradshaw"  a écrit :
>
> If it's too slow perhaps you could use the constructor where you pass an
> explicit id (though in my experience walking the stack isn't that slow).
>
> On Tue, Apr 10, 2018 at 10:09 AM Romain Manni-Bucau 
> wrote:
>
>> Oops cross post sorry.
>>
>> The issue I hit on this thread is that it is used a lot in tests and it
>> slows down tests for nothing, like with the GenerateSequence ones.
>>
>> Le 10 avr. 2018 19:00, "Romain Manni-Bucau"  a
>> écrit :
>>
>>>
>>>
>>> Le 10 avr. 2018 18:40, "Robert Bradshaw"  a écrit :
>>>
>>> These values should be, inasmuch as possible, stable across VMs. How
>>> slow is slow? Doesn't this happen only once per VM startup?
>>>
>>>
>>> Once per JVM, and IDEA launches a JVM per test; the daemon doesn't save
>>> enough time, you still go through the whole project and check all upstream
>>> deps it seems.
>>>
>>> It is <1s with maven vs 5-6s with gradle.
>>>
>>>
>>> On Tue, Apr 10, 2018 at 9:33 AM Romain Manni-Bucau <
>>> rmannibu...@gmail.com> wrote:
>>>
 Hi

 does org.apache.beam.sdk.values.TupleTag#genId need to get the
 stacktrace or can we use any id generator (like
 UUID.random().toString())? Using traces is quite slow under load and
 environments where the root stack is not just the "next" level so
 skipping it would be nice.

 Romain Manni-Bucau
 @rmannibucau |  Blog | Old Blog | Github | LinkedIn | Book

>>>
>>>
>


Re: About the Gauge metric API

2018-04-07 Thread Ben Chambers
Great summary overall. A few small things to add, along with a note that
more examples of the intended use for each metric/aggregation may be helpful.

It is worth looking at what existing metric systems provide. Specifically,
two things to consider:

1. Is scoping implicit / automatic or explicit. In some systems a metric can
be declared as having one or more labels, and reporting it requires
providing values for those labels. This allows user defined labelling, and
also simplifies scoping to just ensuring the right values are available for
use as labels.

2. There is only one runner that supports logical metrics, and no standard
metrics backend supports those. Perhaps metrics in Beam should focus on the
standardly available set? Logical metrics make the most sense for system
metrics (such as elements in a collection), which don't need to use the
standard mechanisms.

This would simultaneously simplify the metrics system and bring it more in
line across runners and metric backends.
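For point 1 above, a label-based declaration might look like the following sketch. The API shape is hypothetical (Prometheus-style, not from the Beam SDK): a counter is declared with label names, and every report must supply one value per label, so scoping reduces to having the right label values available at the call site.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LabeledMetrics {
    static class LabeledCounter {
        final List<String> labelNames;
        final Map<List<String>, Long> cells = new HashMap<>();

        LabeledCounter(String... labelNames) { this.labelNames = List.of(labelNames); }

        // Reporting requires a value for every declared label; each distinct
        // label combination gets its own independently aggregated cell.
        void inc(String... labelValues) {
            if (labelValues.length != labelNames.size()) {
                throw new IllegalArgumentException("need a value for every label");
            }
            cells.merge(List.of(labelValues), 1L, Long::sum);
        }

        long get(String... labelValues) {
            return cells.getOrDefault(List.of(labelValues), 0L);
        }
    }

    public static void main(String[] args) {
        LabeledCounter rpcErrors = new LabeledCounter("transform", "worker");
        rpcErrors.inc("ParDoA", "worker-1");
        rpcErrors.inc("ParDoA", "worker-1");
        rpcErrors.inc("ParDoB", "worker-2");
        System.out.println(rpcErrors.get("ParDoA", "worker-1")); // prints 2
    }
}
```

A monitoring backend can then aggregate across labels on read, which is what makes the per-reporter segmentation workable.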



On Sat, Apr 7, 2018, 11:37 AM Andrea Foegler  wrote:

> Hi folks,
>
> A lot of great / interesting use cases have come up on this thread!
>
> I would like to advocate for this discussion to include scoping and
> aggregation in a way that is consistent across types of metrics: Gauge,
> Cumulative, etc. In my head, Gauge = "instantaneous reading" and Cumulative
> = "incrementally changing".  There might be some implication on meaningful
> aggregations there, but feels like the specification and handling of scope
> and aggregations could be shared.
>
> The existing Cumulative Metric implementation may provide a few challenges
> to this, but also a good starting point.  Below I've summarized my
> interpretation of Cumulative Metric behavior and some considerations around
> flexibility.
>
> Cumulative Metrics
> * Programming model: Implicitly scoped to reporting Transform;
> * Reporting: Implicitly scoped to Transform and Bundle;
> * Monitoring: Explicitly scoped to Transform
>
> Re: accumulation
> The Bundle scoping at runtime is only strictly required for handling
> "Bundle failed" semantics.  But it also conveniently handles the
> distributed nature of the computation. Regardless of the "logical" nature
> of these metrics, I don't know that external monitoring systems support a
> notion of implicit aggregation across reporting entities.  I believe they
> would require a per-reporting-entity id/scope of some sort.  Bundle id
> works.  A worker identifier would work too.  Then the monitoring system
> would allow aggregations over those per-bundle or per-worker metrics.  But
> the reported metric values must have been received segmented by reporter.
> **Important note: the cardinality of Bundle is likely far too large to
> handle as a scope explicitly.  As Robert mentioned, the same is true for
> "key".  This could be an argument for making certain scopes only available
> to Distributions or at reporting time for internal aggregations.
>
> Re: scoping / identification
> There's an interesting consideration moving between the scoping of the
> programming language and the scoping of the intended metric.  By implicitly
> scoping Cumulative metrics to a ParDo, the scope of the metric and the
> scope of the variable are in sync.  By scoping to anything else, there's an
> implication that 2 different Metric variables parameterized by the same
> name/ID actually refer to the same Metric. So if a user wanted an
> rpc_error_count metric that accumulated RPC errors between the 3 different
> ParDos that make RPC calls, Cumulative Metrics can't currently support
> that.  If we plan to support something like that for Gauge, consistency
> would nice.
>
> Aside:  The current usage of Gauge in the SDK looks to be for a Globally
> observable value where the user would like to see the value exported, but
> there's no clear owner.  So every entity reports and the newest value
> wins.  Just mentioning so that that use case is considered.
>
> Not a proposal, just a high level organization of metric parameters that
> might be useful for discussion. (I'm not sure how to account for local
> aggregation here.  Kind of seems like any arbitrary code block could be
> supported there.)
> Metric Name: 
> Metric Type: Gauge, Cumulative, Distribution...
> Scope: Transform, Global, Worker... (set of?)
> Data Type: Integer, Double, String...
> Aggregation: Sum, Latest, Max...
> Metric ID: <name> + <scope>(s)
>
> Some potentially interesting questions:
> 1. How should the user specify the parameters of the metric?  What
> parameter combos are supported?
> 2. Should the SDK support different Metric variables contributing to the
> same Metric value? If so, how does the SDK communicate that?
> 3. 

Re: About the Gauge metric API

2018-04-06 Thread Ben Chambers
Generally strong +1 to everything Bill said. I would suggest though that
the per-worker segmentation might be specified using some more general
tagging/labeling API. For instance, all of the following seem like
reasonable uses to support:

1. Gauge that is tagged with worker to get per-worker segmentation (such as
queue size, memory usage, etc.)
2. Gauge that is tagged with the "key" being processed. Would be useful for
things like how much data is buffered, where are watermark holds for each
key, etc. If processing is partitioned by key, this is strictly more
granular than per-worker.
3. Counter tagged with the "key" being processed. Would be useful for time
spent processing each key, etc.

On Fri, Apr 6, 2018 at 11:39 AM Bill Neubauer  wrote:

> Thanks for unraveling those themes, Pablo!
>
> 1. Seems reasonable in light of behaviors metrics backends support.
> 2. Those same backends support histogramming of data, so having integer
> types is very useful.
> 3. I believe that is the case, for the reasons I mentioned earlier, Gauges
> should only clobber values previously reported by the same entity. Two
> workers with the same gauge should not be overwriting each other's values,
> only their own. This implies per-worker segmentation.
>
>
> On Fri, Apr 6, 2018 at 11:35 AM Pablo Estrada  wrote:
>
>> Nobody wants to get rid of Gauges. I see that we have three separate
>> themes being discussed here, and I think it's useful to point them out and
>> address them independently:
>>
>> 1. Whether Gauges should change to hold string values.
>> 2. If Gauges are to support string values, whether Gauges should also
>> continue to have an int API.
>> 3. Whether Beam should support some sort of label / tag / worker-id for
>> aggregation of Gauges (maybe other metrics?)
>>
>> -P.
>>
>> On Fri, Apr 6, 2018 at 11:21 AM Ben Chambers 
>> wrote:
>>
>>> Gauges are incredibly useful for exposing the current state of the
>>> system. For instance, number of elements in a queue, current memory usage,
>>> number of RPCs in flight, etc. As mentioned above, these concepts exist in
>>> numerous systems for monitoring distributed environments, including
>>> Stackdriver Monitoring. The key to making them work is the addition of
>>> labels or tags, which as an aside are also useful for *all* metric types,
>>> not just Gauges.
>>>
>>> If Beam gets rid of Gauges, how would we go about exporting "current"
>>> values like memory usage, RPCs in flight, etc.?
>>>
>>> -- Ben
>>>
>>> On Fri, Apr 6, 2018 at 11:13 AM Kenneth Knowles  wrote:
>>>
>>>> Just naively - the use cases that Gauge addresses seem relevant, and
>>>> the information seems feasible to gather and present. The bit that doesn't
>>>> seem to make sense is aggregating gauges by clobbering each other. So I
>>>> think that's just +1 Ben?
>>>>
>>>> On Fri, Apr 6, 2018 at 10:26 AM Raghu Angadi 
>>>> wrote:
>>>>
>>>>> I am not opposed to removing other data types, though they are extra
>>>>> convenience for the user.
>>>>>
>>>>> In Scott's example above, if the metric is a counter, what are the
>>>>> guarantees provided? E.g. would it match the global count using GBK? If
>>>>> yes, then gauges (especially per-key gauges) can be very useful too (e.g.
>>>>> backlog for each Kafka partition/split).
>>>>>
>>>>> On Fri, Apr 6, 2018 at 10:01 AM Robert Bradshaw 
>>>>> wrote:
>>>>>
>>>>>> A String API makes it clear(er) that the values will not be
>>>>>> aggregated in any way across workers. I don't think retaining both APIs
>>>>>> (except for possibly some short migration period) worthwhile. On another
>>>>>> note, I still find the distributed gauge API to be a bit odd in general.
>>>>>>
>>>>>> On Fri, Apr 6, 2018 at 9:46 AM Raghu Angadi 
>>>>>> wrote:
>>>>>>
>>>>>>> I would be in favor of replacing the existing Gauge.set(long) API
>>>>>>>> with the String version and removing the old one. This would be a
>>>>>>>> breaking change. However this is a relatively new API and is still
>>>>>>>> marked @Experimental. Keeping the old API would retain the potential
>>>>>>>> confusion.
>>>>>>>> It&

Re: About the Gauge metric API

2018-04-06 Thread Ben Chambers
Gauges are incredibly useful for exposing the current state of the system.
For instance, number of elements in a queue, current memory usage, number
of RPCs in flight, etc. As mentioned above, these concepts exist in
numerous systems for monitoring distributed environments, including
Stackdriver Monitoring. The key to making them work is the addition of
labels or tags, which as an aside are also useful for *all* metric types,
not just Gauges.

If Beam gets rid of Gauges, how would we go about exporting "current"
values like memory usage, RPCs in flight, etc.?

-- Ben


On Fri, Apr 6, 2018 at 11:13 AM Kenneth Knowles  wrote:

> Just naively - the use cases that Gauge addresses seem relevant, and the
> information seems feasible to gather and present. The bit that doesn't seem
> to make sense is aggregating gauges by clobbering each other. So I think
> that's just +1 Ben?
>
> On Fri, Apr 6, 2018 at 10:26 AM Raghu Angadi  wrote:
>
>> I am not opposed to removing other data types, though they are extra
>> convenience for the user.
>>
>> In Scott's example above, if the metric is a counter, what are the
>> guarantees provided? E.g. would it match the global count using GBK? If
>> yes, then gauges (especially per-key gauges) can be very useful too (e.g.
>> backlog for each Kafka partition/split).
>>
>> On Fri, Apr 6, 2018 at 10:01 AM Robert Bradshaw 
>> wrote:
>>
>>> A String API makes it clear(er) that the values will not be aggregated
>>> in any way across workers. I don't think retaining both APIs (except for
>>> possibly some short migration period) worthwhile. On another note, I still
>>> find the distributed gauge API to be a bit odd in general.
>>>
>>> On Fri, Apr 6, 2018 at 9:46 AM Raghu Angadi  wrote:
>>>
>>>> I would be in favor of replacing the existing Gauge.set(long) API with
>>>>> the String version and removing the old one. This would be a breaking
>>>>> change. However this is a relatively new API and is still marked
>>>>> @Experimental. Keeping the old API would retain the potential confusion.
>>>>> It's better to simplify the API surface: having two APIs makes it less
>>>>> clear which one users should choose.
>>>>
>>>>
>>>> Supporting additional data types sounds good. But the above states
>>>> string API will replace the existing API. I do not see how string API makes
>>>> the semantics more clear.  Semantically both are same to the user.
>>>>
>>>> On Fri, Apr 6, 2018 at 9:31 AM Pablo Estrada 
>>>> wrote:
>>>>
>>>>> Hi Ben : D
>>>>>
>>>>> Sure, that's reasonable. And perhaps I started the discussion in the
>>>>> wrong direction. I'm not questioning the utility of Gauge metrics.
>>>>>
>>>>> What I'm saying is that Beam only supports integers, but Gauges are
>>>>> aggregated by dropping old values depending on their update times; so it
>>>>> might be desirable to not restrict the data type to just integers.
>>>>>
>>>>> -P.
>>>>>
>>>>> On Fri, Apr 6, 2018 at 9:19 AM Ben Chambers 
>>>>> wrote:
>>>>>
>>>>>> See for instance how gauge metrics are handled in Prometheus, Datadog
>>>>>> and Stackdriver monitoring. Gauges are perfect for use in distributed
>>>>>> systems, they just need to be properly labeled. Perhaps we should apply a
>>>>>> default tag or allow users to specify one.
>>>>>>
>>>>>> On Fri, Apr 6, 2018, 9:14 AM Ben Chambers 
>>>>>> wrote:
>>>>>>
>>>>>>> Some metrics backends label the value, for instance with the worker
>>>>>>> that sent it. Then the aggregation is latest per label. This makes it
>>>>>>> useful for holding values such as "memory usage" that need to hold
>>>>>>> current value.
>>>>>>>
>>>>>>> On Fri, Apr 6, 2018, 9:00 AM Scott Wegner 
>>>>>>> wrote:
>>>>>>>
>>>>>>>> +1 on the proposal to support a "String" gauge.
>>>>>>>>
>>>>>>>> To expand a bit, the current API doesn't make it clear that the
>>>>>>>> gauge value is based on local state. If a runner chooses to 
>>>>>>>> parallelize a
>>>>&g

Re: About the Gauge metric API

2018-04-06 Thread Ben Chambers
See for instance how gauge metrics are handled in Prometheus, Datadog and
Stackdriver monitoring. Gauges are perfect for use in distributed systems,
they just need to be properly labeled. Perhaps we should apply a default
tag or allow users to specify one.

On Fri, Apr 6, 2018, 9:14 AM Ben Chambers  wrote:

> Some metrics backends label the value, for instance with the worker that
> sent it. Then the aggregation is latest per label. This makes it useful for
> holding values such as "memory usage" that need to hold current value.
>
> On Fri, Apr 6, 2018, 9:00 AM Scott Wegner  wrote:
>
>> +1 on the proposal to support a "String" gauge.
>>
>> To expand a bit, the current API doesn't make it clear that the gauge
>> value is based on local state. If a runner chooses to parallelize a DoFn
>> across many workers, each worker will have its own local Gauge metric and
>> its updates will overwrite other values. For example, from the API it looks
>> like you could use a gauge to implement your own element count metric:
>>
>> long count = 0;
>> @ProcessElement
>> public void processElement(ProcessContext c) {
>>   myGauge.set(++count);
>>   c.output(c.element());
>> }
>>
>> This looks correct, but each worker has their own local 'count' field,
>> and gauge metric updates from parallel workers will overwrite each other
>> rather than get aggregated. So the final value would be "the number of
>> elements processed on one of the workers". (The correct implementation uses
>> a Counter metric).
>>
>> I would be in favor of replacing the existing Gauge.set(long) API with
>> the String version and removing the old one. This would be a breaking
>> change. However this is a relatively new API and is still marked
>> @Experimental. Keeping the old API would retain the potential confusion.
>> It's better to simplify the API surface: having two APIs makes it less
>> clear which one users should choose.
>>
>> On Fri, Apr 6, 2018 at 8:28 AM Pablo Estrada  wrote:
>>
>>> Hello all,
>>> As I was working on adding support for Gauges in Dataflow, some noted
>>> that Gauge is a fairly unusual kind of metric for a distributed
>>> environment, since many workers will report different values and stomp on
>>> each other's values all the time.
>>>
>>> We also looked at Flink and Dropwizard Gauge metrics [1][2], and we
>>> found that these use generics, and Flink explicitly mentions that a
>>> toString implementation is required[3].
>>>
>>> With that in mind, I'm thinking that it might make sense to 1) expand
>>> Gauge to support string values (keep int-based API for backwards
>>> compatibility), and migrate it to use string behind the covers.
>>>
>>> What does everyone think about this?
>>>
>>> Best
>>> -P.
>>>
>>> 1 -
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/metrics.html#metric-types
>>> 2 - https://metrics.dropwizard.io/3.1.0/manual/core/#gauges
>>> 3 -
>>> https://github.com/apache/flink/blob/master/docs/monitoring/metrics.md#gauge
>>> JIRA issue for Gauge metrics -
>>> https://issues.apache.org/jira/browse/BEAM-1616
>>> --
>>> Got feedback? go/pabloem-feedback
>>> <https://goto.google.com/pabloem-feedback>
>>>
>> --
>>
>>
>> Got feedback? http://go/swegner-feedback
>>
>


Re: About the Gauge metric API

2018-04-06 Thread Ben Chambers
Some metrics backend label the value, for instance with the worker that
sent it. Then the aggregation is latest per label. This makes it useful for
holding values such as "memory usage" that need to hold current value.
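To make the aggregation difference concrete, here is a plain-Java sketch (no Beam APIs; the worker counts are made up for illustration) contrasting counter-style summing, unlabeled gauge-style last-write-wins, and the labeled-gauge approach described above:

```java
import java.util.HashMap;
import java.util.Map;

public class MetricAggregation {
    public static void main(String[] args) {
        // Three hypothetical workers each process some elements and report
        // a per-worker count. This only simulates the aggregation semantics
        // discussed in this thread; it is not Beam metrics code.
        int[] perWorkerCounts = {100, 250, 50};

        // Counter semantics: updates from all workers are summed.
        long counterValue = 0;
        for (int c : perWorkerCounts) {
            counterValue += c;
        }

        // Unlabeled gauge semantics: each report overwrites the last, so
        // the final value is whichever worker happened to report last.
        long gaugeValue = 0;
        for (int c : perWorkerCounts) {
            gaugeValue = c;
        }

        // Labeled gauge semantics (as described above): keep the latest
        // value per label (here, per worker), so no reading is lost.
        Map<String, Long> labeledGauge = new HashMap<>();
        for (int i = 0; i < perWorkerCounts.length; i++) {
            labeledGauge.put("worker-" + i, (long) perWorkerCounts[i]);
        }

        System.out.println(counterValue);        // 400: the true element count
        System.out.println(gaugeValue);          // 50: one worker's count only
        System.out.println(labeledGauge.size()); // 3: one reading per worker
    }
}
```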

On Fri, Apr 6, 2018, 9:00 AM Scott Wegner  wrote:

> +1 on the proposal to support a "String" gauge.
>
> To expand a bit, the current API doesn't make it clear that the gauge
> value is based on local state. If a runner chooses to parallelize a DoFn
> across many workers, each worker will have its own local Gauge metric and
> its updates will overwrite other values. For example, from the API it looks
> like you could use a gauge to implement your own element count metric:
>
> long count = 0;
> @ProcessElement
> public void processElement(ProcessContext c) {
>   myGauge.set(++count);
>   c.output(c.element());
> }
>
> This looks correct, but each worker has their own local 'count' field, and
> gauge metric updates from parallel workers will overwrite each other rather
> than get aggregated. So the final value would be "the number of elements
> processed on one of the workers". (The correct implementation uses a
> Counter metric).
>
> I would be in favor of replacing the existing Gauge.set(long) API with the
> String version and removing the old one. This would be a breaking change.
> However this is a relatively new API and is still marked @Experimental.
> Keeping the old API would retain the potential confusion. It's better to
> simplify the API surface: having two APIs makes it less clear which one
> users should choose.
>
> On Fri, Apr 6, 2018 at 8:28 AM Pablo Estrada  wrote:
>
>> Hello all,
>> As I was working on adding support for Gauges in Dataflow, some noted
>> that Gauge is a fairly unusual kind of metric for a distributed
>> environment, since many workers will report different values and stomp on
>> each other's values all the time.
>>
>> We also looked at Flink and Dropwizard Gauge metrics [1][2], and we found
>> that these use generics, and Flink explicitly mentions that a toString
>> implementation is required[3].
>>
>> With that in mind, I'm thinking that it might make sense to 1) expand
>> Gauge to support string values (keeping the int-based API for backwards
>> compatibility), and 2) migrate it to use strings behind the covers.
>>
>> What does everyone think about this?
>>
>> Best
>> -P.
>>
>> 1 -
>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/metrics.html#metric-types
>> 2 - https://metrics.dropwizard.io/3.1.0/manual/core/#gauges
>> 3 -
>> https://github.com/apache/flink/blob/master/docs/monitoring/metrics.md#gauge
>> JIRA issue for Gauge metrics -
>> https://issues.apache.org/jira/browse/BEAM-1616
>


Re: (java) stream & beam?

2018-03-13 Thread Ben Chambers
The CombineFn API has three type parameters (input, accumulator, and
output) and methods that approximately correspond to those parts of the
collector:

CombineFn.createAccumulator = supplier
CombineFn.addInput = accumulator
CombineFn.mergeAccumulators = combiner
CombineFn.extractOutput = finisher

That said, the two APIs have some minimal, cosmetic differences. For
instance, CombineFn.addInput may either mutate the accumulator or return
it, while the Collector accumulator method is a BiConsumer, meaning it must
mutate in place.
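To illustrate the correspondence, here is a self-contained java.util.stream Collector computing an average, with each argument to Collector.of labeled by the CombineFn method it approximately plays the role of (the Beam method names appear only in comments, by analogy; this sketch uses no Beam code):

```java
import java.util.List;
import java.util.stream.Collector;

public class CollectorVsCombineFn {
    // An averaging Collector. The accumulator is a long[2] holding
    // {sum, count}.
    static Collector<Integer, long[], Double> averaging() {
        return Collector.of(
            () -> new long[2],                       // ~ CombineFn.createAccumulator
            (acc, x) -> { acc[0] += x; acc[1]++; },  // ~ CombineFn.addInput (must mutate)
            (a, b) -> {                              // ~ CombineFn.mergeAccumulators
                a[0] += b[0];
                a[1] += b[1];
                return a;
            },
            acc -> (double) acc[0] / acc[1]);        // ~ CombineFn.extractOutput
    }

    public static void main(String[] args) {
        double avg = List.of(1, 2, 3, 4).stream().collect(averaging());
        System.out.println(avg); // 2.5
    }
}
```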

On Tue, Mar 13, 2018 at 11:39 AM Romain Manni-Bucau 
wrote:

> Misses the collect split in 3 (supplier, combiner, aggregator) but
> globally agree.
>
>  I d just take java stream, remove "client" method or make them big data
> if possible, ensure all hooks are serializable to avoid hacks and add an
> unwrap to be able to access the pipeline in case we need a very custom
> thing and we are done for me.
>
> Le 13 mars 2018 19:26, "Ben Chambers"  a écrit :
>
>> I think the existing rationale (not introducing lots of special fluent
>> methods) makes sense. However, if we look at the Java Stream API, we
>> probably wouldn't need to introduce *a lot* of fluent builders to get most
>> of the functionality. Specifically, if we focus on map, flatMap, and
>> collect from the Stream API, and a few extensions, we get something like:
>>
>> * collection.map(DoFn) for applying a ParDo
>> * collection.map(SerializableFn) for Java 8 lambda shorthand
>> * collection.flatMap(SerializableFn) for Java 8 lambda shorthand
>> * collection.collect(CombineFn) for applying a CombineFn
>> * collection.apply(PTransform) for applying a composite transform. note
>> that PTransforms could also use serializable lambdas for definition.
>>
>> (Note that GroupByKey doesn't even show up here -- it could, but that
>> could also be a way of wrapping a collector, as in the Java 8
>> Collectors.groupingBy [1].)
>>
>> With this, we could write code like:
>>
>> collection
>>   .map(myDoFn)
>>   .map((s) -> s.toString())
>>   .collect(new IntegerCombineFn())
>>   .apply(GroupByKey.of());
>>
>> That said, my two concerns are:
>> (1) having two similar but different Java APIs. If we have a more
>> idiomatic way of writing pipelines in Java, we should make that the
>> standard. Otherwise, users will be confused by seeing "Beam" examples
>> written in multiple, incompatible syntaxes.
>> (2) making sure the above is truly idiomatic Java and that it doesn't
>> conflict with the cross-language Beam programming model. I don't think it
>> does. We have (I believe) chosen to make the Python and Go SDKs idiomatic
>> for those languages where possible.
>>
>> If this work is focused on making the Java SDK more idiomatic (and thus
>> easier for Java users to learn), it seems like a good thing. We should just
>> make sure it doesn't scope-creep into defining an entirely new DSL or SDK.
>>
>> [1]
>> https://docs.oracle.com/javase/8/docs/api/java/util/stream/Collectors.html#groupingBy-java.util.function.Function-
>>
>> On Tue, Mar 13, 2018 at 11:06 AM Romain Manni-Bucau <
>> rmannibu...@gmail.com> wrote:
>>
>>> Yep
>>>
>>> I know the rationale and it makes sense, but it also raises the barrier
>>> to entry for users and is not that smooth in IDEs, in particular for
>>> custom code.
>>>
>>> So I really think it makes sense to build a user-friendly API on top of
>>> the Beam core dev one.
>>>
>>>
>>> Le 13 mars 2018 18:35, "Aljoscha Krettek"  a
>>> écrit :
>>>
>>>>
>>>> https://beam.apache.org/blog/2016/05/27/where-is-my-pcollection-dot-map.html
>>>>
>>>> On 11. Mar 2018, at 22:21, Romain Manni-Bucau 
>>>> wrote:
>>>>
>>>>
>>>>
>>>> Le 12 mars 2018 00:16, "Reuven Lax"  a écrit :
>>>>
>>>> I think it would be interesting to see what a Java stream-based API
>>>> would look like. As I mentioned elsewhere, we are not limited to having
>>>> only one API for Beam.
>>>>
>>>> If I remember correctly, a Java stream API was considered for Dataflow
>>>> back at the very beginning. I don't completely remember why it was
>>>> rejected, but I suspect at least part of the reason might have been that
>>>> Java streams were considered too new and untested back then.
>>>>
>>>>
>>>> Coders are broken - typevariables dont have bounds except 

Re: (java) stream & beam?

2018-03-13 Thread Ben Chambers
I think the existing rationale (not introducing lots of special fluent
methods) makes sense. However, if we look at the Java Stream API, we
probably wouldn't need to introduce *a lot* of fluent builders to get most
of the functionality. Specifically, if we focus on map, flatMap, and
collect from the Stream API, and a few extensions, we get something like:

* collection.map(DoFn) for applying a ParDo
* collection.map(SerializableFn) for Java 8 lambda shorthand
* collection.flatMap(SerializableFn) for Java 8 lambda shorthand
* collection.collect(CombineFn) for applying a CombineFn
* collection.apply(PTransform) for applying a composite transform. note
that PTransforms could also use serializable lambdas for definition.

(Note that GroupByKey doesn't even show up here -- it could, but that could
also be a way of wrapping a collector, as in the Java 8
Collectors.groupingBy [1].)

With this, we could write code like:

collection
  .map(myDoFn)
  .map((s) -> s.toString())
  .collect(new IntegerCombineFn())
  .apply(GroupByKey.of());

That said, my two concerns are:
(1) having two similar but different Java APIs. If we have a more idiomatic
way of writing pipelines in Java, we should make that the standard.
Otherwise, users will be confused by seeing "Beam" examples written in
multiple, incompatible syntaxes.
(2) making sure the above is truly idiomatic Java and that it doesn't
conflict with the cross-language Beam programming model. I don't think it
does. We have (I believe) chosen to make the Python and Go SDKs idiomatic
for those languages where possible.

If this work is focused on making the Java SDK more idiomatic (and thus
easier for Java users to learn), it seems like a good thing. We should just
make sure it doesn't scope-creep into defining an entirely new DSL or SDK.

[1]
https://docs.oracle.com/javase/8/docs/api/java/util/stream/Collectors.html#groupingBy-java.util.function.Function-
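As a side note on the lambda shorthand above, one common trick for avoiding the `(Function & Serializable)` cast is to give the DSL its own functional interface that extends Serializable. A minimal, hypothetical sketch (`SerializableFn` and `applyFn` are illustrative names, not Beam API):

```java
import java.io.Serializable;
import java.util.function.Function;

public class SerializableFnSketch {
    // Because this interface extends Serializable, any lambda whose target
    // type is SerializableFn is serializable -- no cast needed at call sites.
    interface SerializableFn<T, R> extends Function<T, R>, Serializable {}

    // A stand-in for a DSL method that accepts a serializable lambda.
    static <T, R> R applyFn(SerializableFn<T, R> fn, T input) {
        return fn.apply(input);
    }

    public static void main(String[] args) {
        // The lambda is serializable without a (Function & Serializable) cast.
        String out = applyFn((Integer i) -> "value-" + i, 42);
        System.out.println(out); // value-42
    }
}
```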

On Tue, Mar 13, 2018 at 11:06 AM Romain Manni-Bucau 
wrote:

> Yep
>
> I know the rationale and it makes sense, but it also raises the barrier to
> entry for users and is not that smooth in IDEs, in particular for custom
> code.
>
> So I really think it makes sense to build a user-friendly API on top of
> the Beam core dev one.
>
>
> Le 13 mars 2018 18:35, "Aljoscha Krettek"  a écrit :
>
>>
>> https://beam.apache.org/blog/2016/05/27/where-is-my-pcollection-dot-map.html
>>
>> On 11. Mar 2018, at 22:21, Romain Manni-Bucau 
>> wrote:
>>
>>
>>
>> Le 12 mars 2018 00:16, "Reuven Lax"  a écrit :
>>
>> I think it would be interesting to see what a Java stream-based API would
>> look like. As I mentioned elsewhere, we are not limited to having only one
>> API for Beam.
>>
>> If I remember correctly, a Java stream API was considered for Dataflow
>> back at the very beginning. I don't completely remember why it was
>> rejected, but I suspect at least part of the reason might have been that
>> Java streams were considered too new and untested back then.
>>
>>
>> Coders are broken - type variables don't have bounds except Object - and
>> reducers are not trivial to implement generally, I guess.
>>
>> However, being close to this API can help a lot, so +1 to try to have a
>> Java DSL on top of the current API. Would also be neat to integrate it
>> with CompletionStage :).
>>
>>
>>
>> Reuven
>>
>>
>> On Sun, Mar 11, 2018 at 2:29 PM Romain Manni-Bucau 
>> wrote:
>>
>>>
>>>
>>> Le 11 mars 2018 21:18, "Jean-Baptiste Onofré"  a
>>> écrit :
>>>
>>> Hi Romain,
>>>
>>> I remember we discussed the way to express pipelines a while ago.
>>>
>>> I was fan of a "DSL" compared to the one we have in Camel: instead of
>>> using apply(), use a dedicated form (like .map(), .reduce(), etc, AFAIR,
>>> it's the approach in flume).
>>> However, we agreed that apply() syntax gives a more flexible approach.
>>>
>>> Using Java Stream is interesting but I'm afraid we would have the same
>>> issue as the one we identified discussing "fluent Java SDK". However, we
>>> can have a Stream API DSL on top of the SDK IMHO.
>>>
>>>
>>> Agree and a beam stream interface (copying jdk api but making lambda
>>> serializable to avoid the cast need).
>>>
>>> On my side, I think it enables users to discover the API. If you check my
>>> PoC impl you quickly see the steps needed to do simple things like a map,
>>> which is a first-class citizen.
>>>
>>> Also curious if we could implement reduce with a pipeline result, i.e.
>>> get an output of a batch from the runner (client) JVM. I see how to do it
>>> for longs - with metrics - but not for collect().
>>>
>>>
>>> Regards
>>> JB
>>>
>>>
>>> On 11/03/2018 19:46, Romain Manni-Bucau wrote:
>>>
 Hi guys,

 don't know if you already experienced using java Stream API as a
 replacement for pipeline API but did some tests:
 https://github.com/rmannibucau/jbeam

 It is far to be complete but already shows where it fails (beam doesn't
 have a way to reduce in the caller machine for instance, coder handling is
 not that trivial, lambda 

Re: @TearDown guarantees

2018-02-18 Thread Ben Chambers
Are you sure that focusing on the cleanup of specific DoFns is
appropriate? In many cases where cleanup is necessary, it is around an
entire composite PTransform. I think there have been discussions/proposals
around a more methodical "cleanup" option, but those haven't been
implemented, to the best of my knowledge.

For instance, consider the steps of a FileIO:
1. Write to a bunch (N shards) of temporary files
2. When all temporary files are complete, attempt to do a bulk copy to put
them in the final destination.
3. Cleanup all the temporary files.

(This is often desirable because it minimizes the chance of seeing
partial/incomplete results in the final destination).
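A single-process sketch of those three steps, using plain java.nio file operations (in a real FileIO the shard writes run in parallel across workers; this only illustrates the write-temp/bulk-move/cleanup ordering):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

public class TempFileSketch {
    public static void main(String[] args) throws IOException {
        Path tmpDir = Files.createTempDirectory("shards-tmp");
        Path outDir = Files.createTempDirectory("shards-out");
        int numShards = 3;

        // Step 1: write N temporary shard files.
        List<Path> tempFiles = new ArrayList<>();
        for (int i = 0; i < numShards; i++) {
            Path p = tmpDir.resolve("shard-" + i + ".tmp");
            Files.writeString(p, "data for shard " + i);
            tempFiles.add(p);
        }

        // Step 2: only after ALL shards are complete, bulk-move them to the
        // final destination. Deferring this narrows the window in which a
        // reader could observe a partial result set.
        for (Path p : tempFiles) {
            String finalName = p.getFileName().toString().replace(".tmp", "");
            Files.move(p, outDir.resolve(finalName));
        }

        // Step 3: clean up the (now empty) temporary directory.
        Files.delete(tmpDir);

        System.out.println(Files.list(outDir).count()); // 3
    }
}
```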

In the above, you'd want step 1 to execute on many workers, likely using a
ParDo (say N different workers).
The move step should only happen once, so on one worker. This means it will
be a different DoFn, likely with some mechanism to ensure it runs on
exactly one worker.

In such a case, cleanup / @TearDown of the DoFn is not enough. We need an
API for a PTransform to schedule some cleanup work for when the transform
is "done". In batch this is relatively straightforward, but doesn't exist.
This is the source of some problems, such as BigQuery sink leaving files
around that have failed to import into BigQuery.

In streaming this is less straightforward -- do you want to wait until the
end of the pipeline? Or do you want to wait until the end of the window? In
practice, you just want to wait until you know nobody will need the
resource anymore.

This led to some discussions around a "cleanup" API, where you could have a
transform that output resource objects. Each resource object would have
logic for cleaning it up. And there would be something that indicated what
parts of the pipeline needed that resource, and what kind of temporal
lifetime those objects had. As soon as that part of the pipeline had
advanced far enough that it would no longer need the resources, they would
get cleaned up. This can be done at pipeline shutdown, or incrementally
during a streaming pipeline, etc.

Would something like this be a better fit for your use case? If not, why is
handling teardown within a single DoFn sufficient?

On Sun, Feb 18, 2018 at 11:53 AM Romain Manni-Bucau 
wrote:

> Yes, 1M. Let me try to explain, simplifying the overall execution. Each
> instance - one fn so likely in a thread of a worker - has its lifecycle.
> Caricaturally: "new" and garbage collection.
>
> In practice, "new" is often an unsafe allocate (deserialization), but it
> doesn't matter here.
>
> What I want is for any "new" to be followed by setup before any
> processElement or startBundle, and for teardown to be called the last time
> Beam has the instance, after the last finishBundle and before it is GC-ed.
>
> It is as simple as that.
> This way there is no need to combine fns in a way that makes a fn not
> self-contained to implement basic transforms.
>
> Le 18 févr. 2018 20:07, "Reuven Lax"  a écrit :
>
>>
>>
>> On Sun, Feb 18, 2018 at 10:50 AM, Romain Manni-Bucau <
>> rmannibu...@gmail.com> wrote:
>>
>>>
>>>
>>> Le 18 févr. 2018 19:28, "Ben Chambers"  a écrit :
>>>
>>> It feels like this thread may be a bit off-track. Rather than focusing
>>> on the semantics of the existing methods -- which have been noted to
>>> meet many existing use cases -- it would be helpful to focus more on the
>>> reason you are looking for something with different semantics.
>>>
>>> Some possibilities (I'm not sure which one you are trying to do):
>>>
>>> 1. Clean-up some external, global resource, that was initialized once
>>> during the startup of the pipeline. If this is the case, how are you
>>> ensuring it was really only initialized once (and not once per worker, per
>>> thread, per instance, etc.)? How do you know when the pipeline should
>>> release it? If the answer is "when it reaches step X", then what about a
>>> streaming pipeline?
>>>
>>>
>>> When the DoFn is logically no longer needed, i.e. when the batch is done
>>> or the stream is stopped (manually or by a JVM shutdown)
>>>
>>
>> I'm really not following what this means.
>>
>> Let's say that a pipeline is running 1000 workers, and each worker is
>> running 1000 threads (each running a copy of the same DoFn). How many
>> cleanups do you want (do you want 1000 * 1000 = 1M cleanups) and when do
>> you want it called? When the entire pipeline is shut down? When an
>> individual worker is about to shut down (which may be temporary - may be
>> about to start back up)? Something else?
>>
>>
>>
>>>
>>>
>&

Re: @TearDown guarantees

2018-02-18 Thread Ben Chambers
It feels like this thread may be a bit off-track. Rather than focusing on
the semantics of the existing methods -- which have been noted to meet many
existing use cases -- it would be helpful to focus more on the reason you
are looking for something with different semantics.

Some possibilities (I'm not sure which one you are trying to do):

1. Clean-up some external, global resource, that was initialized once
during the startup of the pipeline. If this is the case, how are you
ensuring it was really only initialized once (and not once per worker, per
thread, per instance, etc.)? How do you know when the pipeline should
release it? If the answer is "when it reaches step X", then what about a
streaming pipeline?

2. Finalize some resources that are used within some region of the
pipeline. While, the DoFn lifecycle methods are not a good fit for this
(they are focused on managing resources within the DoFn), you could model
this on how FileIO finalizes the files that it produced. For instance:
   a) ParDo generates "resource IDs" (or some token that stores information
about resources)
   b) "Require Deterministic Input" (to prevent retries from changing
resource IDs)
   c) ParDo that initializes the resources
   d) Pipeline segments that use the resources, and eventually output the
fact they're done
   e) "Require Deterministic Input"
   f) ParDo that frees the resources

By making the use of the resource part of the data it is possible to
"checkpoint" which resources may be in use or have been finished by using
the require deterministic input. This is important to ensuring everything
is actually cleaned up.

3. Some other use case that I may be missing? If it is this case, could you
elaborate on what you are trying to accomplish? That would help me
understand both the problems with existing options and possibly what could
be done to help.

-- Ben
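A toy, single-process sketch of option 2, where resource IDs flow through the computation as ordinary data so the freeing stage sees exactly the resources that were created (all names are illustrative; this is not Beam API code):

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class ResourceLifecycleSketch {
    // Tracks which resources currently exist, to verify cleanup.
    static final Set<String> LIVE = new HashSet<>();

    static String initResource(String id) { LIVE.add(id); return id; }

    static String useResource(String id) { return "result-from-" + id; }

    static void freeResource(String id) { LIVE.remove(id); }

    public static void main(String[] args) {
        // (a) generate deterministic resource IDs
        List<String> ids = List.of("res-0", "res-1");
        // (c) initialize the resources
        ids.forEach(ResourceLifecycleSketch::initResource);
        // (d) use the resources; the "done" records carry the resource ID,
        // so downstream stages know exactly which resources are finished
        List<String> done = ids.stream()
            .map(ResourceLifecycleSketch::useResource)
            .map(r -> r.replace("result-from-", ""))
            .collect(Collectors.toList());
        // (f) free exactly the resources the data says are finished
        done.forEach(ResourceLifecycleSketch::freeResource);
        System.out.println(LIVE.isEmpty()); // true: everything was cleaned up
    }
}
```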


On Sun, Feb 18, 2018 at 9:56 AM Romain Manni-Bucau 
wrote:

> 2018-02-18 18:36 GMT+01:00 Eugene Kirpichov :
>
>> "Machine state" is overly low-level because many of the possible reasons
>> can happen on a perfectly fine machine.
>> If you'd like to rephrase it to "it will be called except in various
>> situations where it's logically impossible or impractical to guarantee that
>> it's called", that's fine. Or you can list some of the examples above.
>>
>
> Sounds ok to me
>
>
>>
>> The main point for the user is, you *will* see non-preventable situations
>> where it couldn't be called - it's not just intergalactic crashes - so if
>> the logic is very important (e.g. cleaning up a large amount of temporary
>> files, shutting down a large number of VMs you started etc), you have to
>> express it using one of the other methods that have stricter guarantees
>> (which obviously come at a cost, e.g. no pass-by-reference).
>>
>
> FinishBundle has the exact same guarantee, sadly, so I'm not sure which
> other method you speak about. Concretely, if you make it really unreliable
> - this is what "best effort" sounds like to me - then users can't use it
> to clean anything; but if you make it "can happen, but it is unexpected
> and means something happened", then it is fine to have a manual - or
> automatic, if fancy - recovery procedure. This is where it makes all the
> difference and impacts the developers and ops (all users basically).
>
>
>>
>> On Sun, Feb 18, 2018 at 9:16 AM Romain Manni-Bucau 
>> wrote:
>>
>>> Agree Eugene except that "best effort" means that. It is also often used
>>> to say "at will" and this is what triggered this thread.
>>>
>>> I'm fine using "except if the machine state prevents it" but "best
>>> effort" is too open and can be very badly and wrongly perceived by users
>>> (like I did).
>>>
>>>
>>> Romain Manni-Bucau
>>> @rmannibucau  |  Blog
>>>  | Old Blog
>>>  | Github
>>>  | LinkedIn
>>>  | Book
>>> 
>>>
>>> 2018-02-18 18:13 GMT+01:00 Eugene Kirpichov :
>>>
 It will not be called if it's impossible to call it: in the example
 situation you have (intergalactic crash), and in a number of more common
 cases: eg in case the worker container has crashed (eg user code in a
 different thread called a C library over JNI and it segfaulted), JVM bug,
 crash due to user code OOM, in case the worker has lost network
 connectivity (then it may be called but it won't be able to do anything
 useful), in case this is running on a preemptible VM and it was preempted
 by the underlying cluster manager without notice or if the worker was too
 busy with other stuff (eg calling other Teardown functions) until the
 preemption timeout elapsed, in case the underlying hardware simply failed
 (which happens quite often at scale), and in many other conditions.

 "Best effort" is the commonly u

Re: untyped pipeline API?

2018-01-30 Thread Ben Chambers
It sounds like in your specific case you're saying that the same encoding
can be viewed by the Java type system two different ways. For instance, if
you have an object Person that is convertible to JSON using Jackson, then
that JSON encoding can be viewed as either a Person or a Map<String,
Object> looking at the JSON fields. In that case, there needs to be some
kind of "view change" transform to change the type of the PCollection.

I'm not sure an untyped API would be better here. Requiring the "view
change" be explicit means we can ensure the types are compatible, and also
makes it very clear when this kind of change is desired.

Some background on Coders that may be relevant:

It might help to think about Coders as the specification of how elements
in a PCollection are encoded if/when the runner needs to. If you are trying
to read JSON or XML records from a source, that is part of the source
transform (reading JSON or XML records) and not part of the collection
produced by the transform.

Consider further -- even if you read XML records from a source, you likely
*wouldn't* want to use an XML Coder for those records within the pipeline,
as every time the pipeline needed to serialize them you would produce much
larger amounts of data (XML is not an efficient/compact encoding). Instead,
you likely want to read XML records from the source and then encode those
within the pipeline using something more efficient. Then convert them to
something more readable but possibly less-efficient before they exit the
pipeline at a sink.
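A minimal sketch of the explicit "view change" idea: a direct in-memory conversion changes the Java-level view of the same data without a serialize/re-parse round trip (Person and the field names are made up for illustration):

```java
import java.util.Map;

public class ViewChangeSketch {
    record Person(String name, String id) {}

    // The explicit "view change" transform: Person -> Map view of the same
    // data. No JSON encode/decode round trip is involved.
    static Map<String, String> asMap(Person p) {
        return Map.of("name", p.name(), "id", p.id());
    }

    public static void main(String[] args) {
        Person p = new Person("Ada", "42");
        Map<String, String> view = asMap(p);
        System.out.println(view.get("name")); // Ada
    }
}
```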

On Tue, Jan 30, 2018 at 12:23 PM Kenneth Knowles  wrote:

> Ah, this is a point that Robert brings up quite often: one reason we put
> coders on PCollections instead of doing that work in PTransforms is that
> the runner (plus SDK harness) can automatically only serialize when
> necessary. So the default in Beam is that the thing you want to happen is
> already done. There are some corner cases when you get to the portability
> framework but I am pretty sure it already works this way. If you show what
> is a PTransform and PCollection in your example it might show where we can
> fix things.
>
> On Tue, Jan 30, 2018 at 12:17 PM, Romain Manni-Bucau <
> rmannibu...@gmail.com> wrote:
>
>> Indeed,
>>
>> I'll take a stupid example to make it shorter.
>> I have a source emitting Person objects ({name:...,id:...}) serialized
>> with jackson as JSON.
>> Then my pipeline processes them with a DoFn taking a Map<String, String>.
>> Here I set the coder to read json as a map.
>>
>> However, a Map<String, String> is not a Person, so my pipeline needs an
>> intermediate step to convert one into the other and has in the design a
>> useless serialization round trip.
>>
>> If you check the chain you have: Person -> JSON -> Map<String, String> ->
>> JSON -> Map<String, String>, whereas Person -> JSON -> Map<String,
>> String> is fully enough because there is equivalence of JSON in this
>> example.
>> In other words, if one coder's output is readable by another coder's
>> input, Java's strong typing doesn't know about it and can enforce some
>> fake steps.
>>
>>
>>
>> Romain Manni-Bucau
>>
>> 2018-01-30 21:07 GMT+01:00 Kenneth Knowles :
>>
>>> I'm not sure I understand your question. Can you explain more?
>>>
>>> On Tue, Jan 30, 2018 at 11:50 AM, Romain Manni-Bucau <
>>> rmannibu...@gmail.com> wrote:
>>>
 Hi guys,

 just encountered an issue with the pipeline API and wondered if you
 thought about it.

 It can happen that Coders are compatible with each other. A simple example
 is that a text coder like JSON or XML will be able to read text. However,
 with the pipeline API you can't support this directly, which forces the
 user to use an intermediate state to be typed.

 Is there already a way to avoid these useless round trips?

 Said otherwise: how to handle coders transitivity?

 Romain Manni-Bucau

>>>
>>>
>>
>


Re: [DISCUSS] State of the project: Feature roadmap for 2018

2018-01-30 Thread Ben Chambers
>>> 1) Instead of making it easier to write features, I think more users
>>> would care about being able to move their pipeline between different
>>> runners and one of the key missing features is dynamic work rebalancing in
>>> all runners (except Dataflow).
>>> Also, portability is meant to help make a crisp line between what are
>>> the responsibilities of the Runner and the SDK which would help make it
>>> easier to write features in an SDK and to support features in Runners.
>>>
>>> 2) To realize portability there are a lot of JIRAs being tracked under
>>> the portability label[1] that need addressing to be able to run an existing
>>> pipeline in a portable manner before we even get to more advanced features.
>>>
>>> 1:
>>> https://issues.apache.org/jira/browse/BEAM-3515?jql=project%20%3D%20BEAM%20AND%20labels%20%3D%20portability
>>>
>>> 3) Ben, do you want to design and run a couple of polls (similar to the
>>> Java 8 poll) to get feedback from our users based upon the list of major
>>> features being developed?
>>>
>>> 4) Yes, plenty. It would be worthwhile to have someone walk through the
>>> open JIRAs and mark them with a label and also summarize what groups they
>>> fall under as there are plenty of good ideas there.
>>>
>>> On Tue, Jan 23, 2018 at 5:25 PM, Robert Bradshaw 
>>> wrote:
>>>
>>>> In terms of features, I think a key thing we should focus on is making
>>>> simple things simple. Beam is very powerful, but it doesn't always
>>>> make easy things easy. Features like schema'd PCollections could go a
>>>> long way here. Also fully fleshing out/smoothing our runner
>>>> portability story is part of this too.
>>>>
>>>> For beam 3.x we could also reason about if there's any complexity that
>>>> doesn't hold its weight (e.g. side inputs on CombineFns).
>>>>
>>>> On Mon, Jan 22, 2018 at 9:20 PM, Jean-Baptiste Onofré 
>>>> wrote:
>>>> > Hi Ben,
>>>> >
>>>> > about the "technical roadmap", we have a thread about "Beam 3.x
>>>> roadmap".
>>>> >
>>>> > It already provides ideas for points 3 & 4.
>>>> >
>>>> > Regards
>>>> > JB
>>>> >
>>>> > On 01/22/2018 09:15 PM, Ben Chambers wrote:
>>>> >> Thanks Davor for starting the state of the project discussions [1].
>>>> >>
>>>> >>
>>>> >> In this fork of the state of the project discussion, I’d like to
>>>> start the
>>>> >> discussion of the feature roadmap for 2018 (and beyond).
>>>> >>
>>>> >>
>>>> >> To kick off the discussion, I think the features could be divided
>>>> into several
>>>> >> areas, as follows:
>>>> >>
>>>> >>  1.
>>>> >>
>>>> >> Enabling Contributions: How do we make it easier to add new
>>>> features to the
>>>> >> supported runners? Can we provide a common intermediate layer
>>>> below the
>>>> >> existing functionality that features are translated to so that
>>>> runners only
>>>> >> need to support the intermediate layer and new features only
>>>> need to target
>>>> >> it? What other ways can we make it easier to contribute to the
>>>> development
>>>> >> of Beam?
>>>> >>
>>>> >>  2.
>>>> >>
>>>> >> Realizing Portability: What gaps are there in the promise of
>>>> portability?
>>>> >> For example in [1] we discussed the fact that users must write
>>>> per-runner
>>>> >> code to push system metrics from runners to their monitoring
>>>> platform. This
>>>> >> limits their ability to actually change runners. Credential
>>>> management for
>>>> >> different environments also falls into this category.
>>>> >>
>>>> >>  3.
>>>> >>
>>>> >> Large Features: What major features (like Beam SQL, Beam Python,
>>>> etc.) would
>>>> >> increase the Beam user base in 2018?
>>>> >>
>>>> >>  4.
>>>> >>
>>>> >> Improvements: What small changes could make Beam more appealing
>>>> to users?
>>>> >> Are there API improvements we could make or common mistakes we
>>>> could detect
>>>> >> and/or prevent?
>>>> >>
>>>> >>
>>>> >> Thanks in advance for participating in the discussion. I believe
>>>> that 2018 could
>>>> >> be a great year for Beam, providing easier, more complete runner
>>>> portability and
>>>> >> features that make Beam easier to use for everyone.
>>>> >>
>>>> >>
>>>> >> Ben
>>>> >>
>>>> >>
>>>> >> [1]
>>>> >>
>>>> https://lists.apache.org/thread.html/f750f288af8dab3f468b869bf5a3f473094f4764db419567f33805d0@%3Cdev.beam.apache.org%3E
>>>> >>
>>>> >> [2]
>>>> >>
>>>> https://lists.apache.org/thread.html/01a80d62f2df6b84bfa41f05e15fda900178f882877c294fed8be91e@%3Cdev.beam.apache.org%3E
>>>> >
>>>> > --
>>>> > Jean-Baptiste Onofré
>>>> > jbono...@apache.org
>>>> > http://blog.nanthrax.net
>>>> > Talend - http://www.talend.com
>>>>
>>>
>>>
>>
>


Re: [DISCUSS] State of the project: Feature roadmap for 2018

2018-01-30 Thread Ben Chambers
On Tue, Jan 30, 2018 at 11:25 AM Kenneth Knowles  wrote:

> I've got some thoughts :-)
>
> Here is how I see the direction(s):
>
>  - Requirements to be relevant: known scale, SQL, retractions (required
> for correct answers)
>  - Core value-add: portability! I don't know that there is any other
> project ambitiously trying to run Python and Go on "every" data processing
> engine.
>  - Experiments: SDF and dynamic work rebalancing. Just like event time
> processing, when it matters to users these will become widespread and then
> Beam's runner can easily make the features portable.
>
> So let's do portability really well on all our most active runners. I have
> a radical proposal for how we should think about it:
>
> A portable Beam runner should be defined to be a _service_ hosting the
> Beam job management APIs.
>
> In that sense, we have zero runners today. Even Dataflow is just a service
> hosting its own API with a client-side library for converting a Beam
> pipeline into a Dataflow pipeline. Re-orienting our thinking this way is
> not actually a huge change in code, but emphasizes:
>
>  - our "runners/core" etc should focus on making these services easy
> (Thomas G is doing great work here right now)
>  - a user selecting a runner should be thought of more as just pointing at
> a different endpoint
>  - our testing infrastructure should become much more service-oriented,
> standing these up even for local testing
>  - ditto Luke's point about making a crisp line of SDK/runner
> responsibility
>

+1, I like this perspective -- I think this would be really useful. If this
encompasses more than just running (e.g., getting results/metrics/logs/etc.
out of the pipelines), then it enables users to treat Beam as a true
abstraction layer on top of the data processing service, and to build their
own infrastructure around Beam rather than specializing.


>
> On Fri, Jan 26, 2018 at 12:58 PM, Lukasz Cwik  wrote:
>
>> 1) Instead of enabling it easier to write features I think more users
>> would care about being able to move their pipeline between different
>> runners and one of the key missing features is dynamic work rebalancing in
>> all runners (except Dataflow).
>> Also, portability is meant to help make a crisp line between what are the
>> responsibilities of the Runner and the SDK which would help make it easier
>> to write features in an SDK and to support features in Runners.
>>
>> 2) To realize portability there are a lot of JIRAs being tracked under
>> the portability label[1] that need addressing to be able to run an existing
>> pipeline in a portable manner before we even get to more advanced features.
>>
>> 1:
>> https://issues.apache.org/jira/browse/BEAM-3515?jql=project%20%3D%20BEAM%20AND%20labels%20%3D%20portability
>>
>> 3) Ben, do you want to design and run a couple of polls (similar to the
>> Java 8 poll) to get feedback from our users based upon the list of major
>> features being developed?
>>
>> 4) Yes, plenty. It would be worthwhile to have someone walk through the
>> open JIRAs and mark them with a label and also summarize what groups they
>> fall under as there are plenty of good ideas there.
>>
>> On Tue, Jan 23, 2018 at 5:25 PM, Robert Bradshaw 
>> wrote:
>>
>>> In terms of features, I think a key thing we should focus on is making
>>> simple things simple. Beam is very powerful, but it doesn't always
>>> make easy things easy. Features like schema'd PCollections could go a
>>> long way here. Also fully fleshing out/smoothing our runner
>>> portability story is part of this too.
>>>
>>> For beam 3.x we could also reason about if there's any complexity that
>>> doesn't hold its weight (e.g. side inputs on CombineFns).
>>>
>>> On Mon, Jan 22, 2018 at 9:20 PM, Jean-Baptiste Onofré 
>>> wrote:
>>> > Hi Ben,
>>> >
>>> > about the "technical roadmap", we have a thread about "Beam 3.x
>>> roadmap".
>>> >
>>> > It already provides ideas for points 3 & 4.
>>> >
>>> > Regards
>>> > JB
>>> >
>>> > On 01/22/2018 09:15 PM, Ben Chambers wrote:
>>> >> Thanks Davor for starting the state of the project discussions [1].
>>> >>
>>> >>
>>> >> In this fork of the state of the project discussion, I’d like to
>>> start the
>>> >> discussion of the feature roadmap for 2018 (and beyond).
>>> >>
>>> >>
>>> >> To kic

Re: [PROPOSAL] Add a blog post for every new release

2018-01-29 Thread Ben Chambers
+1. Release notes from JIRA may be hard to digest for people outside the
project. A short summary may be really helpful for those considering
Beam or loosely following the project to get an understanding of what is
happening.

It also presents a good opportunity to share progress externally, which may
attract more users to try things out.

On Mon, Jan 29, 2018, 9:11 AM Kenneth Knowles  wrote:

> +1 I like the idea and agree with adding it to the release guide. Even a
> very short post the length of an email is nice. We could have a template to
> make it very easy.
>
> On Mon, Jan 29, 2018 at 8:02 AM, Romain Manni-Bucau wrote:
>
>> +1 to have it as a best effort - most projects do. But as JB said, if
>> it slows down the release motivation it shouldn't be enforced but just
>> encouraged. A good solution, Ismael, is that you take this responsibility for
>> the coming releases after the release manager is done with the announcement.
>> This way we have the best of both worlds :).
>>
>>
>> Romain Manni-Bucau
>> @rmannibucau | Blog | Old Blog | Github | LinkedIn
>>
>> 2018-01-29 15:02 GMT+01:00 Jean-Baptiste Onofré :
>>
>>> Hi Ismaël
>>>
>>> The idea is good, but the post should be pretty short. Let me explain:
>>>
>>> - We will have a release every two months now, so, some releases might be
>>> lighter than others, and it's normal
>>> - the Jira Release Notes already provides lot of details
>>>
>>> For instance, in Apache projects like Karaf, Camel, and others, we do the
>>> announcement of a release on the mailing lists with the release notes
>>> linked.
>>> Sometimes, we do a blog post to highlight some interesting new features,
>>> but it's not systematic.
>>>
>>> So, I agree: it's a good idea and I would give some highlights about
>>> what we are
>>> doing and where we are heading. However, I don't think we have to
>>> "enforce" such
>>> blog post for every single release. It's a best effort.
>>>
>>> My $0.01 ;)
>>>
>>> Regards
>>> JB
>>>
>>> On 01/29/2018 02:47 PM, Ismaël Mejía wrote:
>>> > This is a fork of a recent message I sent as part of the preparations
>>> > for the next release.
>>> >
>>> > [tl;dr] I would like to propose that we create a new blog post for
>>> > every new release and that this becomes part of the release guide.
>>> >
>>> > I think that even if we do shorter releases we need to make this part
>>> > of the release process. We haven’t been really consistent about
>>> > communication on new releases in the past. Sometimes we did a blog
>>> > post and sometimes we didn’t.
>>> >
>>> > In particular I was a bit upset that we didn't do a blog post for the
>>> > last two releases, and the list of JIRA issues sadly does not cover
>>> > the importance of some of the features of those releases. I am still a
>>> > bit upset that we didn't publicly mention features like the SQL
>>> > extension, the recent IOs, the new FileIO related improvements and
>>> > Nexmark. Also I think the blog format is better for ‘marketing
>>> > reasons’ because not everybody reads the mailing list.
>>> >
>>> > Of course the only issue about this is to decide what to put in the
>>> > release notes and who will do it. We can do this by sharing a google
>>> > doc that everyone can edit to add their highlights and then reformat
>>> > it for blog publication, a bit similar to the format used by Gris for
>>> > the newsletter. Actually if we have paced releases probably we can mix
>>> > both the release notes and the newsletter into one, no ?
>>> >
>>> > What do you think? Other ideas/disagreement/etc.
>>> >
>>>
>>> --
>>> Jean-Baptiste Onofré
>>> jbono...@apache.org
>>> http://blog.nanthrax.net
>>> Talend - http://www.talend.com
>>>
>>
>>
>


[DISCUSS] State of the project: Feature roadmap for 2018

2018-01-22 Thread Ben Chambers
Thanks Davor for starting the state of the project discussions [1].

In this fork of the state of the project discussion, I’d like to start the
discussion of the feature roadmap for 2018 (and beyond).

To kick off the discussion, I think the features could be divided into
several areas, as follows:

   1. Enabling Contributions: How do we make it easier to add new features to
   the supported runners? Can we provide a common intermediate layer below the
   existing functionality that features are translated to, so that runners only
   need to support the intermediate layer and new features only need to target
   it? What other ways can we make it easier to contribute to the development
   of Beam?

   2. Realizing Portability: What gaps are there in the promise of
   portability? For example, in [1] we discussed the fact that users must write
   per-runner code to push system metrics from runners to their monitoring
   platform. This limits their ability to actually change runners. Credential
   management for different environments also falls into this category.

   3. Large Features: What major features (like Beam SQL, Beam Python, etc.)
   would increase the Beam user base in 2018?

   4. Improvements: What small changes could make Beam more appealing to
   users? Are there API improvements we could make, or common mistakes we could
   detect and/or prevent?

Thanks in advance for participating in the discussion. I believe that 2018
could be a great year for Beam, providing easier, more complete runner
portability and features that make Beam easier to use for everyone.

Ben

[1]
https://lists.apache.org/thread.html/f750f288af8dab3f468b869bf5a3f473094f4764db419567f33805d0@%3Cdev.beam.apache.org%3E
[2]
https://lists.apache.org/thread.html/01a80d62f2df6b84bfa41f05e15fda900178f882877c294fed8be91e@%3Cdev.beam.apache.org%3E


Re: Callbacks/other functions run after a PDone/output transform

2017-12-15 Thread Ben Chambers
Would it make more sense for the side input watermark and details about the
pane to be made available to the DoFn, which can then decide how to handle
them? Then, if a DoFn only wants the final pane, it is analogous to
triggering-is-for-sinks: push that decision back and only produce the views
the DoFn wants.

I think exposing it and letting the DoFn figure it out has similarities to
how the input punctuation for each input might be exposed in other systems.
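
The distinction Eugene's message below hinges on -- a side input that becomes "ready" on its *first* pane versus one gated on the *final* pane -- can be illustrated with a toy simulation. This is plain Python, not Beam APIs; `Pane` and `SideInput` are invented helper types:

```python
# Toy simulation (not Beam APIs): a side-input view fed by panes, with two
# readiness policies -- "ready on first pane" (the behavior described below)
# vs "ready only on the final pane" (the desired gate).

from dataclasses import dataclass

@dataclass
class Pane:
    value: str
    is_final: bool

class SideInput:
    def __init__(self, ready_on_final=False):
        self.ready_on_final = ready_on_final
        self.panes = []

    def fire(self, pane):
        self.panes.append(pane)

    @property
    def ready(self):
        if not self.panes:
            return False
        if self.ready_on_final:
            return any(p.is_final for p in self.panes)
        return True  # ready as soon as *any* pane has fired

first_pane_view = SideInput(ready_on_final=False)
final_pane_view = SideInput(ready_on_final=True)

for view in (first_pane_view, final_pane_view):
    view.fire(Pane("partial write", is_final=False))
print(first_pane_view.ready, final_pane_view.ready)  # True False

for view in (first_pane_view, final_pane_view):
    view.fire(Pane("all data written", is_final=True))
print(final_pane_view.ready)  # True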

On Fri, Dec 15, 2017, 5:31 PM Eugene Kirpichov  wrote:

> So this appears not as easy as anticipated (surprise!)
>
> Suppose we have a PCollection "donePanes" with an element per
> fully-processed pane: e.g. BigQuery sink, and elements saying "a pane of
> data has been written; this pane is: final / non-final".
>
> Suppose we want to use this to ensure that somePc.apply(ParDo.of(fn))
> happens only after the final pane has been written.
>
> In other words: we want a.apply(ParDo.of(b).withSideInput(c)) to happen
> when c emits a *final* pane.
>
> Unfortunately, using
> ParDo.of(fn).withSideInputs(donePanes.apply(View.asSingleton())) doesn't do
> the trick: the side input becomes ready the moment *the first *pane of
> data has been written.
>
> But neither does ParDo.of(fn).withSideInputs(donePanes.apply(...filter
> only final panes...).apply(View.asSingleton())). It also becomes ready the
> moment *the first* pane has been written, you just get an exception if
> you access the side input before the *final* pane was written.
>
> I can't think of a pure-Beam solution to this: either "donePanes" will be
> used as a main input to something (and then everything else can only be a
> side input, which is not general enough), or it will be used as a side
> input (and then we can't achieve "trigger only after the final pane fires").
>
> It seems that we need a way to control the side input pushback, and
> configure whether a view becomes ready when its first pane has fired or
> when its last pane has fired. I could see this be a property on the View
> transform itself. In terms of implementation - I tried to figure out how
> side input readiness is determined, in the direct runner and Dataflow
> runner, and I'm completely lost and would appreciate some help.
>
> On Thu, Dec 7, 2017 at 12:01 AM Reuven Lax  wrote:
>
>> This sounds great!
>>
>> On Mon, Dec 4, 2017 at 4:34 PM, Ben Chambers 
>> wrote:
>>
>>> This would be absolutely great! It seems somewhat similar to the changes
>>> that were made to the BigQuery sink to support WriteResult (
>>> https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java
>>> ).
>>>
>>> I find it helpful to think about the different things that may come
>>> after a sink. For instance:
>>>
>>> 1. It might be helpful to have a collection of failing input elements.
>>> The type of failed elements is pretty straightforward -- just the input
>>> elements. This allows handling such failures by directing them elsewhere or
>>> performing additional processing.
>>>
>>
>> BigQueryIO already does this as you point out.
>>
>
>>> 2. For a sink that produces a series of files, it might be useful to
>>> have a collection of the file names that have been completely written. This
>>> allows performing additional handling on these completed segments.
>>>
>>
>> In fact we already do this for FileBasedSinks.   See
>> https://github.com/apache/beam/blob/7d53878768757ef2115170a5073b99956e924ff2/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFilesResult.java
>>
>
>>> 3. For a sink that updates some destination, it would be reasonable to
>>> have a collection that provides (periodically) output indicating how
>>> complete the information written to that destination is. For instance, this
>>> might be something like "<destination> has all of the elements up
>>> to <timestamp>" complete. This allows tracking how much information
>>> has been completely written out.
>>>
>>
>> Interesting. Maybe tough to do since sinks often don't have that
>> knowledge.
>>
>
>>
>>>
>>> I think those concepts map to the more detailed description Eugene
>>> provided, but I find it helpful to focus on what information comes out of
>>> the sink and how it might be used.
>>>
>>> Were there any use cases the above miss? Any functionality that has been
>>> described that doesn't map to these use cases?
>>>
>>> -- Ben
>>>
>&g

Re: Callbacks/other functions run after a PDone/output transform

2017-12-04 Thread Ben Chambers
This would be absolutely great! It seems somewhat similar to the changes
that were made to the BigQuery sink to support WriteResult (
https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java
).

I find it helpful to think about the different things that may come after a
sink. For instance:

1. It might be helpful to have a collection of failing input elements. The
type of failed elements is pretty straightforward -- just the input
elements. This allows handling such failures by directing them elsewhere or
performing additional processing.

2. For a sink that produces a series of files, it might be useful to have a
collection of the file names that have been completely written. This allows
performing additional handling on these completed segments.

3. For a sink that updates some destination, it would be reasonable to have
a collection that provides (periodically) output indicating how complete
the information written to that destination is. For instance, this might be
something like "<destination> has all of the elements up to <timestamp>"
complete. This allows tracking how much information has been
completely written out.
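
The three kinds of sink output above could be modeled roughly as a single result object that downstream transforms consume. This is a sketch, not an actual Beam API; `SinkResult` and all field names are invented:

```python
# Sketch (not a Beam API; names invented): the three kinds of sink output
# -- failed inputs, completed files, and per-destination completeness --
# bundled into one result a pipeline could process downstream.

from dataclasses import dataclass, field

@dataclass
class SinkResult:
    failed_elements: list = field(default_factory=list)  # 1. inputs that failed
    written_files: list = field(default_factory=list)    # 2. completed file names
    completeness: dict = field(default_factory=dict)     # 3. destination -> watermark

result = SinkResult()
result.failed_elements.append({"row": 42, "error": "schema mismatch"})
result.written_files.append("gs://bucket/output-00000-of-00001.avro")
result.completeness["events_table"] = "2017-12-04T16:00:00Z"

# Downstream handling, e.g. redirecting failures for reprocessing:
retries = [e["row"] for e in result.failed_elements]
print(retries)  # [42]
```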

I think those concepts map to the more detailed description Eugene
provided, but I find it helpful to focus on what information comes out of
the sink and how it might be used.

Were there any use cases the above miss? Any functionality that has been
described that doesn't map to these use cases?

-- Ben

On Mon, Dec 4, 2017 at 4:02 PM Eugene Kirpichov 
wrote:

> It makes sense to consider how this maps onto existing kinds of sinks.
>
> E.g.:
> - Something that just makes an RPC per record, e.g. MqttIO.write(): that
> will emit 1 result per bundle (either a bogus value or number of records
> written) that will be Combine'd into 1 result per pane of input. A user can
> sequence against this and be notified when some intermediate amount of data
> has been written for a window, or (via .isFinal()) when all of it has been
> written.
> - Something that e.g. initiates an import job, such as BigQueryIO.write(),
> or an ElasticsearchIO write with a follow-up atomic index swap: should emit
> 1 result per import job, e.g. containing information about the job (e.g.
> its id and statistics). Role of panes is the same.
> - Something like above but that supports dynamic destinations: like in
> WriteFiles, result will be PCollection<KV<DestinationT, ResultT>> where
> ResultT may be something like a list of files that were written for this
> pane of this destination.
>
> On Mon, Dec 4, 2017 at 3:58 PM Eugene Kirpichov 
> wrote:
>
>> I agree that the proper API for enabling the use case "do something after
>> the data has been written" is to return a PCollection of objects where each
>> object represents the result of writing some identifiable subset of the
>> data. Then one can apply a ParDo to this PCollection, in order to "do
>> something after this subset has been written".
>>
>> The challenging part here is *identifying* the subset of the data that's
>> been written, in a way consistent with Beam's unified batch/streaming
>> model, where saying "all data has been written" is not an option because
>> more data can arrive.
>>
>> The next choice is "a window of input has been written", but then again,
>> late data can arrive into a window as well.
>>
>> Next choice after that is "a pane of input has been written", but per
>> https://s.apache.org/beam-sink-triggers the term "pane of input" is
>> moot: triggering and panes should be something private to the sink, and the
>> same input can trigger different sinks differently. The hypothetical
>> different accumulation modes make this trickier still. I'm not sure whether
>> we intend to also challenge the idea that windowing is inherent to the
>> collection, or whether it too should be specified on a transform that
>> processes the collection. I think for the sake of this discussion we can
>> assume that it's inherent, and assume the mental model that the elements in
>> different windows of a PCollection are processed independently - "as if"
>> there were multiple pipelines processing each window.
>>
>> Overall, embracing the full picture, we end up with something like this:
>> - The input PCollection is a composition of windows.
>> - If the windowing strategy is non-merging (e.g. fixed or sliding
>> windows), the below applies to the entire contents of the PCollection. If
>> it's merging (e.g. session windows), then it applies per-key, and the input
>> should be (perhaps implicitly) keyed in a way that the sink understands -
>> for example, the grouping by destination in DynamicDestinations in file and
>> bigquery writes.
>> - Each window's contents is a "changelog" - stream of elements and
>> retractions.
>> - A "sink" processes each window of the collection, deciding how to
>> handle elements and retractions (and whether to support retractions at all)
>> in a sink-specific way, and deciding *when* to perform the si

Re: makes bundle concept usable?

2017-11-30 Thread Ben Chambers
I think both concepts likely need to co-exist:

As described in the execution model [1] bundling is a runner-specific
choice about how to execute a pipeline. This affects how frequently it may
need to checkpoint during process, how much communication overhead there is
between workers, the scope of retries, etc. To allow runners to innovate
and explore various options in this space, the semantics of a pipeline
shouldn't depend on bundling.

For cases where a pipeline wants to process N values together, a transform
(or annotation) to explicitly indicate that the semantics of the pipeline
require processing batches of elements makes sense. This is the intended
use of GroupIntoBatches, and allows the two concepts to be kept distinct --
bundles are a runner choice and GroupIntoBatches is a semantic choice.

https://beam.apache.org/documentation/execution-model/#bundling-and-persistence
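
The semantic contract of GroupIntoBatches -- elements with the same key emitted in groups of at most N -- can be sketched as a toy per-key simulation. This is plain Python mimicking the transform's behavior for illustration, not the Beam implementation; the function name is invented:

```python
# Toy simulation in the spirit of GroupIntoBatches.ofSize(batch_size):
# same-key elements are emitted in batches of at most `batch_size`, with
# partial batches flushed at the end. Not the Beam implementation.

from collections import defaultdict

def group_into_batches(keyed_elements, batch_size):
    buffers = defaultdict(list)
    for key, value in keyed_elements:
        buffers[key].append(value)
        if len(buffers[key]) == batch_size:
            yield key, buffers.pop(key)
    # Flush partial batches (in Beam, this would happen per window).
    for key, leftover in buffers.items():
        yield key, leftover

data = [("a", 1), ("a", 2), ("b", 7), ("a", 3), ("b", 8), ("a", 4)]
print(list(group_into_batches(data, batch_size=2)))
# [('a', [1, 2]), ('b', [7, 8]), ('a', [3, 4])]
```

Note how the batch size here is a semantic property of the transform, chosen by the pipeline author -- in contrast to bundle sizes, which remain a runner's execution choice.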

On Thu, Nov 30, 2017 at 9:46 AM Romain Manni-Bucau 
wrote:

> @Ben: would all IO be rewritten to use that and the bundle concept
> dropped from the API to avoid any ambiguity and misleading usage like
> in current IOs?
>
> Romain Manni-Bucau
> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>
>
> 2017-11-30 18:43 GMT+01:00 Ben Chambers :
> > Beam includes a GroupIntoBatches transform (see
> >
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java
> )
> > which I believe was intended to be used as part of such a portable IO. It
> > can be used to request that elements are divided into batches of some
> size
> > which can then be used for further processing.
> >
> > On Thu, Nov 30, 2017 at 9:32 AM Romain Manni-Bucau <
> rmannibu...@gmail.com>
> > wrote:
> >>
> >> 2017-11-30 18:11 GMT+01:00 Eugene Kirpichov :
> >> > Very strong -1 from me:
> >> > - Having a pipeline-global parameter is bad because it will apply to
> all
> >> > transforms, with no ability to control it for individual transforms.
> >> > This
> >> > can go especially poorly because it means that when I write a
> transform,
> >> > I
> >> > don't know whether a user will set this parameter in their pipeline
> to a
> >> > value that's perhaps good for the user's transform but really bad for
> my
> >> > transform; and the user will likely blame my transform for poor
> >> > performance.
> >> > A parameter like this should be set on exactly the thing it applies
> to:
> >> > e.g.
> >> > on the particular IO; and it should be set by the IO itself, not by a
> >> > user
> >> > in pipeline options, because the IO author likely knows better than a
> >> > user
> >> > what is a good value.
> >>
> >> This is true, and this is worse today since the user can't tune it, but
> >> the IO doesn't handle it as well. It is up to the runner, and none
> >> implement it in a way which is IO-friendly (check Flink and Spark,
> >> which do the exact opposite: bundle=1 vs bundle=dataset/partitions).
> >>
> >> Also note it is a "max" and not an exact value in the proposal.
> >>
> >> > - The parameter will not achieve what many IOs want, either. In some
> >> > cases,
> >> > you want to limit the number of bytes you write. In some cases, you
> want
> >> > to
> >> > limit the number of values within a key that you write. In some cases,
> >> > it's
> >> > something else - it isn't always elements.
> >>
> >> Elements is the only thing users can really tune since you can't
> >> assume the content.
> >>
> >> > - The parameter will achieve none of the issues that you I think
> raised
> >> > in
> >> > the thread above: it doesn't give deterministic replay, nor any kind
> of
> >> > fault tolerance.
> >>
> >> Right, it only partially solves the first issue popping up: the
> >> chunking. However I think it is a quick win.
> >>
> >> > - Having a parameter like this *at all* goes against Beam's "no knobs"
> >> > philosophy - for all the usual reasons: 1) it encourages users to
> waste
> >> > time
> >> > looking in the wrong places when doing performance tuning: tuning
> >> > parameters
> >> > is almost never the best way to improve performance; 2) when users can
> >> > set a
> >> > tuning parameter, in my experience it is almost always set wrong, or
> >&g

Re: makes bundle concept usable?

2017-11-30 Thread Ben Chambers
Beam includes a GroupIntoBatches transform (see
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java)
which I believe was intended to be used as part of such a portable IO. It
can be used to request that elements are divided into batches of some size
which can then be used for further processing.

On Thu, Nov 30, 2017 at 9:32 AM Romain Manni-Bucau 
wrote:

> 2017-11-30 18:11 GMT+01:00 Eugene Kirpichov :
> > Very strong -1 from me:
> > - Having a pipeline-global parameter is bad because it will apply to all
> > transforms, with no ability to control it for individual transforms. This
> > can go especially poorly because it means that when I write a transform,
> I
> > don't know whether a user will set this parameter in their pipeline to a
> > value that's perhaps good for the user's transform but really bad for my
> > transform; and the user will likely blame my transform for poor
> performance.
> > A parameter like this should be set on exactly the thing it applies to:
> e.g.
> > on the particular IO; and it should be set by the IO itself, not by a
> user
> > in pipeline options, because the IO author likely knows better than a
> user
> > what is a good value.
>
> This is true, and this is worse today since the user can't tune it, but
> the IO doesn't handle it as well. It is up to the runner, and none
> implement it in a way which is IO-friendly (check Flink and Spark,
> which do the exact opposite: bundle=1 vs bundle=dataset/partitions).
>
> Also note it is a "max" and not an exact value in the proposal.
>
> > - The parameter will not achieve what many IOs want, either. In some
> cases,
> > you want to limit the number of bytes you write. In some cases, you want
> to
> > limit the number of values within a key that you write. In some cases,
> it's
> > something else - it isn't always elements.
>
> Elements is the only thing users can really tune since you can't
> assume the content.
>
> > - The parameter will achieve none of the issues that you I think raised
> in
> > the thread above: it doesn't give deterministic replay, nor any kind of
> > fault tolerance.
>
> Right, it only partially solves the first issue popping up: the
> chunking. However I think it is a quick win.
>
> > - Having a parameter like this *at all* goes against Beam's "no knobs"
> > philosophy - for all the usual reasons: 1) it encourages users to waste
> time
> > looking in the wrong places when doing performance tuning: tuning
> parameters
> > is almost never the best way to improve performance; 2) when users can
> set a
> > tuning parameter, in my experience it is almost always set wrong, or
> perhaps
> > it was once set right but then nobody updates it when the use case or
> > implementation changes; and we can end up in a situation where the
> pipeline
> > is performing poorly because of the parameter but the runner isn't
> allowed
> > to choose a better value. (in experience with legacy data processing
> systems
> > in Google, like MapReduce, that support plenty of tuning parameters, a
> very
> > common advice to someone complaining about a poorly performing job is
> "have
> > you tried removing all your parameters?")
>
> I would be fine with that but what is the alternative?
>
> > - I still fail to understand the exact issue we're talking about, and
> I've
> > made a number of suggestions as to how this understanding could be
> achieved:
> > show code that demonstrates the issue; and show how the code could be
> > improved by a hypothetical API.
>
> The first immediately blocking issue is how to batch records reliably and
> *portably* (the biggest Beam added value, IMHO).
> Since bundles are the "flush" trigger for most IOs, it means ensuring the
> bundle size is somehow controllable, or at least not set to a very
> small value OOTB.
>
> An alternative to this proposal can be to let an IO give an hint about
> its desired bundle size. Would work as well for that particular issue.
> Does it sound better?
>
> >
> > On Thu, Nov 30, 2017 at 6:17 AM Jean-Baptiste Onofré 
> > wrote:
> >>
> >> It sounds reasonable to me.
> >>
> >> And agree for Spark, I would like to merge Spark 2 update first.
> >>
> >> Regards
> >> JB
> >>
> >> On 11/30/2017 03:09 PM, Romain Manni-Bucau wrote:
> >> > Guys,
> >> >
> >> > what about moving getMaxBundleSize from flink options to pipeline
> >> > options. I think all runners can support it right? Spark code needs
> >> > the merge of the v2 before being able to be implemented probably but I
> >> > don't see any blocker.
> >> >
> >> > wdyt?
> >> >
> >> > Romain Manni-Bucau
> >> > @rmannibucau |  Blog | Old Blog | Github | LinkedIn
> >> >
> >> >
> >> > 2017-11-19 8:19 GMT+01:00 Romain Manni-Bucau :
> >> >> @Eugene: "workaround" as specific to the IO each time and therefore
> >> >> still highlight a lack in the core.
> >> >>
> >> >> Other comments inline
> >> >>
> >> >>
> >> >> 2017-11-19 7:40 GMT+01:00 Robert Bradshaw
> >> >> :
> >> >>> There is a possib

Re: [DISCUSS] Updating contribution guide for gitbox

2017-11-28 Thread Ben Chambers
One risk to "squash and merge" is that it may lead to commits that don't
have clean descriptions -- for instance, commits like "Fixing review
comments" will show up. If we use (a) these would also show up as separate
commits. It seems like there are two cases of multiple commits in a PR:

1. Multiple commits in a PR that have semantic meaning (e.g., a PR performed
N steps, split across N commits). In this case, keeping the descriptions
and performing either a merge (if the commits are separately valid) or
squash (if we want the commits to become a single commit in master)
probably makes sense.
2. Multiple commits in a PR that just reflect the review history. In this
case, we should probably ask the PR author to explicitly rebase their PR to
have semantically meaningful commits prior to merging (e.g., do a rebase
-i).
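
Case 2 above -- collapsing review-history commits into the semantic commit they amend -- can even be done non-interactively using `fixup!` commit messages and `git rebase --autosquash`. A sketch; the repository, file names, and commit messages are all illustrative:

```shell
# Sketch: squashing review-fixup commits into their semantic commit before
# merge. The repo, file, and messages below are illustrative only.
set -e
dir=$(mktemp -d) && cd "$dir"
git init -q repo && cd repo
git config user.email dev@example.com
git config user.name Dev

echo 'v1' > feature.txt && git add feature.txt
git commit -qm 'Add feature X'
echo 'v2' > feature.txt
git commit -aqm 'fixup! Add feature X'   # review round 1
echo 'v3' > feature.txt
git commit -aqm 'fixup! Add feature X'   # review round 2

# Accept the auto-generated todo list as-is: --autosquash folds the
# "fixup!" commits into "Add feature X" with no editor interaction.
GIT_SEQUENCE_EDITOR=true git rebase -qi --autosquash --root

git log --oneline   # a single commit: "Add feature X"
```

This keeps the final history semantically meaningful while preserving the review iterations in the PR itself.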

On Tue, Nov 28, 2017 at 9:46 AM Kenneth Knowles  wrote:

> Hi all,
>
> James brought up a great question in Slack, which was how should we use
> the merge button, illustrated [1]
>
> I want to broaden the discussion to talk about all the new capabilities:
>
> 1. Whether & how to use the "reviewer" field
> 2. Whether & how to use the "assignee" field
> 3. Whether & how to use the merge button
>
> My preferences are:
>
> 1. Use the reviewer field instead of "R:" comments.
> 2. Use the assignee field to keep track of who the review is blocked on
> (either the reviewer for more comments or the author for fixes)
> 3. Use merge commits, but editing the commit subject line
>
> To expand on part 3, GitHub's merge button has three options [1]. They are
> not described accurately in the UI, as they all say "merge" when only one
> of them performs a merge. They do the following:
>
> (a) Merge the branch with a merge commit
> (b) Squash all the commits, rebase and push
> (c) Rebase and push without squash
>
> Unlike our current guide, all of these result in a "merged" status for the
> PR, so we can correctly distinguish those PRs that were actually merged.
>
> My votes on these options are:
>
> (a) +1 this preserves the most information
> (b) -1 this erases the most information
> (c) -0 this is just sort of a middle ground; it breaks commit hashes, does
> not have a clear merge commit, but preserves other info
>
> Kenn
>
> [1] https://apachebeam.slack.com/messages/C1AAFJYMP/
>
>
>
>
>
> Kenn
>


Re: [DISCUSS] Thinking about Beam 3.x roadmap and release schedule

2017-11-28 Thread Ben Chambers
Strong +1 to both increasing the frequency of minor releases and also
putting together a road map for the next major release or two.

I think it would be great to communicate to the community the direction
Beam is taking in the future -- what things will users be able to do with
3.0 or 4.0 that they can't do with 2.x?

On Tue, Nov 28, 2017 at 9:25 AM Kenneth Knowles  wrote:

> Yea, let's work hard on improving the ease and pace of releases. I am not
> really happy to have only quarterly releases.
>
> Automation of release process where possible, better test coverage, a
> higher resistance to cherry-picks.
>
> Kenn
>
> On Tue, Nov 28, 2017 at 9:14 AM, Jean-Baptiste Onofré 
> wrote:
>
>> Hi Reuven,
>>
>> Yes, I remember that we agreed on a release per month. However, we didn't
>> do it before. I think the most important thing is not the period, it's more
>> a stable pace. I think it's more interesting for our community to "always"
>> have a release every two months than to attempt a release every month that
>> ends up later than that. Of course, if we can do both, it's
>>
>> For Beam 3.x, I wasn't talking about breaking changes, but more about a
>> "marketing" announcement. I think that, even if we don't break the API, some
>> features are "strong enough" to be "qualified" in a major version.
>>
>> I think that any major ideas & features (breaking the API or not) are
>> valuable for Beam 3.x (and it's a good sign for our community again ;)).
>>
>> Thanks !
>> Regards
>> JB
>>
>> On 11/28/2017 06:09 PM, Reuven Lax wrote:
>>
>>>
>>>
>>> On Tue, Nov 28, 2017 at 8:55 AM, Jean-Baptiste Onofré wrote:
>>>
>>> Hi guys,
>>>
>>> Even if there's no rush, I think it would be great for the community
>>> to have
>>> a better view on our roadmap and where we are going in term of
>>> schedule.
>>>
>>> I would like to discuss the following:
>>> - a best effort to maintain a good release pace or at least provide
>>> a rough
>>> schedule. For instance, in Apache Karaf, I have a release schedule
>>> (http://karaf.apache.org/download.html#container-schedule
>>> ). I
>>> think a
>>> release ~ every quarter would be great.
>>>
>>>
>>> Originally we had stated that we wanted monthly releases of Beam. So far
>>> the releases have been painful enough that monthly hasn't happened. I think
>>> we should address these issues and go to monthly releases as originally
>>> stated.
>>>
>>> - if I see new Beam 2.x releases for sure (according to the previous
>>> point),
>>> it would be great to have discussion about Beam 3.x. I think that
>>> one of
>>> interesting new feature that Beam 3.x can provide is around
>>> PCollection with
>>> Schemas. It's something that we started to discuss with Reuven and
>>> Eugene.
>>> In term of schedule,
>>>
>>>
>>> I don't think schemas require Beam 3.0 - I think we can introduce them
>>> without making breaking changes. However there are many other features that
>>> would be very interesting for Beam 3.x, and we should start putting
>>> together a list of them.
>>>
>>>
>>> I would love to see your thoughts & ideas about releases schedule
>>> and Beam 3.x.
>>>
>>> Regards
>>> JB
>>> -- Jean-Baptiste Onofré
>>> jbono...@apache.org 
>>> http://blog.nanthrax.net
>>> Talend - http://www.talend.com
>>>
>>>
>>>
>> --
>> Jean-Baptiste Onofré
>> jbono...@apache.org
>> http://blog.nanthrax.net
>> Talend - http://www.talend.com
>>
>
>


Re: [DISCUSSION] Runner agnostic metrics extractor?

2017-11-27 Thread Ben Chambers
I think discussing a runner agnostic way of configuring how metrics are
extracted is a great idea -- thanks for bringing it up Etienne!

Using a thread that polls the pipeline result relies on the program that
created and submitted the pipeline continuing to run (e.g., no machine
faults, network problems, etc.). For many applications, this isn't a good
model (a streaming pipeline may run for weeks, a batch pipeline may be
automatically run every hour, etc.).

Etienne's proposal of having something the runner pushes metrics to has
the benefit of running in the same cluster as the pipeline, and thus
shares the same reliability characteristics.

As noted, it would require runners to ensure that metrics were pushed into
the extractor, but from there it would allow a general configuration of how
metrics are extracted from the pipeline and exposed to external services.

Providing something that the runners could push metrics into and have them
automatically exported seems like it would have several benefits:
  1. It would provide a single way to configure how metrics are actually
exported.
  2. It would allow the runners to ensure it was reliably executed.
  3. It would allow the runner to report system metrics directly (e.g., if a
runner wanted to report the watermark, it could push that in directly).

-- Ben

On Mon, Nov 27, 2017 at 9:06 AM Jean-Baptiste Onofré 
wrote:

> Hi all,
>
> Etienne forgot to mention that we started a PoC about that.
>
> What I started is to wrap the Pipeline creation to include a thread that
> polls
> periodically the metrics in the pipeline result (it's what I proposed when
> I
> compared with Karaf Decanter some time ago).
> Then, this thread marshalls the collected metrics and send to a sink. At
> the
> end, it means that the harvested metrics data will be store in a backend
> (for
> instance elasticsearch).
>
> The pro of this approach is that it doesn't require any change in the
> core, it's
> up to the user to use the PipelineWithMetric wrapper.
>
> The cons is that the user needs to explicitly use the PipelineWithMetric
> wrapper.
>
> IMHO, it's good enough as user can decide to poll metrics for some
> pipelines and
> not for others.
>
> Regards
> JB
>
> On 11/27/2017 04:56 PM, Etienne Chauchot wrote:
> > Hi all,
> >
> > I came by this ticket https://issues.apache.org/jira/browse/BEAM-2456.
> I know
> > that the metrics subject has already been discussed a lot, but I would
> like to
> > revive the discussion.
> >
> > The aim in this ticket is to avoid relying on the runner to provide the
> metrics
> > because they don't have all the same capabilities towards metrics. The
> idea in
> > the ticket is to still use beam metrics API (and not others like
> codahale as it
> > has been discussed some time ago) and provide a way to extract the
> metrics with
> > a polling thread that would be forked by a PipelineWithMetrics (so,
> almost
> > invisible to the end user) and then to push to a sink (such as a Http
> rest sink
> > for example or Graphite sink or anything else...). Nevertheless, a
> polling
> > thread might not work for all the runners because some might not make the
> > metrics available before the end of the pipeline. Also, forking a thread
> would
> > be a bit unconventional, so it could be provided as a beam sdk extension.
> >
> > Another way, to avoid polling, would be to push metrics values to a sink
> when
> > they are updated but I don't know if it is feasible in a runner
> independent way.
> >
> > WDYT about the ideas in this ticket?
> >
> > Best,
> > Etienne
>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>


Re: [VOTE] Release 2.2.0, release candidate #2

2017-11-08 Thread Ben Chambers
This seems to be the second thread entitled "[VOTE] Release 2.2.0, release
candidate #2". The subject and description refer to release candidate #2,
however the artifacts mention v2.2.0-RC3. Which release candidate is this
vote thread for?

On Wed, Nov 8, 2017 at 12:52 PM Jean-Baptiste Onofré 
wrote:

> Agree.
>
> I just would like what changed exactly as I didn't have any issue when I
> did the 2.1.0 release.
>
> Regards
> JB
>
> On Nov 8, 2017, 21:50, at 21:50, Kenneth Knowles 
> wrote:
> >Agree with everything Robert said. So if we just rebuild the Python zip
> >then this should g2g?
> >
> >On Wed, Nov 8, 2017 at 12:37 PM, Robert Bradshaw <
> >rober...@google.com.invalid> wrote:
> >
> >> Let me try to clarify the state of the world (with regards to Python
> >> and proto files).
> >>
> >> * When Python setup.py is run, it checks to see if the generated pb2
> >> files exist. If not, it attempts to generate them by installing the
> >> proto compiler and looking up the .proto definitions in its parent
> >> directory. This works great for the developer that checked out the
> >> full pristine sources from git (or otherwise obtained them).
> >>
> >> * For the sdist tarball uploaded to PyPi (aka Python Artifact), we
> >> ship the generated pb2 files both because (1) we don't want to force
> >> the user to install the proto compiler and (2) the "parent" directory
> >> doesn't exist as we're just shipping the sdks/python/... portion of
> >> the full git repository.
> >>
> >> * All previous "releases" in
> >> https://dist.apache.org/repos/dist/release/beam/ post the Python
> >> artifact (which is Python sources + generated proto files, but
> >notably
> >> no source proto files) in addition to the full source artifact (which
> >> contains some snapshot of the full git repository, Python and proto
> >> files included). We also separately publish Java artifacts offsite
> >> which is what people will install from.
> >>
> >> So it seems the purpose of the -python.zip file is just to stage what
> >> we intend to release on PyPi (e.g. for testing); it is not a source
> >> distribution (that is taken care of by the adjacent -source.zip file)
> >> and so there's no issue with it containing generated files. It should
> >> be the output of "python setup.py sdist" (possibly invoked by the mvn
> >> release commands, if you can get those to work). On the other hand,
> >> creating a separate python-only source distribution would serve no
> >> purpose, as it would be redundant with the existing
> >> everything-source-distribution which is just a manually taken
> >snapshot
> >> of the entire git repository. The confusion is around the role of the
> >> -python.zip file, and if we clarify that it's the proposed Python
> >PyPi
> >> artifact, and *not* some kind of python-only source distribution, the
> >> release process is WAI.
> >>
> >> - Robert
> >>
> >>
> >> On Wed, Nov 8, 2017 at 11:55 AM, Jean-Baptiste Onofré
> >
> >> wrote:
> >> > Let me take a look. Afair I didn't touch those files in last
> >release.
> >> >
> >> > I keep you posted.
> >> >
> >> > Regards
> >> > JB
> >> >
> >> > On Nov 8, 2017, 20:50, at 20:50, Reuven Lax
> >
> >> wrote:
> >> >>I explicitly removed the pb2 files as I thought we determined they
> >> >>shouldn't be in the source release, and they caused RAT failures.
> >What
> >> >>should I be doing here?
> >> >>
> >> >>On Wed, Nov 8, 2017 at 10:21 AM, Robert Bradshaw <
> >> >>rober...@google.com.invalid> wrote:
> >> >>
> >> >>> This is due to having removed the auto-generated pb2 files.
> >> >>>
> >> >>> On Wed, Nov 8, 2017 at 9:37 AM, Valentyn Tymofieiev
> >> >>>  wrote:
> >> >>> > Confirming Ismaël's finding - I also see this error and it did
> >not
> >> >>see it
> >> >>> > on a candidate that was in the staging area yesterday.
> >> >>> >
> >> >>> > On Wed, Nov 8, 2017 at 9:07 AM, Ismaël Mejía
> >
> >> >>wrote:
> >> >>> >
> >> >>> >> I tested the python version of the release I just created a
> >new
> >> >>> >> virtualenv and run
> >> >>> >>
> >> >>> >> python setup.py install and it gave me this message:
> >> >>> >>
> >> >>> >> Traceback (most recent call last):
> >> >>> >>   File "setup.py", line 203, in <module>
> >> >>> >> 'test': generate_protos_first(test),
> >> >>> >>   File "/usr/lib/python2.7/distutils/core.py", line 151, in
> >setup
> >> >>> >> dist.run_commands()
> >> >>> >>   File "/usr/lib/python2.7/distutils/dist.py", line 953, in
> >> >>> run_commands
> >> >>> >> self.run_command(cmd)
> >> >>> >>   File "/usr/lib/python2.7/distutils/dist.py", line 972, in
> >> >>run_command
> >> >>> >> cmd_obj.run()
> >> >>> >>   File
> >> >>"/home/ismael/.virtualenvs/beam-vote2/local/lib/python2.7/si
> >> >>> >> te-packages/setuptools/command/install.py",
> >> >>> >> line 67, in run
> >> >>> >> self.do_egg_install()
> >> >>> >>   File
> >> >>"/home/ismael/.virtualenvs/beam-vote2/local/lib/python2.7/si
> >> >>> >> te-packages/setuptools/command/install.py",
> >> >>> >> line 109, in do_egg_install
>

Re: [DISCUSS] Move away from Apache Maven as build tool

2017-10-30 Thread Ben Chambers
I think both Gradle and Bazel are worth exploring. Gradle is definitely
more common in the wild, but Bazel may be a better fit for the large
mixture of languages being developed in one codebase within Beam. It might
be helpful for us to list what functionality we want from such a tool, and
then have someone who is interested in this dig in a bit and let us know
in more detail how well Gradle or Bazel would fit our needs.

On Mon, Oct 30, 2017 at 10:27 AM Kenneth Knowles 
wrote:

> I also support exploring a move away from Apache Maven for orchestrating
> our build.
>
> For a single-module project, I still think it can be a good build tool, and
> we could still use it for this sort of thing, but I think we are reaching a
> multi-module scale where it does not work well. Almost all of our jobs
> build things that are not needed and run tests that are redundant, and it
> is not easy to do better, even with a sequence of maven commands.
>
> I'd like to lay out what we hope for from a tool. Here's a start:
>
> General:
>
>  - Dependency-driven build so devs working on one thing build & test only
> what is needed
>  - Supports orchestration across Protobuf, Java, Python, Go, Docker images
>  - Meets devs where they are, letting folks in one language use familiar
> tools
>  - Caching across builds as much as possible
>  - Easily extensible for when it doesn't have the feature we need (writing
> a maven plugin is too much, using maven-exec-plugin is too crufty)
>  - Preferably a declarative configuration language
>
> Java needs beyond the basics, which could be executed by the orchestrator
> or my module-local mvn builds, etc.
>
>  - Pulling deps from maven central and alternate repos
>  - Findbugs
>  - RAT
>  - Dependency rule enforcement
>  - IWYU (Include What You Use)
>  - Multiple Java versions in same project
>  - ASF release workflow
>
> I probably missed some must-haves or nice-to-haves. I'd love to compile
> thoughts on other languages' needs.
>
> Based on these, another project I would consider is Bazel. We could very
> easily use it to orchestrate, but use Maven (or Gradle!) on the leaves. I
> also think that Gradle is also more focused on the JVM ecosystem, so it is
> not quite as neutral as Bazel, and uses Groovy which is a bit more esoteric
> than Python for writing Bazel rules.
>
> Kenn
>
> On Mon, Oct 30, 2017 at 9:37 AM, Lukasz Cwik 
> wrote:
>
> > I wanted to make this thread more visible. This discussion stems from
> Ken's
> > thread about Jenkins pre/post commit issues[1].
> >
> > I did some investigation as for ways to improve the quality of the signal
> > from Jenkins by trying to modify the Jenkins jobs spawned from Groovy. I
> > had limited success but everything I felt like I was doing was just
> > patching symptoms of the problem which is that our build is just too
> slow.
> > For example, we keep adding all these profiles to Maven or tuning how a
> > plugin runs to eek out a small decrease in build time. I believe swapping
> > away from Apache Maven to a build tool which only builds the things which
> > have changed in a PR would be the best approach.
> >
> > I would suggest that we migrate to Gradle as our build tool. I am
> > suggesting Gradle because:
> > * It is used in lots of open source projects and has a very large
> community
> > behind it.
> > * It has better support for building languages other then Java
> > (PyGradle[2], GoGradle[3], ...)
> > * Its incremental build support works and only builds things that changed
> > through the use of a build cache. Even without the build cache (or for
> > clean builds), it is much faster.
> > * Apache Infra already has Gradle v4.x installed on the Jenkins machines.
> >
> > Any alternatives that should be considered or should we stick with Apache
> > Maven?
> >
> > 1:
> > https://lists.apache.org/thread.html/25311e0e95be5c49afb168d9b4b4d3
> > 57984c10c39c7b01da8ff3baaf@%3Cdev.beam.apache.org%3E
> > 2: https://github.com/linkedin/pygradle
> > 3: https://github.com/gogradle/gogradle
> >
>


Re: Java pre/post commit test suite breakage

2017-10-23 Thread Ben Chambers
I believe license errors are detected by Apache RAT, which runs as part of
the release profile. I believe you need to run "mvn clean verify -Prelease"
to enable anything associated with that profile.

On Mon, Oct 23, 2017 at 11:46 AM Valentyn Tymofieiev
 wrote:

> Hi Beam-Dev,
>
> It's been >5 days since the last successful run of a
> beam_PreCommit_Java_MavenInstall build[1]  and >4 days since last
> successful run of beam_PreCommit_Java_MavenInstall[2].
>
> Looking at build logs I see following problems.
>
> 1. After October 17, postcommit builds started to fail with
>
> Failed to execute goal org.apache.rat:apache-rat-plugin:0.12:check
> (default) on project beam-parent: Too many files with unapproved license: 1
> See RAT report in: /home/jenkins/jenkins-slave/workspace/beam_PostCommit_
> Java_MavenInstall/target/beam-parent-2.3.0-SNAPSHOT.rat
>
> The earliest build that I see this error is Postcommit #5052 [3].
>
> This makes me suspect [4] or [5] as a breaking change, since they change
> pom files.
>
> Questions:
> - Is there a way we can reproduce this failure locally? mvn clean verify
> passes locally for me.
> - Is there a way we can see the See RAT report mentioned in the error log?
>
>
> 2. Prior to onset of #1 Java Precommit builds no longer complete within
> allotted 150 min time. Looking at [6-8] it seems the build makes consistent
> progress, but just does not finish on time. We can also see several recent
> successful builds with execution time very close to time out [9-11].
>
> I'd like to propose to increase time limit for Java precommit test suite
> from 2.5 to 4 hours. 4 hours is long time. I agree that we should
> definitely try to reduce the test execution time, and reduce flakiness.
> However we need the tests at least pass for now. If we write off failed
> test suites as 'flakes' and merge PRs without having a green test signal,
> we will have to spend more time tracing breakages such as #1.
>
> Thoughts?
>
> Thanks,
> Valentyn
>
> [1] https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/
> [2] https://builds.apache.org/job/beam_PostCommit_Java_MavenInstall/
> [3] https://builds.apache.org/job/beam_PostCommit_Java_
> MavenInstall/5052/changes
>
> [4] https://github.com/apache/beam/commit/d745cc9d8cc1735d3bc3c67ba3e261
> 7cb7f11a8c
> 
> [5] https://github.com/apache/beam/commit/0d8ab6cbbc762dd9f9be1b3e9a26b6
> c9d0bb6dc3
> 
>
> [6] https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/15222/
> [7] https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/15195/
> [8] https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/15220/
>
> [9] https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/15009/
> [10] https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/15068/
> [11] https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/15016/
>


Re: TikaIO Refactoring

2017-10-04 Thread Ben Chambers
It looks like ReadableFile#open does currently decompress the stream, but
it seems like we could add a ReadableFile#openRaw(...) or something like
that which doesn't implicitly decompress. Then libraries such as Tika that
want the *actual* file content could use that method. Would that address
your concerns?

https://github.com/apache/beam/blob/393e5631054a81ae1fdcd304f81cc68cf53d3422/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L131
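The distinction matters because Tika detects the container format from the raw bytes. A small stdlib-only illustration (no Beam involved; `gzip`/`gunzip` are helper names for this sketch): the raw bytes keep the gzip magic number that a hypothetical openRaw() would preserve, while an implicitly decompressing channel hands the consumer only the inner payload.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class RawVsDecompressed {
    // Compress some text, as a stand-in for a compressed file on disk.
    public static byte[] gzip(String text) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(buf)) {
            gz.write(text.getBytes("UTF-8"));
        }
        return buf.toByteArray();
    }

    // What an implicitly-decompressing open() effectively does.
    public static String gunzip(byte[] raw) throws IOException {
        try (GZIPInputStream gz =
                 new GZIPInputStream(new ByteArrayInputStream(raw))) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] chunk = new byte[1024];
            int n;
            while ((n = gz.read(chunk)) != -1) out.write(chunk, 0, n);
            return out.toString("UTF-8");
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] raw = gzip("hello tika");
        // Raw bytes start with the gzip magic number (0x1f 0x8b), so a
        // consumer like Tika can detect the container and apply its own
        // safeguards (e.g. zip-bomb checks) during decompression.
        System.out.println((raw[0] & 0xff) == 0x1f && (raw[1] & 0xff) == 0x8b);
        // After implicit decompression, that evidence is gone:
        System.out.println(gunzip(raw));
    }
}
```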

On Wed, Oct 4, 2017 at 2:42 PM Sergey Beryozkin 
wrote:

> Wait, but what about Tika doing checks like Zip bombs, etc ? Tika is
> expected to decompress itself, while ReadableFile has the content
> decompressed.
>
> The other point is that Tika reports the names of the zipped files too,
> in the content, as you can see from TikaIOTest#readZippedPdfFile.
>
> Can we assume that if Metadata does not point to the local file then it
> can be opened as a URL stream ? The same issue affects TikaConfig, so
> I'd rather have a solution which will work for MatchResult.Metadata and
> TikaConfig
>
> Thanks, Sergey
> On 04/10/17 22:02, Sergey Beryozkin wrote:
> > Good point...
> >
> > Sergey
> >
> > On 04/10/17 18:24, Eugene Kirpichov wrote:
> >> Can TikaInputStream consume a regular InputStream? If so, you can
> >> apply it
> >> to Channels.newInputStream(channel). If not, applying it to the filename
> >> extracted from Metadata won't work either because it can point to a file
> >> that's not on the local disk.
> >>
> >> On Wed, Oct 4, 2017, 10:08 AM Sergey Beryozkin 
> >> wrote:
> >>
> >>> I'm starting moving toward
> >>>
> >>> class TikaIO {
> >>> public static ParseAllToString parseAllToString() {..}
> >>> class ParseAllToString extends
> >>> PTransform<PCollection<ReadableFile>,
> >>> PCollection<ParseResult>> {
> >>>   ...configuration properties...
> >>>   expand {
> >>> return input.apply(ParDo.of(new ParseToStringFn))
> >>>   }
> >>>   class ParseToStringFn extends DoFn<...> {...}
> >>> }
> >>> }
> >>>
> >>> as suggested by Eugene
> >>>
> >>> The initial migration seems to work fine, except that ReadableFile and
> >>> in particular, ReadableByteChannel can not be consumed by
> >>> TikaInputStream yet (I'll open an enhancement request), besides, it's
> >>> better let Tika to unzip if needed given that a lot of effort went in
> >>> Tika into detecting zip security issues...
> >>>
> >>> So I'm typing it as
> >>>
> >>> class ParseAllToString extends
> >>> PTransform<PCollection<MatchResult.Metadata>, PCollection<ParseResult>>
> >>>
> >>> Cheers, Sergey
> >>>
> >>> On 02/10/17 12:03, Sergey Beryozkin wrote:
>  Thanks for the review, please see the last comment:
> 
>  https://github.com/apache/beam/pull/3835#issuecomment-333502388
> 
>  (sorry for the possible duplication - but I'm not sure that GitHub
> will
>  propagate it - as I can not see a comment there that I left on
>  Saturday).
> 
>  Cheers, Sergey
>  On 29/09/17 10:21, Sergey Beryozkin wrote:
> > Hi
> > On 28/09/17 17:09, Eugene Kirpichov wrote:
> >> Hi! Glad the refactoring is happening, thanks!
> >
> > Thanks for getting me focused on having TikaIO supporting the simpler
> > (and practical) cases first :-)
> >> It was auto-assigned to Reuven as formal owner of the component. I
> >> reassigned it to you.
> > OK, thanks...
> >>
> >> On Thu, Sep 28, 2017 at 7:57 AM Sergey Beryozkin
> >>  
> >> wrote:
> >>
> >>> Hi
> >>>
> >>> I started looking at
> >>> https://issues.apache.org/jira/browse/BEAM-2994
> >>>
> >>> and pushed some initial code to my tikaio branch introducing
> >>> ParseResult
> >>> and updating the tests but keeping the BounderSource/Reader,
> >>> dropping
> >>> the asynchronous parsing code, and few other bits.
> >>>
> >>> Just noticed it is assigned to Reuven - does it mean Reuven is
> >>> looking
> >>> into it too or was it auto-assigned ?
> >>>
> >>> I don't mind, would it make sense for me to do an 'interim' PR on
> >>> what've done so far before completely removing BoundedSource/Reader
> >>> based code ?
> >>>
> >> Yes :)
> >>
> > I did commit yesterday to my branch, and it made its way to the
> > pending PR (which I forgot about) where I only tweaked a couple of
> doc
> > typos, so I renamed that PR:
> >
> > https://github.com/apache/beam/pull/3835
> >
> > (The build failures are apparently due to the build timeouts)
> >
> > As I mentioned, in this PR I updated the existing TikaIO test to work
> > with ParseResult, at the moment a file location as its property. Only
> > a file name can easily be saved, I thought it might be important
> where
> > on the network the file is - may be copy it afterwards if needed,
> etc.
> > I'd also have no problems with having it typed as a K key, was only
> > trying to make it a bit simpler at the start.
> >
> > I'll deal with the new configurations after a switch.

Re: TikaIO concerns

2017-09-22 Thread Ben Chambers
BigQueryIO allows a side-output for elements that failed to be inserted
when using the Streaming BigQuery sink:

https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StreamingWriteTables.java#L92

This follows the pattern of a DoFn with multiple outputs, as described here
https://cloud.google.com/blog/big-data/2016/01/handling-invalid-inputs-in-dataflow

So, the DoFn that runs the Tika code could be configured in terms of how
different failures should be handled, with the option of just outputting
them to a different PCollection that is then processed in some other way.
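The shape of that pattern, stripped of the Beam SDK so it fits in a few lines (plain Java lists stand in for the main-output and dead-letter PCollections; `route` is a name invented for this sketch):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class DeadLetterDemo {
    // Routes each document either to the "main" output or, on failure, to
    // the "deadLetters" output -- analogous to a DoFn emitting to a main
    // TupleTag plus a dead-letter side-output TupleTag.
    public static Map<String, List<String>> route(List<String> docs,
                                                  Function<String, String> parser) {
        List<String> main = new ArrayList<>();
        List<String> dead = new ArrayList<>();
        for (String doc : docs) {
            try {
                main.add(parser.apply(doc));   // like c.output(result)
            } catch (RuntimeException e) {
                dead.add(doc);                 // like c.output(deadLetterTag, doc)
            }
        }
        Map<String, List<String>> out = new HashMap<>();
        out.put("main", main);
        out.put("deadLetters", dead);
        return out;
    }

    public static void main(String[] args) {
        Map<String, List<String>> r = route(
            Arrays.asList("ok-1", "bad", "ok-2"),
            doc -> {
                if (doc.startsWith("bad")) {
                    throw new RuntimeException("unparseable document");
                }
                return doc.toUpperCase();
            });
        System.out.println(r.get("main"));        // the successes
        System.out.println(r.get("deadLetters")); // failed inputs, kept for later handling
    }
}
```

The key property is that failures are data, not exceptions that kill the bundle: the dead-letter collection can be written somewhere for inspection or reprocessing.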

On Fri, Sep 22, 2017 at 10:18 AM Allison, Timothy B. 
wrote:

> Do tell...
>
> Interesting.  Any pointers?
>
> -Original Message-----
> From: Ben Chambers [mailto:bchamb...@google.com.INVALID]
> Sent: Friday, September 22, 2017 12:50 PM
> To: dev@beam.apache.org
> Cc: d...@tika.apache.org
> Subject: Re: TikaIO concerns
>
> Regarding specifically elements that are failing -- I believe some other
> IO has used the concept of a "Dead Letter" side-output, where documents
> that failed to process are side-output so the user can handle them
> appropriately.
>
> On Fri, Sep 22, 2017 at 9:47 AM Eugene Kirpichov
>  wrote:
>
> > Hi Tim,
> > From what you're saying it sounds like the Tika library has a big
> > problem with crashes and freezes, and when applying it at scale (eg.
> > in the context of Beam) requires explicitly addressing this problem,
> > eg. accepting the fact that in many realistic applications some
> > documents will just need to be skipped because they are unprocessable?
> > This would be first example of a Beam IO that has this concern, so I'd
> > like to confirm that my understanding is correct.
> >
> > On Fri, Sep 22, 2017 at 9:34 AM Allison, Timothy B.
> > 
> > wrote:
> >
> > > Reuven,
> > >
> > > Thank you!  This suggests to me that it is a good idea to integrate
> > > Tika with Beam so that people don't have to 1) (re)discover the need
> > > to make their wrappers robust and then 2) have to reinvent these
> > > wheels for robustness.
> > >
> > > For kicks, see William Palmer's post on his toe-stubbing efforts
> > > with Hadoop [1].  He and other Tika users independently have wound
> > > up carrying out exactly your recommendation for 1) below.
> > >
> > > We have a MockParser that you can get to simulate regular
> > > exceptions,
> > OOMs
> > > and permanent hangs by asking Tika to parse a mock xml [2].
> > >
> > > > However if processing the document causes the process to crash,
> > > > then it
> > > will be retried.
> > > Any ideas on how to get around this?
> > >
> > > Thank you again.
> > >
> > > Cheers,
> > >
> > >Tim
> > >
> > > [1]
> > >
> > http://openpreservation.org/blog/2014/03/21/tika-ride-characterising-w
> > eb-content-nanite/
> > > [2]
> > >
> > https://github.com/apache/tika/blob/master/tika-parsers/src/test/resou
> > rces/test-documents/mock/example.xml
> > >
> >
>


Re: TikaIO concerns

2017-09-22 Thread Ben Chambers
Regarding specifically elements that are failing -- I believe some other IO
has used the concept of a "Dead Letter" side-output, where documents that
failed to process are side-output so the user can handle them appropriately.

On Fri, Sep 22, 2017 at 9:47 AM Eugene Kirpichov
 wrote:

> Hi Tim,
> From what you're saying it sounds like the Tika library has a big problem
> with crashes and freezes, and when applying it at scale (eg. in the context
> of Beam) requires explicitly addressing this problem, eg. accepting the
> fact that in many realistic applications some documents will just need to
> be skipped because they are unprocessable? This would be first example of a
> Beam IO that has this concern, so I'd like to confirm that my understanding
> is correct.
>
> On Fri, Sep 22, 2017 at 9:34 AM Allison, Timothy B. 
> wrote:
>
> > Reuven,
> >
> > Thank you!  This suggests to me that it is a good idea to integrate Tika
> > with Beam so that people don't have to 1) (re)discover the need to make
> > their wrappers robust and then 2) have to reinvent these wheels for
> > robustness.
> >
> > For kicks, see William Palmer's post on his toe-stubbing efforts with
> > Hadoop [1].  He and other Tika users independently have wound up carrying
> > out exactly your recommendation for 1) below.
> >
> > We have a MockParser that you can get to simulate regular exceptions,
> OOMs
> > and permanent hangs by asking Tika to parse a mock xml [2].
> >
> > > However if processing the document causes the process to crash, then it
> > will be retried.
> > Any ideas on how to get around this?
> >
> > Thank you again.
> >
> > Cheers,
> >
> >Tim
> >
> > [1]
> >
> http://openpreservation.org/blog/2014/03/21/tika-ride-characterising-web-content-nanite/
> > [2]
> >
> https://github.com/apache/tika/blob/master/tika-parsers/src/test/resources/test-documents/mock/example.xml
> >
>


Re: Proposal: Unbreak Beam Python 2.1.0 with 2.1.1 bugfix release

2017-09-19 Thread Ben Chambers
Are there any details or JIRA issues describing what is broken? Any
proposal for what changes need to happen to fix it?

On Tue, Sep 19, 2017, 5:49 PM Chamikara Jayalath 
wrote:

> +1 for cutting 2.1.1 for Python SDK only.
>
> Thanks,
> Cham
>
> On Tue, Sep 19, 2017 at 5:43 PM Robert Bradshaw
> 
> wrote:
>
> > +1. Right now anyone who follows our quickstart instructions or
> > otherwise installs the latest release of apache_beam is broken.
> >
> > On Tue, Sep 19, 2017 at 2:05 PM, Charles Chen 
> > wrote:
> > > The latest version (2.1.0) of Beam Python (
> > > https://pypi.python.org/pypi/apache-beam) is broken due to a change in
> > the
> > > "six" dependency (BEAM-2964
> > > ).  For instance,
> > > installing "apache-beam" in a clean environment and running "python -m
> > > apache_beam.examples.wordcount" results in a failure.  This issue is
> > fixed
> > > at head with Robert's recent PR (
> > https://github.com/apache/beam/pull/3865).
> > >
> > > I propose to cherry-pick this change on top of the 2.1.0 release branch
> > (to
> > > form a new 2.1.1 release branch) and call a vote to release version
> 2.1.1
> > > only for Beam Python.
> > >
> > > Alternatively, to preserve version alignment we could also re-release
> > Beam
> > > Java 2.1.1 with the same code as 2.1.0 modulo the version bump.
> > Thoughts?
> > >
> > > Best,
> > > Charles
> >
>


Re: Using side inputs in any user code via thread-local side input accessor

2017-09-13 Thread Ben Chambers
One possible issue with this is that updating a thread-local is likely to
be much more expensive than passing an additional argument. Also, not all
code called from within the DoFn will necessarily run on the same thread
(e.g., sometimes we create a pool of threads for doing work). It may be
*more* confusing for this to sometimes work magically and sometimes fail
horribly. Also, requiring the PCollectionView to be passed to user code
that accesses it is nice because it makes *very clear* that the side input
needs to be provided from the DoFn to that particular utility. If it is
accessed via "spooky action at a distance" we lose that piece of "free"
documentation, which may lead to extensive misuse of these utility methods.
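The worker-thread failure mode can be seen in miniature with plain Java, no Beam required (class and method names here are invented for the sketch): a value stashed in a ThreadLocal by the thread that set it is simply absent on a pooled worker thread.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadLocalPitfall {
    // Stand-in for a thread-local side input accessor.
    private static final ThreadLocal<String> SIDE_INPUT = new ThreadLocal<>();

    // Reading on the thread that set the value works as expected.
    public static String readOnSameThread() {
        SIDE_INPUT.set("side-input-value");
        return SIDE_INPUT.get();
    }

    // Reading from a worker-pool thread silently returns null: the "magic"
    // lookup breaks as soon as the DoFn hands work to other threads.
    public static String readOnPoolThread() throws Exception {
        SIDE_INPUT.set("side-input-value");
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            return pool.submit(() -> SIDE_INPUT.get()).get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(readOnSameThread()); // the value is visible here
        System.out.println(readOnPoolThread()); // but null on the pool thread
    }
}
```

An explicit parameter, by contrast, survives any thread handoff and documents the dependency at the call site.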

On Wed, Sep 6, 2017 at 11:10 AM Eugene Kirpichov
 wrote:

> Hi,
>
> On Wed, Sep 6, 2017 at 10:55 AM Kenneth Knowles 
> wrote:
>
> > On Wed, Sep 6, 2017 at 8:15 AM, Eugene Kirpichov <
> > kirpic...@google.com.invalid> wrote:
> >
> > >
> > > The differences are:
> > > - The proposal in the doc allows wiring different side inputs to the
> same
> > > Supplier, but I'm not convinced that this is important - you can just
> as
> > > easily call the constructor of your DoFn passing different
> > > PCollectionView's for it to capture.
> > >
> >
> > I disagree with this bit about it being "just as easy". Passing the
> needed
> > PCollectionViews to your constructor (or even having a constructor) is a
> > pain. Every time I have to do it, it adds a ton of boilerplate that feels
> > like pure noise. To make a DoFn reusable it must be made into a named
> class
> > with a constructor, versus inlined with no constructor.
>
> Hm, why? You can have the DoFn be an anonymous class capturing the
> PCollectionView into a @SideInput field as a closure.
>
>
> > A generous analogy
> > is is that it is "just" manual closure conversion/currying, changing
> > f(side, main) to f(side)(main). But in practice in Beam the second one
> has
> > much more boilerplate.
> >
> > Also, Beam is worse. We present the user with higher-order functions,
> which
> > is where the actual annoyance comes in. When you want to pardo(f) you
> have
> > to write pardo(f(side))(side, main). Your proposal is to support
> > pardo(f(side))(main) and mine is to support pardo(f)(side, main). I still
> > propose that we support both (as they get implemented). If you buy in to
> my
> > analogy, then there's decades of precedent and the burden of proof falls
> > heavily on whoever doesn't want to support both.
> >
> I see your point. I think the proposal is compatible with what you're
> suggesting too - in DoFn we could have @SideInput *parameters* of type
> PCollectionView, with the same semantics as a field.
>
>
> >
> > - My proposal allows getting rid of .withSideInputs() entirely, because
> the
> > > DoFn captures the PCollectionView so you don't need to specify it
> > > explicitly for wiring.
> > >
> >
> > I've decided to change to full +1 (whatever that means compared to 0.75
> :-)
> > to adding support for @SideInput fields, because the benefits outweigh
> this
> > failure mode:
> >
> > new DoFn {
> >   // forgot the annotation
> >   private final PCollectionView whatever;
> >
> >   @ProcessElement public void process(...) {
> > whatever.get(); // crash during execution
> >   }
> > }
> >
> > But ideas to mitigate that would be cool.
>
> Hm, can't think of anything less hacky than "prohibit having fields of type
> PCollectionView that are not public, final, and annotated with @SideInput"
> - not sure we'd want to go down this road. I suppose a good error message
> in .get() would be sufficient, saying "Did you forget to specify a
> requirement for this side input via .withSideInputs() or by annotating the
> field as @SideInput" or something like that.
>
> >
>
>
> > Kenn
> >
> >
> > >
> > > On Wed, Sep 6, 2017 at 6:03 AM Lukasz Cwik 
> > > wrote:
> > >
> > > > My concern with the proposal is not the specifics of how it will work,
> > > > and more about it being yet another way in which our API is used, even
> > > > though we have a proposal [1] of an API style we were working towards
> > in
> > > > Java and Python. I would rather re-open that discussion now about
> what
> > we
> > > > want that API to look like for our major features and work towards
> > > > consistency (or not if there is a strong argument as to why some
> > feature
> > > > should have a different style).
> > > >
> > > > 1: https://s.apache.org/a-new-dofn
> > > >
> > > > On Wed, Sep 6, 2017 at 12:22 AM, Kenneth Knowles
> >  > > >
> > > > wrote:
> > > >
> > > > > +0.75 because I'd like to bring up invalid pipelines.
> > > > >
> > > > > I had proposed side inputs as parameters to DoFn in
> > > > > https://s.apache.org/a-new-dofn (specifically at [1]) so the only
> > > place
> > > > > they are specified is in the graph construction, making the DoFn
> more
> > > > > reusable and errors impossible. I've actually been noodling my way
> > > > towards
> > > > > this in a br

Re: Adding Serializable implements to SDK classes like WindowedValue etc...

2017-08-24 Thread Ben Chambers
If the user classes are not Serializable, how does adding Serializable to
the WindowedValue help? The user class, which is stored in a field of the
WindowedValue, will still be non-serializable, and will thus cause problems.
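Ben's point is that Java serialization recurses into every non-transient field, so declaring the wrapper Serializable doesn't help when the wrapped value isn't. A plain-JDK sketch of this (the class names `WindowedValueLike` and `UserValue` are stand-ins invented for illustration, not Beam's actual types):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializationDemo {
    // Hypothetical user class that is NOT Serializable.
    static class UserValue {}

    // Serializable wrapper, standing in for WindowedValue.
    static class WindowedValueLike implements Serializable {
        final Object value;
        WindowedValueLike(Object value) { this.value = value; }
    }

    // Attempts to java-serialize an object, reporting success or failure.
    static boolean javaSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) {
            return false;  // NotSerializableException is an IOException
        }
    }

    public static void main(String[] args) {
        // Serializable wrapper holding a serializable payload: fine.
        assert javaSerializable(new WindowedValueLike("ok"));
        // Same wrapper holding a non-serializable user class: still fails,
        // because serialization recurses into the wrapped field.
        assert !javaSerializable(new WindowedValueLike(new UserValue()));
    }
}
```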

On Thu, Aug 24, 2017 at 11:59 AM Kobi Salant  wrote:

> Right, if it is not Kryo- or Java-serializable, then we should allow the
> user to turn caching off.
>
> 2017-08-24 21:55 GMT+03:00 Reuven Lax :
>
> > However user classes are not guaranteed to be java serializable either.
> >
> > On Thu, Aug 24, 2017 at 11:31 AM, Kobi Salant 
> > wrote:
> >
> > > Thank you for your replies,
> > >
> > > Lukasz, The kryo does not fail on the WindowedValue but on the user
> class
> > > which in some cases is not kryo-serializable usually when there is no
> > > default constructor. Switching to java serialization does fail on
> > > WindowedValue
> > > as it is not defined serializable
> > >
> > > Thomas, we thought about writing a custom serializer, but it cannot work
> > > without knowing the exact coders of each RDD. It is similar to the past
> > > discussion about whether it is mandatory for the user to set coders for
> > > each PCollection or not. This suggestion is like asking Beam to
> > > auto-detect coders for all PCollections.
> > >
> > > Ben, Spark uses java serialization to send closures and it will not
> work
> > if
> > > the cluster contains different JVMs that are not compatible. We agree
> > with
> > > you all that it will be more efficient to use coders for caching but
> that
> > > means
> > > a major rewrite of the Spark runner and changing almost all
> > > method signatures and internal RDDs from a specific type to byte[].
> > >
> > > As I said before, this is a fallback for users who have non-serializable
> > > user classes. Even today, if an RDD is cached with Kryo it doesn't go
> > > through the coder, so we are not worsening the situation but enabling
> > > users to use the Spark runner.
> > >
> > > Thanks
> > > Kobi
> > >
> > >
> > >
> > > On Aug 24, 2017, 20:37, "Thomas Weise"  wrote:
> > >
> > > > Would a custom Kryo serializer that uses the coders to perform
> > > > serialization help?
> > > >
> > > > There are various ways Kryo lets you annotate such a serializer without
> > > > full surgery, including @Bind at the field level or at the class level.
> > > >
> > > > Thanks,
> > > > Thomas
> > > >
> > > >
> > > > On Thu, Aug 24, 2017 at 9:57 AM, Lukasz Cwik
>  > >
> > > > wrote:
> > > >
> > > > > How does Kryo's FieldSerializer fail on WindowedValue/PaneInfo, it
> > > seems
> > > > > like those are pretty simple types.
> > > > >
> > > > > Also, I don't see why they can't be tagged with Serializable but I
> > > think
> > > > > the original reasoning was that coders should be used to guarantee
> a
> > > > stable
> > > > > representation across JVM versions.
> > > > >
> > > > > On Thu, Aug 24, 2017 at 5:38 AM, Kobi Salant <
> kobi.sal...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi All,
> > > > > >
> > > > > > I am working on Spark runner issue
> > > > > > https://issues.apache.org/jira/browse/BEAM-2669
> > > > > > "Kryo serialization exception when DStreams containing
> > > > > > non-Kryo-serializable data are cached"
> > > > > >
> > > > > > Currently, the Spark runner enforces the Kryo serializer, and in
> > > > > > shuffling scenarios we always use coders to transfer bytes over the
> > > > > > network. But Spark can also serialize data for caching/persisting,
> > > > > > and currently it uses Kryo for this.
> > > > > >
> > > > > > Today, when the user uses a class which is not Kryo-serializable,
> > > > > > the caching fails, and we thought to open the option to use Java
> > > > > > serialization as a fallback.
> > > > > >
> > > > > > Our RDDs/DStreams behind the PCollections are usually typed
> > > > > > RDD/DStream<WindowedValue<T>>
> > > > > > and when Spark tries to java serialize them for caching purposes
> it
> > > > fails
> > > > > > on WindowedValue not being java serializable.
> > > > > >
> > > > > > Is there any objection to add Serializable implements to SDK
> > classes
> > > > like
> > > > > > WindowedValue, PaneInfo and others?
> > > > > >
> > > > > > Again, I want to emphasise that coders are a big part of the Spark
> > > > > > runner code and we do use them whenever a shuffle is expected.
> > > > > > Changing the types of the RDDs/DStreams to byte[] will make the code
> > > > > > pretty unreadable and will weaken type-safety checks.
> > > > > >
> > > > > > Thanks
> > > > > > Kobi
> > > > > >
> > > > >
> > > >
> > >
> >
>


Re: Adding Serializable implements to SDK classes like WindowedValue etc...

2017-08-24 Thread Ben Chambers
I think as Luke pointed out, one problem with Serializable is that it isn't
guaranteed to be stable across different JVM versions.

The only drawback I see here from the user perspective is that if I have
already written a custom coder to handle my type in some reasonable way,
this is going to ignore that and try to use a serializer. If my coder had
any special logic (not serializing some fields, using a more efficient
representation either for the serialized object or the de-serialized form)
that logic won't be propagated.

I think this could lead to confusion. Imagine someone writes a Coder that
attempts to reuse decoded values (not unreasonable for things like hourly
windows, where you may have many things referencing the same hour). A user
may be surprised to find that there are many instances of each hour because
we have chosen to use a serializer rather than the Coder they instructed us
to use.
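The instance-reuse concern can be illustrated with a plain-Java sketch (the class and method names below are invented for illustration; this is not Beam's Coder API). The decoder interns decoded values so that equal encodings share one instance — exactly the property a bypassing serializer would silently lose:

```java
import java.util.HashMap;
import java.util.Map;

public class InterningDecoder {
    // Cache of previously decoded values, keyed by their encoding.
    private final Map<String, Object> cache = new HashMap<>();

    // Decodes an encoded hour, reusing the instance for repeated encodings.
    Object decode(String encodedHour) {
        return cache.computeIfAbsent(encodedHour, h -> new Object());
    }

    public static void main(String[] args) {
        InterningDecoder d = new InterningDecoder();
        // Two decodes of the same encoded hour yield the *same* instance --
        // the sharing a java serializer would not preserve.
        assert d.decode("2017-08-24T11:00") == d.decode("2017-08-24T11:00");
        // A fresh decoder has its own cache, so instances differ.
        assert new InterningDecoder().decode("x") != d.decode("x");
    }
}
```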

On Thu, Aug 24, 2017 at 9:57 AM Lukasz Cwik 
wrote:

> How does Kryo's FieldSerializer fail on WindowedValue/PaneInfo, it seems
> like those are pretty simple types.
>
> Also, I don't see why they can't be tagged with Serializable but I think
> the original reasoning was that coders should be used to guarantee a stable
> representation across JVM versions.
>
> On Thu, Aug 24, 2017 at 5:38 AM, Kobi Salant 
> wrote:
>
> > Hi All,
> >
> > I am working on Spark runner issue
> > https://issues.apache.org/jira/browse/BEAM-2669
> > "Kryo serialization exception when DStreams containing
> > non-Kryo-serializable data are cached"
> >
> > Currently, the Spark runner enforces the Kryo serializer, and in shuffling
> > scenarios we always use coders to transfer bytes over the network. But
> > Spark can also serialize data for caching/persisting, and currently it
> > uses Kryo for this.
> >
> > Today, when the user uses a class which is not Kryo-serializable, the
> > caching fails, and we thought to open the option to use Java serialization
> > as a fallback.
> >
> > Our RDDs/DStreams behind the PCollections are usually typed
> > RDD/DStream<WindowedValue<T>>
> > and when Spark tries to java serialize them for caching purposes it fails
> > on WindowedValue not being java serializable.
> >
> > Is there any objection to add Serializable implements to SDK classes like
> > WindowedValue, PaneInfo and others?
> >
> > Again, I want to emphasise that coders are a big part of the Spark runner
> > code and we do use them whenever a shuffle is expected. Changing the types
> > of the RDDs/DStreams to byte[] will make the code pretty unreadable and
> > will weaken type-safety checks.
> >
> > Thanks
> > Kobi
> >
>


Re: [PROPOSAL] "Requires deterministic input"

2017-08-10 Thread Ben Chambers
I think it only makes sense in places where a user might reasonably require
stable input to ensure idempotency of side-effects. It also only makes
sense in places where a runner could reasonably provide such a guarantee.

A given Combine is unlikely to have side effects so it is less likely to
benefit from stability of the input. Further, the reason writing a Combine
is desirable is because its execution can be split up and moved to the
mapper-side (before the GroupByKey). But this division is inherently
non-deterministic, and so it seems unlikely to benefit from stability. And
many cases where I could see wanting side-effects would end up in
extractOutput, for which there is an easy (arguably better) solution --
have extractOutput return the accumulators and do the side-effects in a
DoFn afterwards.

For composites, it is a bit trickier. I could see a case for supporting it
on composites, but it would need to make it very clear that it only
affected the input to the composite. If any of the operations within the
composite were non-deterministic, then the outputs of that could be
unstable, leading to instability in later parts of the composite. Further,
it doesn't seem to offer much. The composite itself doesn't perform
side-effects, so there is no benefit to having the annotation there --
instead, we allow the annotation to be put where it is relevant and
important -- on the DoFn's that actually have side-effects that require
stability.

On Thu, Aug 10, 2017 at 9:23 AM Reuven Lax  wrote:

> I don't think it really makes sense to to do this on Combine. And I agree
> with you, it doesn't make sense on composites either.
>
> On Thu, Aug 10, 2017 at 9:19 AM, Scott Wegner 
> wrote:
>
> > Does requires-stable-input only apply to ParDo transforms?
> >
> > I don't think it would make sense to annotate to composite, because
> > checkpointing should happen as close to the side-effecting operation as
> > possible, since upstream transforms within a composite could introduce
> > non-determinism. So it's the primitive transform that should own the
> > requirement.
> >
> > Are there other primitives that should be annotated? 'Combine' is
> > interesting because it optimized in Dataflow (and perhaps other runners)
> to
> > partially apply before a GroupByKey.
> >
> > On Thu, Aug 10, 2017 at 9:01 AM Tyler Akidau  >
> > wrote:
> >
> > > +1 to the annotation idea, and to having it on processTimer.
> > >
> > > -Tyler
> > >
> > > On Thu, Aug 10, 2017 at 2:15 AM Aljoscha Krettek 
> > > wrote:
> > >
> > > > +1 to the annotation approach. I outlined how implementing this would
> > > work
> > > > in the Flink runner in the Thread about the exactly-once Kafka Sink.
> > > >
> > > > > On 9. Aug 2017, at 23:03, Reuven Lax 
> > wrote:
> > > > >
> > > > > Yes - I don't think we should try and make any deterministic
> > guarantees
> > > > > about what is in a bundle. Stability guarantees are per element
> only.
> > > > >
> > > > > On Wed, Aug 9, 2017 at 1:30 PM, Thomas Groh
>  > >
> > > > > wrote:
> > > > >
> > > > >> +1 to the annotation-on-ProcessElement approach. ProcessElement is
> > the
> > > > >> minimum implementation requirement of a DoFn, and should be where
> > the
> > > > >> processing logic which depends on characteristics of the inputs
> lie.
> > > > It's a
> > > > >> good way of signalling the requirements of the Fn, and letting the
> > > > runner
> > > > >> decide.
> > > > >>
> > > > >> I have a minor concern that this may not work as expected for
> users
> > > that
> > > > >> try to batch remote calls in `FinishBundle` - we should make sure
> we
> > > > >> document that it is explicitly the input elements that will be
> > > replayed,
> > > > >> and bundles and other operational details are still arbitrary.
> > > > >>
> > > > >>
> > > > >>
> > > > >> On Wed, Aug 9, 2017 at 10:37 AM, Reuven Lax
> >  > > >
> > > > >> wrote:
> > > > >>
> > > > >>> I think deterministic here means deterministically replayable.
> i.e.
> > > no
> > > > >>> matter how many times the element is retried, it will always be
> the
> > > > same.
> > > > >>>
> > > > >>> I 

Re: [PROPOSAL] "Requires deterministic input"

2017-08-09 Thread Ben Chambers
I strongly agree with this proposal. I think moving away from "just insert
a GroupByKey for one of the 3 different reasons you may want it" towards
APIs that allow code to express the requirements they have and the runner
to choose the best way to meet this is a major step forwards in terms of
portability.

I think "deterministic" may be misleading. The actual contents of the
collection aren't deterministic if upstream computations aren't. The
property we really need is that once an input may have been observed by the
side-effecting code it will never be observed with a different value.

I would propose something like RequiresStableInput, to indicate that the
input must be stable as observed by the function. I could also see something
hinting at the fact that we don't recompute the input, such as
RequiresMemoizedInput or RequiresNoRecomputation.
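A minimal, plain-Java illustration of why stability matters for idempotent side effects (all names here are hypothetical; this is not a Beam API). If the bytes seen on retry are identical, a key derived from them dedupes the write; if an upstream non-deterministic step re-runs and the bytes change, the same logical write happens twice:

```java
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

public class StableInputDemo {
    // Keys of writes that have already been committed by the "sink".
    private final Set<UUID> committed = new HashSet<>();

    // Performs the write only once per key; returns true if it happened now.
    boolean idempotentWrite(byte[] element) {
        UUID key = UUID.nameUUIDFromBytes(element);  // deterministic in the input bytes
        return committed.add(key);
    }

    public static void main(String[] args) {
        StableInputDemo sink = new StableInputDemo();
        byte[] stable = "user-42,amount-10".getBytes();
        assert sink.idempotentWrite(stable);       // first attempt: performed
        assert !sink.idempotentWrite(stable);      // retry with stable input: deduped
        // Unstable replay: upstream recomputed a different value, so the
        // "same" logical element produces a duplicate side effect.
        byte[] recomputed = "user-42,amount-11".getBytes();
        assert sink.idempotentWrite(recomputed);
    }
}
```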

-- Ben

P.S. For anyone interested, other uses of GroupByKey that we may want to
discuss APIs for would be preventing retries across steps (e.g., preventing
fusion) and redistributing inputs across workers.

On Wed, Aug 9, 2017 at 9:53 AM Kenneth Knowles 
wrote:

> This came up again, so I wanted to push it along by proposing a specific
> API for Java that could have a derived API in Python. I am writing this
> quickly to get something out there, so I welcome suggestions for revision.
>
> Today a DoFn has a @ProcessElement annotated method with various automated
> parameters, but most fundamentally this:
>
> @ProcessElement
> public void process(ProcessContext ctx) {
>   ctx.element() // to access the current input element
>   ctx.output(something) // to write to default output collection
>   ctx.output(tag, something) // to write to other output collections
> }
>
> For some time, we have hoped to unpack the context - it is a
> backwards-compatibility pattern made obsolete by the new DoFn design. So
> instead it would look like this:
>
> @ProcessElement
> public void process(Element element, MainOutput mainOutput, ...) {
>   element.get() // to access the current input element
>   mainOutput.output(something) // to write to the default output collection
>   other.output(something) // to write to other output collection
> }
>
> I've deliberately left out the undecided syntax for side outputs. But it
> would be nice for the tag to be built in to the parameter so it doesn't
> have to be used when calling output().
>
> One way to enhance this to deterministic input would just be this:
>
> @ProcessElement
> @RequiresDeterministicInput
> public void process(Element element, MainOutput mainOutput, ...) {
>   element.get() // to access the current input element
>   mainOutput.output(something) // to write to the default output collection
>   other.output(something) // to write to other output collection
> }
>
> There are really a lot of places where we could put an annotation or change
> a type to indicate that the input PCollection should be
> well-defined/deterministically-replayable. I don't have a really strong
> opinion.
>
> Kenn
>
> On Tue, Mar 21, 2017 at 4:53 PM, Ben Chambers  >
> wrote:
>
> > Allowing an annotation on DoFn's that produce deterministic output could
> be
> > added in the future, but doesn't seem like a great option.
> >
> > 1. It is a correctness issue to assume a DoFn is deterministic and be
> > wrong, so we would need to assume all transform outputs are
> > non-deterministic unless annotated. Getting this correct is difficult
> (for
> > example, GBK is surprisingly non-deterministic except in specific cases).
> >
> > 2. It is unlikely to be a major performance improvement, given that any
> > non-deterministic transform prior to a sink (which are most likely to
> > require deterministic input) will cause additional work to be needed.
> >
> > Based on this, it seems like the risk of allowing an annotation is high
> > while the potential for performance improvements is low. The current
> > proposal (not allowing an annotation) makes sense for now, until we can
> > demonstrate that the impact on performance is high in cases that could be
> > avoided with an annotation (in real-world use).
> >
> > -- Ben
> >
> > On Tue, Mar 21, 2017 at 2:05 PM vikas rk  wrote:
> >
> > +1 for the general idea of runners handling it over a hard-coded
> > implementation strategy.
> >
> > For the Write transform I believe you are talking about ApplyShardingKey
> > <
> > https://github.com/apache/beam/blob/d66029cafde152c0a46ebd276ddfa4
> > c3e7fd3433/sdks/java/core/src/main/java/org/apache/beam/sdk/
> > io/Write.java#L304
> > >
> > which
> > introduces non determinist

Re: [DISCUSS] Beam pipeline logical and physical DAGs visualization.

2017-08-04 Thread Ben Chambers
I think with GraphViz you can put a label on the "subgraph" that represents
each composite. This would allow each leaf transform to be just the name of
that leaf within the composite, rather than the full name. Is that what you
were suggesting, Robert?
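For reference, a sketch of what such a labeled composite might look like in GraphViz dot (all node and cluster names below are made up for illustration). Note that the subgraph name must start with `cluster` for common layouts to draw and label the enclosing box:

```dot
// Hypothetical pipeline: a "WordCount" composite with short leaf names.
digraph pipeline {
  subgraph cluster_wordcount {
    label = "WordCount";   // composite transform name shown on the box
    ExtractWords -> CountPerWord -> FormatResults;
  }
  ReadLines -> ExtractWords;
  FormatResults -> WriteCounts;
}
```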

On Thu, Aug 3, 2017 at 10:06 PM Robert Bradshaw 
wrote:

> Nice.
>
> In terms of shared data structures, we have
>
> https://github.com/apache/beam/blob/master/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
> . Presumably a utility that converts this to a dot file would be quite
> useful.
>
> It might be interesting to experiment with different ways of handling
> the nesting. For example, the nodes inside a composite transform need
> not repeat their common prefix, which could make things more compact.
>
> On Thu, Aug 3, 2017 at 9:25 PM, Ahmet Altay 
> wrote:
> > +1, this looks great and it will be very useful for users to understand
> > their pipelines.
> >
> > On Thu, Aug 3, 2017 at 8:25 PM, Pei HE  wrote:
> >
> >> Hi all,
> >> While working on JStorm and MapReduce runners, I found that it is very
> >> helpful to understand Beam pipelines by visualizing them.
> >>
> >> Logical graph:
> >> https://drive.google.com/file/d/0B6iZ7iRh-LOYc0dUS0Rwb2tvWGM/view?usp=
> >> sharing
> >>
> >> Physical graph:
> >> https://drive.google.com/file/d/0B6iZ7iRh-LOYbDFWeDlCcDhnQmc/view?usp=
> >> sharing
> >>
> >> I think we can visualize Beam logical DAG in runner-core. It should
> also be
> >> easy to visualize the physical DAG in each runners. (Maybe we can define
> >> some shared data structures to make it more automatic, and even support
> >> visualizing them in Apex/Flink/Spark/Gearpump UIs).
> >>
> >> I have a commit for MapReduce runner in here (<200 lines). And, this
> commit
> >> generates dotfiles for logical and physical DAGs.
> >>
> >> https://github.com/peihe/incubator-beam/commit/
> >> bb3349e10c0cfacd81b610880ddfec030fedf34d
> >>
> >> Looking forward to ideas and feedbacks.
> >> --
> >> Pei
> >>
>


Re: How to test a transform against an inaccessible ValueProvider?

2017-07-19 Thread Ben Chambers
I also reported something similar to this as
https://issues.apache.org/jira/browse/BEAM-2577. That issue was reported
because we don't have any tests that use a runner and attempt to pass
ValueProviders in. This means that we've found bugs such as
NestedValueProviders used with non-serializable anonymous functions.

One solution seems to be to use this pattern:

1. Create an extension of PipelineOptions with some ValueProviders
2. Use that in your test pipeline
3. allow TestPipeline.run() to take additional arguments to provide at
template execution time to populate those value providers

On Wed, Jul 19, 2017 at 12:03 PM Eugene Kirpichov
 wrote:

> Hi,
>
> Just filed JIRA https://issues.apache.org/jira/browse/BEAM-2644
>
> Many transforms that take ValueProvider's have different codepaths for when
> the provider is accessible or not. However, as far as I can tell, there is
> no good way to construct a pipeline with PipelineOptions containing an
> inaccessible ValueProvider, and then test how it would run as a template
> invocation with an actual value supplied.
>
> The only way I could come up with is mimicking
>
> https://github.com/apache/beam/blob/master/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ValueProviderTest.java#L202
> , which is very ugly.
>
> Am I missing something? Is there already a good way to do this?
>


Re: Proposal and plan: new TextIO features based on SDF

2017-07-12 Thread Ben Chambers
Regarding changing the coder -- keep in mind that there may be persisted
state somewhere, so we can't just change the coder once this is used.

If the processing of scanning for modified and new files reported the
last-modified-time, could we use that and have the SDF report KVs with the last-modified-time as the timestamp? If we could do
that, and there was a "watermark" that tracked the latest
last-modified-time that we have processed, then we can use per-key state to
store how far a given filename has been processed, but set an event time
timer to go off when the watermark indicates all files have been processed
up to that point. This would allow the state to be garbage collected.
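A hedged, plain-Java sketch of this garbage-collection idea (names invented for illustration; this is not Beam's State/Timer API): per-file progress lives in keyed state, and once the watermark over last-modified-times passes a file's timestamp, its entry can be dropped, since no further growth of that file will be observed:

```java
import java.util.HashMap;
import java.util.Map;

public class FileStateGc {
    static class Progress {
        final long lastModified;
        final long bytesRead;
        Progress(long lastModified, long bytesRead) {
            this.lastModified = lastModified;
            this.bytesRead = bytesRead;
        }
    }

    // Per-file processing state, standing in for per-key state.
    private final Map<String, Progress> perFileState = new HashMap<>();

    void record(String file, long lastModified, long bytesRead) {
        perFileState.put(file, new Progress(lastModified, bytesRead));
    }

    // Simulates the event-time timer firing at the given watermark:
    // files fully processed up to the watermark no longer need state.
    void onWatermark(long watermark) {
        perFileState.values().removeIf(p -> p.lastModified <= watermark);
    }

    int trackedFiles() { return perFileState.size(); }

    public static void main(String[] args) {
        FileStateGc gc = new FileStateGc();
        gc.record("gs://bucket/a.txt", 100, 512);
        gc.record("gs://bucket/b.txt", 250, 64);
        gc.onWatermark(200);            // a.txt's state is garbage-collected
        assert gc.trackedFiles() == 1;  // only b.txt remains tracked
    }
}
```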

On Wed, Jul 12, 2017 at 7:50 AM Reuven Lax  wrote:

> Yes, you still need SDF to do the root expansion. However it means that the
> state storage is now distributed.
>
> Garbage collection might be trickier with Distinct.
>
> On Tue, Jul 11, 2017 at 10:19 PM, Eugene Kirpichov <
> kirpic...@google.com.invalid> wrote:
>
> > Yes, I thought of this, but:
> > - The distinct transform needs to apply per input (probably easy)
> > - You still need an SDF to run the set expansion repeatedly
> > - It's not clear when to terminate the repeated expansion in this
> > implementation
> >
> > On Tue, Jul 11, 2017 at 10:14 PM Reuven Lax 
> > wrote:
> >
> > > As a thought experiment: could this be done by expanding the set into a
> > > PCollection and running it through a Distinct (in the global window,
> > > trigger every element) transform?
> > >
> > > On Tue, Jul 11, 2017 at 9:48 PM, Eugene Kirpichov <
> > > kirpic...@google.com.invalid> wrote:
> > >
> > > > In the current version, the transform is intended to watch a set that
> > is
> > > > continuously growing; do you mean a GCS bucket that eventually
> contains
> > > > more files than can fit in a state tag?
> > > >
> > > > I agree that this will eventually become an issue; I can see a couple
> > of
> > > > solutions:
> > > > - I suspect many such sets are highly compressible, so we can use a
> > coder
> > > > that compresses things and get some headroom.
> > > > - When an element disappears from a set, we can remove it from the
> > state
> > > > (without emitting anything into the transform's output - just for GC
> > > > purposes). Of course this assumes that elements actually disappear
> from
> > > the
> > > > set (e.g. get removed from the GCS bucket).
> > > > - There might be a way to shard the set using a GBK. I'm not quite
> sure
> > > how
> > > > it would look in the transform, in particular how the termination
> > > condition
> > > > would look like - because polling would need to happen before the
> GBK,
> > > and
> > > > termination conditions such as "no new elements observed" depend on
> > > > information in shards that's after the GBK.
> > > >
> > > > On Tue, Jul 11, 2017 at 9:23 PM Reuven Lax  >
> > > > wrote:
> > > >
> > > > > BTW - I am worried about SDF storing everything in a single tag for
> > > > watch.
> > > > > The problem is that streaming pipeline can run "forever." So
> someone
> > > > > watching a GCS bucket "forever" will eventually crash due to the
> > value
> > > > > getting too large. Is there any reasonable way to garbage collect
> > this
> > > > > state?
> > > > >
> > > > > On Tue, Jul 11, 2017 at 9:08 PM, Eugene Kirpichov <
> > > > > kirpic...@google.com.invalid> wrote:
> > > > >
> > > > > > First PR has been submitted - enjoy TextIO.readAll() which reads
> a
> > > > > > PCollection of filenames!
> > > > > > I've started working on the SDF-based Watch transform
> > > > > > http://s.apache.org/beam-watch-transform, and after that will be
> > > able
> > > > to
> > > > > > implement the incremental features in TextIO.
> > > > > >
> > > > > > On Tue, Jun 27, 2017 at 1:55 PM Eugene Kirpichov <
> > > kirpic...@google.com
> > > > >
> > > > > > wrote:
> > > > > >
> > > > > > > Thanks all. The first PR is out for review:
> > > > > > > https://github.com/apache/beam/pull/3443
> > > > > > > Next work (watching for new files) is in progress, based on
> > > > > > > https://github.com/apache/beam/pull/3360
> > > > > > >
> > > > > > > On Tue, Jun 27, 2017 at 11:22 AM Kenneth Knowles
> > > > >  > > > > > >
> > > > > > > wrote:
> > > > > > >
> > > > > > >> +1
> > > > > > >>
> > > > > > >> This is a really nice doc and plan.
> > > > > > >>
> > > > > > >> On Tue, Jun 27, 2017 at 1:49 AM, Aljoscha Krettek <
> > > > > aljos...@apache.org>
> > > > > > >> wrote:
> > > > > > >>
> > > > > > >> > +1
> > > > > > >> >
> > > > > > >> > This sounds very good and there is a clear implementation
> > path!
> > > > > > >> >
> > > > > > >> > > On 24. Jun 2017, at 20:55, Jean-Baptiste Onofré <
> > > > j...@nanthrax.net>
> > > > > > >> wrote:
> > > > > > >> > >
> > > > > > >> > > Fair enough ;)
> > > > > > >> > >
> > > > > > >> > > Let me review the different Jira and provide some
> feedback.
> > > > > > >> > >
> > > > > > >> > > Regards
> > > > > > >> > > JB
> > > > > > >> > >
> > > > > > >>

Re: [DISCUSS] Bridge beam metrics to underlying runners to support metrics reporters?

2017-06-23 Thread Ben Chambers
I think the distinction between reporting metrics (the API we give
users to create Metrics) and getting metrics out (currently either
using the PipelineResult to query results or connecting to an external
metric store) is important.

There is an additional distinction to be made in data processing systems --
specifically whether the metric value represents a physical quantity such
as the number of RPCs issued during processing which should be counted
across all attempts at processing elements or a logical value such as the
number of elements processed that should only be aggregated across the
successful attempt at an element.

This distinction requires deeper runner involvement since that is the only
point that knows for sure whether an element has been successfully
processed. It is also the only form of Metrics that are not subject to
potential data loss (over or under counting).

Allowing Metrics to preserve this distinction and each runner to choose
which of committed and attempted Metrics to support (or possibly even both)
is part of the reason why the existing API is the way it is.
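The attempted-vs-committed distinction can be sketched with two plain counters (a toy simulation with invented names, not Beam's Metrics API): a physical metric such as RPCs issued counts every attempt, while a logical metric counts only the attempt that commits:

```java
public class MetricsDemo {
    long attemptedRpcs = 0;       // physical: counts across all attempts
    long committedElements = 0;   // logical: only the successful attempt

    // Simulates a bundle that may fail once and be retried by the runner.
    void runBundle(int elements, boolean failFirstAttempt) {
        attemptedRpcs += elements;        // side effects happened either way
        if (failFirstAttempt) {
            attemptedRpcs += elements;    // the retry re-issues the RPCs
        }
        committedElements += elements;    // committed once, on success
    }

    public static void main(String[] args) {
        MetricsDemo m = new MetricsDemo();
        m.runBundle(10, true);
        assert m.attemptedRpcs == 20;      // both attempts are visible
        assert m.committedElements == 10;  // logical count unaffected by retry
    }
}
```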

On Fri, Jun 23, 2017 at 2:41 AM Ismaël Mejía  wrote:

> That seems like a great idea (improving the current metrics design). I
> suppose there is a tradeoff between complexity and simplicity, and
> when I read the design document I think that some design decisions
> were made for the sake of simplicity. However, as dropwizard is the
> 'de-facto' standard for metrics (at least in the Java world), it
> makes sense to align more with it. That also reminds me that Aviem
> also wanted to add dropwizard's EWMA to the metrics API, so there is
> still some work to do.
>
>
> On Fri, Jun 23, 2017 at 11:00 AM, Cody Innowhere 
> wrote:
> > Hi Ismaël,
> > Yes, Distribution is similar to codahale's Histogram without the quantiles,
> > and what I meant by "adding support of Histogram" might be extending
> > Distribution so that quantiles can be supported.
> > I think in metrics area dropwizard metrics is more or less a standard and
> > many frameworks have direct support for this including Spark, Flink and
> > JStorm
> > (Well I happen to be the developer of JStorm metrics and our internal
> alert
> > engine), and you're right if beam metrics are compatible with dropwizard
> we
> > can surely benefit from it.
> >
> > I've also read the design doc and IMHO it's not easy to support
> > Meter/Histogram (currently Distribution is a bit too simple). I'm
> thinking
> > about adding full
> > support of dropwizard metrics and will come up with a doc later so that
> we
> > can discuss this in detail.
> >
> > On Fri, Jun 23, 2017 at 4:30 PM, Ismaël Mejía  wrote:
> >
> >> Cody, not sure if I follow, but isn't Distribution in Beam similar to
> >> codahale/dropwizard's Histogram (without the quantiles)?
> >>
> >> Meters are also in the plan but not implemented yet, see the Metrics
> >> design doc:
> >> https://s.apache.org/beam-metrics-api
> >>
> >> If I understand correctly, what you want is to have some sort of
> >> compatibility with dropwizard so we can benefit from their sinks? If so
> >> (or am I misreading it?), that would be neat; however, the only problem
> >> is how the aggregation phase works in Beam, because of Distribution
> >> vs dropwizard (not sure if they have some implementation that takes
> >> Distribution into account).
> >>
> >> Improving metrics is in the agenda and contributions are welcomed
> >> because the API is still evolving and we can have new ideas as part of
> >> it.
> >>
> >> Regards,
> >> Ismaël
> >>
> >>
> >> On Fri, Jun 23, 2017 at 9:29 AM, Cody Innowhere 
> >> wrote:
> >> > Yes I agree with you and sorry for messing them together in this
> >> discussion.
> >> > I just wonder if someone plans to support Meters/Histograms in the
> near
> >> > future. If so, we might need to modify metrics a bit in beam sdk IMHO,
> >> > that's the reason I started this discussion.
> >> >
> >> > On Fri, Jun 23, 2017 at 3:21 PM, Jean-Baptiste Onofré <
> j...@nanthrax.net>
> >> > wrote:
> >> >
> >> >> Hi Codi,
> >> >>
> >> >> I think there are two "big" topics around metrics:
> >> >>
> >> >> - what we collect
> >> >> - where we send the collected data
> >> >>
> >> >> The "generic metric sink" (BEAM-2456) is for the latter: we don't really
> >> >> change/touch the collected data (except maybe the data format) that we
> >> >> send to the sink.
> >> >>
> >> >> The Meters/Histograms topic is more about the collected data IMHO.
> >> >>
> >> >> Regards
> >> >> JB
> >> >>
> >> >>
> >> >> On 06/23/2017 04:09 AM, Cody Innowhere wrote:
> >> >>
> >> >>> Hi JB,
> >> >>> Glad to hear that.
> >> >>> Still, I'm thinking about adding support for Meters & Histograms (maybe
> >> >>> extending Distribution). As the discussion mentions, the problem is that
> >> >>> Meter/Histogram cannot be updated directly in the current way because
> >> >>> their internal data decays over time. Do you plan to refactor the
> >> >>> current implementation so
> >>

Re: How to create a custom trigger?

2017-06-07 Thread Ben Chambers
There hasn't been a need for user-defined triggers, and we've found it is
really hard to create triggers with proper behavior.

Can you elaborate on why you are trying to use triggers to understand
watermarks in this way? It's not clear how this trigger would be useful to
that understanding.

On Wed, Jun 7, 2017, 10:22 AM Shen Li  wrote:

> Hi Lukasz,
>
> Thanks again for the suggestion. Is there any reason for not allowing users
> to create custom triggers?
>
> Shen
>
> On Wed, Jun 7, 2017 at 12:13 PM, Lukasz Cwik 
> wrote:
>
> > You should really take a look at TestStream and have runners integrate
> with
> > it instead.
> >
> > There are already several tests which validate TestStream compatible
> > runners to make sure their trigger evaluations are correct.
> >
> > On Wed, Jun 7, 2017 at 10:10 AM, Shen Li  wrote:
> >
> > > Hi Lukasz,
> > >
> > > Thanks for the suggestion. I am trying to test how the runner generates
> > > watermarks. So I would like to have the trigger fire on every
> > > watermark-advancing event.
> > >
> > > Shen
> > >
> > > On Wed, Jun 7, 2017 at 10:49 AM, Lukasz Cwik  >
> > > wrote:
> > >
> > > > Look into the AfterPane#elementCountAtLeast trigger as it seems to be
> > the
> > > > closest to your description. It fires as soon as any data is
> available.
> > > >
> > > > Are you sure you don't want some kind of watermark-based trigger with
> > > > just a small interval size?
> > > >
> > > >
> > > >
> > > >
> > > > On Wed, Jun 7, 2017 at 8:22 AM, Shen Li  wrote:
> > > >
> > > > > Hi Lukasz,
> > > > >
> > > > > Thanks for your response. Is it possible to implement the following
> > > logic
> > > > > using existing triggers: always fire the trigger on a GlobalWindow
> > > > whenever
> > > > > watermark advances?
> > > > >
> > > > > Shen
> > > > >
> > > > > On Wed, Jun 7, 2017 at 10:05 AM, Lukasz Cwik
> >  > > >
> > > > > wrote:
> > > > >
> > > > > > Users are unable to create custom trigger implementations. If you
> > > tell
> > > > us
> > > > > > what you want your trigger to do, we may be able to suggest an
> > > > > alternative.
> > > > > >
> > > > > > On Wed, Jun 7, 2017 at 7:41 AM, Shen Li 
> > wrote:
> > > > > >
> > > > > > > Hi,
> > > > > > >
> > > > > > > I created a custom trigger class (XYZ) by extending the
> > > OnceTrigger.
> > > > > > During
> > > > > > > execution, I got this error "Cannot translate trigger class XYZ
> > to
> > > a
> > > > > > > runner-API proto." It seems that the Triggers.ProtoConverter
> > class
> > > > > needs
> > > > > > to
> > > > > > > declare a convertSpecific method for my trigger XYZ. How can I
> > use
> > > my
> > > > > > > custom trigger without modifying Beam's Triggers class?
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > Shen
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
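The error in the thread above ("Cannot translate trigger class XYZ to a runner-API proto") reflects a closed-registry pattern: the SDK enumerates its own trigger classes and has no conversion rule for anything else, which is why a user-defined subclass fails at translation time. A minimal sketch of that pattern (hypothetical names and plain Java; not the actual Triggers.ProtoConverter code):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** A closed registry mapping known trigger classes to their proto form. */
final class TriggerConverter {
  private final Map<Class<?>, Function<Object, String>> converters = new HashMap<>();

  void register(Class<?> cls, Function<Object, String> toProto) {
    converters.put(cls, toProto);
  }

  /** Converts a known trigger; rejects anything outside the registry. */
  String convert(Object trigger) {
    Function<Object, String> fn = converters.get(trigger.getClass());
    if (fn == null) {
      throw new IllegalArgumentException(
          "Cannot translate trigger class "
              + trigger.getClass().getSimpleName() + " to a runner-API proto.");
    }
    return fn.apply(trigger);
  }
}

/** A built-in-style trigger the registry knows about. */
final class AfterCount {
  final int count;
  AfterCount(int count) { this.count = count; }
}

/** A user-defined trigger the registry does not know. */
final class CustomTrigger {}
```

The design point: runners only promise correct evaluation for the enumerated trigger set, so anything outside the registry is rejected rather than guessed at.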


Re: [DISCUSS] Source Watermark Metrics

2017-06-06 Thread Ben Chambers
The existing metrics allow a user to report additional values to the
runner. Something like the watermark, which the runner already knows
about, doesn't need to fit into the set of metrics. Since the runner
already tracks the low watermark for each operator, it can just report that
as it sees fit.

This means it shouldn't matter whether the current metrics types can
express it, since they are for getting user numbers into the runner.

On Sun, Jun 4, 2017, 8:27 PM JingsongLee  wrote:

> I feel reporting the current low watermark for each operator is better
> than just reporting the source watermark, after seeing the Flink 1.3 web
> frontend.
>
> We want the smallest watermark in all splits. But some runners, like
> FlinkRunner, don't have a way to get the global smallest watermark, and
> the metric types (Counter, Gauge, Distribution) cannot express it.
>
> Best,
> JingsongLee
> --
> From: Ben Chambers
> Time: 2017 Jun 2 (Fri) 21:46
> To: dev; JingsongLee
> Cc: Aviem Zur; Ben Chambers
> Subject: Re: [DISCUSS] Source Watermark Metrics
> I think having runners report important, general properties such as the
> source watermark is great. It is much easier than requiring every source to
> expose it.
>
> I'm not sure how we would require this or do so in a general way. Each
> runner has separate code for handling the watermark as well as different
> ways information should be reported.
>
> Where would the runner do this? Where would the runner put these values?
> Maybe this is just part of the documentation about what we would like
> runners to do?
>
> On Fri, Jun 2, 2017, 3:09 AM Aljoscha Krettek  wrote:
>
> > Hi,
> >
> > Thanks for reviving this thread. I think having the watermark is very
>
> > good. Some runners, for example Dataflow and Flink have their own internal
> > metric for the watermark but having it cross-runner seems beneficial (if
> > maybe a bit wasteful).
> >
> > Best,
> > Aljoscha
> >
> > > On 2. Jun 2017, at 03:52, JingsongLee  wrote:
> > >
> > > @Aviem Zur @Ben Chambers What do you think about the value of
> > METRIC_MAX_SPLITS?
> > >
> > >
>
> > --
> > From: JingsongLee
> > Time: 2017 May 11 (Thu) 16:37
> > To: dev@beam.apache.org
> > Subject: [DISCUSS] Source Watermark Metrics
> > > Hi everyone,
> > >
> > > The source watermark metrics show the consumer latency of the Source.
> > > It allows the user to know the health of the job, and it can be used
> > > for monitoring and alerting.
> > > We should have the runner report the watermark metrics rather than
>
> > >  having the source report it using metrics. This addresses the fact that
> > even
> > > if the source has advanced to 8:00, the runner may still know about
> > buffered
>
> > >  elements at 7:00, and so not advance the watermark all the way to 8:00.
> > > The metrics include:
> > > 1. Source watermark (`min` amongst all splits):
> > > type = Gauge, namespace = io, name = source_watermark
> > > 2. Source watermark per split:
> > > type = Gauge, namespace = io.splits, name = .source_watermark
> > >
> > > The min source watermark amongst all splits seems difficult to
> > > implement since some runners (like FlinkRunner) can't access all the
> > > splits to aggregate, and there is no such AggregatorMetric.
> > >
> > > So we could report the watermark per split and users could use a `min`
>
> > > aggregation on this in their metrics backends. However, as was mentioned
>
> > > in the IO metrics proposal by several people this could be problematic in
> > > sources with many splits.
> > >
> > > So we do a check when reporting metrics to solve the problem of too many
> > splits.
> > > {code}
> > > if (splitsNum <= METRIC_MAX_SPLITS) {
> > >   // set the sourceWatermarkOfSplit
> > > }
> > > {code}
> > >
> > > So I'd like to start a discussion on the implementation of source
> > > watermark metrics and specifically how many splits is too many (the
> > > value of METRIC_MAX_SPLITS).
> > >
> > > JIRA:
> > > IO metrics (https://issues.apache.org/jira/browse/BEAM-1919)
> > > Source watermark (https://issues.apache.org/jira/browse/BEAM-1941)
> > >
> >
> >
>
>
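The per-split guard quoted in the thread above — report the min watermark always, but emit per-split gauges only when the split count is manageable — can be sketched like this. The gauge names and `METRIC_MAX_SPLITS` follow the quoted proposal; the chosen constant value and the rest of the code are assumptions in plain Java, not runner code:

```java
import java.util.HashMap;
import java.util.Map;

/** Builds watermark gauges; per-split gauges only for a bounded split count. */
final class SplitWatermarkReporter {
  static final int METRIC_MAX_SPLITS = 100; // the value under discussion; assumed here

  /** Always emits the min watermark; per-split gauges only if few enough splits. */
  static Map<String, Long> gauges(Map<String, Long> splitWatermarks) {
    Map<String, Long> out = new HashMap<>();
    long min = splitWatermarks.values().stream()
        .mapToLong(Long::longValue).min().orElse(Long.MIN_VALUE);
    out.put("io.source_watermark", min); // `min` amongst all splits
    if (splitWatermarks.size() <= METRIC_MAX_SPLITS) {
      splitWatermarks.forEach(
          (split, wm) -> out.put("io.splits." + split + ".source_watermark", wm));
    }
    return out;
  }
}
```

This keeps the cardinality of reported metrics bounded while still letting a metrics backend apply a `min` aggregation over the per-split gauges when they are present.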


Re: [DISCUSS] Source Watermark Metrics

2017-06-02 Thread Ben Chambers
I think having runners report important, general properties such as the
source watermark is great. It is much easier than requiring every source to
expose it.

I'm not sure how we would require this or do so in a general way. Each
runner has separate code for handling the watermark as well as different
ways information should be reported.

Where would the runner do this? Where would the runner put these values?
Maybe this is just part of the documentation about what we would like
runners to do?

On Fri, Jun 2, 2017, 3:09 AM Aljoscha Krettek  wrote:

> Hi,
>
> Thanks for reviving this thread. I think having the watermark is very
> good. Some runners, for example Dataflow and Flink have their own internal
> metric for the watermark but having it cross-runner seems beneficial (if
> maybe a bit wasteful).
>
> Best,
> Aljoscha
>
> > On 2. Jun 2017, at 03:52, JingsongLee  wrote:
> >
> > @Aviem Zur @Ben Chambers What do you think about the value of
> METRIC_MAX_SPLITS?
> >
> >
> --
> From: JingsongLee
> Time: 2017 May 11 (Thu) 16:37
> To: dev@beam.apache.org
> Subject: [DISCUSS] Source Watermark Metrics
> > Hi everyone,
> >
> > The source watermark metrics show the consumer latency of the Source.
> > It allows the user to know the health of the job, and it can be used
> > for monitoring and alerting.
> > We should have the runner report the watermark metrics rather than
> >  having the source report it using metrics. This addresses the fact that
> even
> > if the source has advanced to 8:00, the runner may still know about
> buffered
> >  elements at 7:00, and so not advance the watermark all the way to 8:00.
> > The metrics include:
> > 1. Source watermark (`min` amongst all splits):
> > type = Gauge, namespace = io, name = source_watermark
> > 2. Source watermark per split:
> > type = Gauge, namespace = io.splits, name = .source_watermark
> >
> > The min source watermark amongst all splits seems difficult to
> > implement since some runners (like FlinkRunner) can't access all the
> > splits to aggregate, and there is no such AggregatorMetric.
> >
> > So we could report the watermark per split and users could use a `min`
> > aggregation on this in their metrics backends. However, as was mentioned
> > in the IO metrics proposal by several people this could be problematic in
> > sources with many splits.
> >
> > So we do a check when reporting metrics to solve the problem of too many
> splits.
> > {code}
> > if (splitsNum <= METRIC_MAX_SPLITS) {
> >   // set the sourceWatermarkOfSplit
> > }
> > {code}
> >
> > So I'd like to start a discussion on the implementation of source
> > watermark metrics and specifically how many splits is too many (the
> > value of METRIC_MAX_SPLITS).
> >
> > JIRA:
> > IO metrics (https://issues.apache.org/jira/browse/BEAM-1919)
> > Source watermark (https://issues.apache.org/jira/browse/BEAM-1941)
> >
>
>


Re: Behavior of Top.Largest

2017-05-14 Thread Ben Chambers
Exposing the CombineFn is necessary for use with composed combine or
combining value state. There may be other reasons we made this visible, but
these continue to justify it.

On Sun, May 14, 2017, 1:00 PM Reuven Lax  wrote:

> I believe the reason why this is called Top.Largest is that originally it
> was simply the comparator used by Top.largest - i.e. an implementation
> detail. At some point it was made public and used by other transforms -
> maybe making an implementation detail a public class was the real mistake?
>
> On Sun, May 14, 2017 at 11:45 AM, Davor Bonaci  wrote:
>
> > I agree this is an unfortunate name.
> >
> > Tangential: can we rename APIs now that the first stable release is
> nearly
> > done?
> > Of course -- the "rename" can be done by introducing a new API, and
> > deprecating, but not removing, the old one. Then, once we decide to move
> to
> > the next major release, the deprecated API can be removed.
> >
> > I think we should probably do the "rename" at some point, but I'd leave
> the
> > final call to the wider consensus.
> >
> > On Sat, May 13, 2017 at 5:16 PM, Wesley Tanaka  >
> > wrote:
> >
> > > Using Top.Largest to sort a list of {2,1,3} produces {1,2,3}.  This
> > > matches the javadoc for the class, but seems counter-intuitive -- one
> > might
> > > expect that a Comparator called Largest would give largest items first.
> > > I'm wondering if renaming the classes to Natural / Reversed would
> better
> > > match their behavior?
> > >
> > > ---
> > > Wesley Tanaka
> > > https://wtanaka.com/
> >
>
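Wesley's observation is easy to reproduce with plain comparators: an ascending ("natural") comparator sorts {2,1,3} to {1,2,3}, which is exactly what a comparator named "Largest" does here — hence the surprise, and the appeal of names like Natural/Reversed that describe the ordering directly. A plain-Java illustration (these helper names are hypothetical, not the Beam classes):

```java
import java.util.Arrays;
import java.util.Comparator;

/** Shows why an ascending comparator named "Largest" is surprising. */
final class ComparatorNaming {
  /** Natural (ascending) order — what Top.Largest's comparator behaves like. */
  static Integer[] sortedNatural(Integer[] in) {
    Integer[] out = in.clone();
    Arrays.sort(out, Comparator.<Integer>naturalOrder());
    return out;
  }

  /** Reversed (descending) order — what the name "Largest" suggests. */
  static Integer[] sortedReversed(Integer[] in) {
    Integer[] out = in.clone();
    Arrays.sort(out, Comparator.<Integer>reverseOrder());
    return out;
  }
}
```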


Re: How I hit a roadblock with AutoValue and AvroCoder

2017-04-05 Thread Ben Chambers
Correction: AutoValue coder.

On Wed, Apr 5, 2017, 2:24 PM Ben Chambers  wrote:

> Serializable coder had a separate set of issues - often larger and less
> efficient. Ideally, we would have an AvroCoder.
>
> On Wed, Apr 5, 2017, 2:15 PM Pablo Estrada 
> wrote:
>
> As a note, it seems that SerializableCoder does the trick in this case, as
> it does not require a no-arg constructor for the class that is being
> deserialized - so perhaps we should encourage people to use that in the
> future.
> Best
> -P.
>
> On Wed, Apr 5, 2017 at 1:48 PM Pablo Estrada  wrote:
>
> > Hi all,
> > I was encouraged to write about my troubles to use PCollections of
> > AutoValue classes with AvroCoder; because it seems like currently, this
> is
> > not possible.
> >
> > As part of the changes to PAssert, I meant to create a SuccessOrFailure
> > class that could be passed in a PCollection to a `concludeTransform`,
> which
> > would be in charge of validating that all the assertions succeeded, and
> use
> > AvroCoder for serialization of that class. Consider this dummy example:
> >
> > @AutoValue
> > abstract class FizzBuzz {
> > ...
> > }
> >
> > class FizzBuzzDoFn extends DoFn {
> > ...
> > }
> >
> > 1. The first problem was that the abstract class does not have any
> > attributes, so AvroCoder cannot scrape them. For this (with advice from
> > Kenn Knowles), the Coder would need to take the AutoValue-generated
> class:
> >
> > .apply(ParDo.of(new FizzBuzzDoFn()))
> > .setCoder(AvroCoder.of((Class) AutoValue_FizzBuzz.class))
> >
> > 2. This errored out saying that FizzBuzz and AutoValue_FizzBuzz are
> > incompatible classes, so I just tried bypassing the type system like so:
> >
> > .setCoder(AvroCoder.of((Class) AutoValue_FizzBuzz.class))
> >
> > 3. This compiled properly, and encoding worked, but the problem came at
> > decoding, because Avro specifically requires the class to have a no-arg
> > constructor [1], and AutoValue-generated classes do not come with one.
> This
> > is a problem for several serialization frameworks, and we're not the
> first
> > ones to hit this [2], and the AutoValue people don't seem keen on adding
> > this.
> >
> > Considering all that, it seems that the AutoValue-AvroCoder pair can not
> > currently work. We'd need a serialization framework that does not depend
> on
> > calling the no-arg constructor and then filling in the attributes with
> > reflection. I'm trying to check if SerializableCoder has different
> > deserialization techniques; but for PAssert, I just decided to use
> > POJO+AvroCoder.
> >
> > I hope my experience may be useful to others, and maybe start a
> discussion
> > on how to enable users to have AutoValue classes in their PCollections.
> >
> > Best
> > -P.
> >
> > [1] -
> >
> http://avro.apache.org/docs/1.7.7/api/java/org/apache/avro/reflect/package-summary.html?is-external=true
> > [2] - https://github.com/google/auto/issues/122
> >
> >
>
>
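The core constraint Pablo hit — Avro's reflect reader instantiates the target class through its no-arg constructor — can be demonstrated with plain reflection: a class whose only constructor takes its fields (as AutoValue generates) simply cannot be created this way. A sketch under that assumption (the class names are illustrative, not the actual AutoValue output or Avro internals):

```java
final class NoArgDemo {
  /** Mimics an AutoValue-generated value class: constructor takes its fields. */
  static final class AutoValueStyle {
    final int value;
    AutoValueStyle(int value) { this.value = value; }
  }

  /** Mimics a POJO that reflection-based decoders can instantiate. */
  static final class PojoStyle {
    int value;
    PojoStyle() {}
  }

  /** True if the class can be created the way a reflect-based reader does. */
  static boolean reflectInstantiable(Class<?> cls) {
    try {
      cls.getDeclaredConstructor().newInstance();
      return true;
    } catch (ReflectiveOperationException e) {
      // No accessible no-arg constructor (or instantiation failed).
      return false;
    }
  }
}
```

This is why a framework that decodes by "construct empty, then fill fields via reflection" is incompatible with AutoValue's generated classes, independent of any coder-level workaround.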


Re: How I hit a roadblock with AutoValue and AvroCoder

2017-04-05 Thread Ben Chambers
Serializable coder had a separate set of issues - often larger and less
efficient. Ideally, we would have an AvroCoder.

On Wed, Apr 5, 2017, 2:15 PM Pablo Estrada 
wrote:

> As a note, it seems that SerializableCoder does the trick in this case, as
> it does not require a no-arg constructor for the class that is being
> deserialized - so perhaps we should encourage people to use that in the
> future.
> Best
> -P.
>
> On Wed, Apr 5, 2017 at 1:48 PM Pablo Estrada  wrote:
>
> > Hi all,
> > I was encouraged to write about my troubles to use PCollections of
> > AutoValue classes with AvroCoder; because it seems like currently, this
> is
> > not possible.
> >
> > As part of the changes to PAssert, I meant to create a SuccessOrFailure
> > class that could be passed in a PCollection to a `concludeTransform`,
> which
> > would be in charge of validating that all the assertions succeeded, and
> use
> > AvroCoder for serialization of that class. Consider this dummy example:
> >
> > @AutoValue
> > abstract class FizzBuzz {
> > ...
> > }
> >
> > class FizzBuzzDoFn extends DoFn {
> > ...
> > }
> >
> > 1. The first problem was that the abstract class does not have any
> > attributes, so AvroCoder cannot scrape them. For this (with advice from
> > Kenn Knowles), the Coder would need to take the AutoValue-generated
> class:
> >
> > .apply(ParDo.of(new FizzBuzzDoFn()))
> > .setCoder(AvroCoder.of((Class) AutoValue_FizzBuzz.class))
> >
> > 2. This errored out saying that FizzBuzz and AutoValue_FizzBuzz are
> > incompatible classes, so I just tried bypassing the type system like so:
> >
> > .setCoder(AvroCoder.of((Class) AutoValue_FizzBuzz.class))
> >
> > 3. This compiled properly, and encoding worked, but the problem came at
> > decoding, because Avro specifically requires the class to have a no-arg
> > constructor [1], and AutoValue-generated classes do not come with one.
> This
> > is a problem for several serialization frameworks, and we're not the
> first
> > ones to hit this [2], and the AutoValue people don't seem keen on adding
> > this.
> >
> > Considering all that, it seems that the AutoValue-AvroCoder pair can not
> > currently work. We'd need a serialization framework that does not depend
> on
> > calling the no-arg constructor and then filling in the attributes with
> > reflection. I'm trying to check if SerializableCoder has different
> > deserialization techniques; but for PAssert, I just decided to use
> > POJO+AvroCoder.
> >
> > I hope my experience may be useful to others, and maybe start a
> discussion
> > on how to enable users to have AutoValue classes in their PCollections.
> >
> > Best
> > -P.
> >
> > [1] -
> >
> http://avro.apache.org/docs/1.7.7/api/java/org/apache/avro/reflect/package-summary.html?is-external=true
> > [2] - https://github.com/google/auto/issues/122
> >
> >
>


Re: Proposal on porting PAssert away from aggregators

2017-03-30 Thread Ben Chambers
When you say instance do you mean a different named PTransform class? Or
just a separate instance of whatever "HandleAssertResult" PTransform is
introduced? If the latter, I believe that makes sense. It also addresses
the problems with committed vs. attempted metrics. Specifically:

Since we want to know that each assertion has run, we'd like to do
something like "Number of successful assertions" = N. But, in the presence
of retries, the number of successful assertions may be over counted. But,
if we have a separate transform for each assertion, then we can say "for
each transform, the number of successful assertions should be >= 1", which
ensures that every assert has succeeded. This will work with any metric
implementation that may overcount but never (or sufficiently rarely)
undercount.

On Thu, Mar 30, 2017 at 3:39 PM Kenneth Knowles 
wrote:

> This sounds pretty good to me. I really like emphasizing the ability of a
> runner to go ahead and use pipeline surgery to put in their own
> verification transform.
>
> On Thu, Mar 30, 2017 at 3:12 PM, Pablo Estrada  >
> wrote:
>
> > * The current asserting DoFns in PAssert will go from being DoFn<T,
> > Void> to DoFn<T, SuccessOrFailure>, and pass the output
> > SuccessOrFailure objects downstream to the checking
> > PTransform.
> > * This default PTransform will use metrics to
> count
> > successes, and fail immediately on failures (after incrementing the
> failure
> > counter).
> >
>
> Could it possibly be useful to have a separate instance of a
> PTransform for each assertion? It might be able to proxy
> for counting, for runners that don't yet have even rudimentary metrics. The
> transform could set up triggering to fire just once and fail if it has not
> received one Success. The runner might still just not execute that
> transform, but we could set it up so that is extremely unlikely, and anyhow
> it is better than no coverage. I could be missing something about the
> implementation issues here, though.
>
> Kenn
>
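Ben's retry-tolerant check at the top of this thread — one transform per assertion, each counter required to be ">= 1" — can be sketched with a simple counter map: retries may overcount a given assertion's successes, but as long as every expected assertion's count is at least one, we know each ran and passed. Plain Java with hypothetical names (not the PAssert implementation):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Verifies that every expected assertion succeeded at least once. */
final class AssertionLedger {
  private final Map<String, Integer> successes = new HashMap<>();

  /** Called each time an assertion passes; retries may call it again. */
  void recordSuccess(String assertionId) {
    successes.merge(assertionId, 1, Integer::sum);
  }

  /** Overcounting is fine; a zero count means an assertion never ran. */
  boolean allSucceeded(List<String> expected) {
    return expected.stream().allMatch(id -> successes.getOrDefault(id, 0) >= 1);
  }
}
```

This is the property that makes the scheme compatible with any metric implementation that may overcount but never (or sufficiently rarely) undercount.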


Re: [PROPOSAL] "Requires deterministic input"

2017-03-21 Thread Ben Chambers
Allowing an annotation on DoFns that produce deterministic output could be
added in the future, but doesn't seem like a great option.

1. It is a correctness issue to assume a DoFn is deterministic and be
wrong, so we would need to assume all transform outputs are
non-deterministic unless annotated. Getting this correct is difficult (for
example, GBK is surprisingly non-deterministic except in specific cases).

2. It is unlikely to be a major performance improvement, given that any
non-deterministic transform prior to a sink (which are most likely to
require deterministic input) will cause additional work to be needed.

Based on this, it seems like the risk of allowing an annotation is high
while the potential for performance improvements is low. The current
proposal (not allowing an annotation) makes sense for now, until we can
demonstrate that the impact on performance is high in cases that could be
avoided with an annotation (in real-world use).

-- Ben
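The hazard Kenn's proposal below addresses — pseudorandom data that would be regenerated differently on retry — and the "induce deterministic input" remedy can be sketched by materializing the first answer of a nondeterministic assignment, so a replayed attempt observes the same values. Plain Java with hypothetical names (not the actual Write/ApplyShardingKey code):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

/** Assigns elements to shards pseudorandomly, but materializes the first answer. */
final class StableShardAssigner {
  private final Random random;
  private final int numShards;
  private final Map<String, Integer> materialized = new HashMap<>();

  StableShardAssigner(long seed, int numShards) {
    this.random = new Random(seed);
    this.numShards = numShards;
  }

  /**
   * Without the memoization, a retry would draw a fresh random shard and the
   * side effect already produced with the old shard could not be retracted.
   */
  int shardFor(String element) {
    return materialized.computeIfAbsent(element, e -> random.nextInt(numShards));
  }
}
```

The map here stands in for whatever mechanism a runner uses to give the DoFn a consistent view of its input across retries; once that view is consistent, idempotency is the user function's own responsibility.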

On Tue, Mar 21, 2017 at 2:05 PM vikas rk  wrote:

+1 for the general idea of runners handling it over a hard-coded
implementation strategy.

For the Write transform I believe you are talking about ApplyShardingKey
<
https://github.com/apache/beam/blob/d66029cafde152c0a46ebd276ddfa4c3e7fd3433/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java#L304
>
which introduces non-deterministic behavior when retried?


*Let a DoFn declare (mechanism not important right now) that it "requires
deterministic input"*

*Each runner will need a way to induce deterministic input - the obvious
choice being a materialization.*

Does this mean that a runner will always materialize (or whatever the
strategy is) an input PCollection to this DoFn even though the PCollection
might have been produced by deterministic transforms? Would it make sense
to also let DoFns declare if they produce non-deterministic output?

-Vikas


On 21 March 2017 at 13:52, Stephen Sisk  wrote:

> Hey Kenn-
>
> this seems important, but I don't have all the context on what the problem
> is.
>
> Can you explain this sentence "Specifically, there is pseudorandom data
> generated and once it has been observed and used to produce a side effect,
> it cannot be regenerated without erroneous results." ?
>
> Where is the pseudorandom data coming from? Perhaps a concrete example
> would help?
>
> S
>
>
> On Tue, Mar 21, 2017 at 1:22 PM Kenneth Knowles 
> wrote:
>
> > Problem:
> >
> > I will drop all nuance and say that the `Write` transform as it exists
in
> > the SDK is incorrect until we add some specification and APIs. We can't
> > keep shipping an SDK with an unsafe transform in it, and IMO this
> certainly
> > blocks a stable release.
> >
> > Specifically, there is pseudorandom data generated and once it has been
> > observed and used to produce a side effect, it cannot be regenerated
> > without erroneous results.
> >
> > This generalizes: For some side-effecting user-defined functions, it is
> > vital that even across retries/replays they have a consistent view of
the
> > contents of their input PCollection, because their effect on the outside
> > world cannot be retracted if/when they fail and are retried. Once the
> > runner ensures a consistent view of the input, it is then their own
> > responsibility to be idempotent.
> >
> > Ideally we should specify this requirement for the user-defined function
> > without imposing any particular implementation strategy on Beam runners.
> >
> > Proposal:
> >
> > 1. Let a DoFn declare (mechanism not important right now) that it
> "requires
> > deterministic input".
> >
> > 2. Each runner will need a way to induce deterministic input - the
> obvious
> > choice being a materialization.
> >
> > I want to keep the discussion focused, so I'm leaving out any
> possibilities
> > of taking this further.
> >
> > Regarding performance: Today places that require this tend to be already
> > paying the cost via GroupByKey / Reshuffle operations, since that was a
> > simple way to induce determinism in batch Dataflow* (doesn't work for
> most
> > other runners nor for streaming Dataflow). This change will replace a
> > hard-coded implementation strategy with a requirement that may be
> fulfilled
> > in the most efficient way available.
> >
> > Thoughts?
> >
> > Kenn (w/ lots of consult from colleagues, especially Ben)
> >
> > * There is some overlap with the reshuffle/redistribute discussion
> because
> > of this historical situation, but I would like to leave that broader
> > discussion out of this correctness issue.
> >
>


Re: [DISCUSS] Per-Key Watermark Maintenance

2017-02-28 Thread Ben Chambers
Following up on what Kenn said, it seems like the idea of a per-key
watermark is logically consistent with Beam model. Each runner already
chooses how to approximate the model-level concept of the per-key watermark.

The list Kenn had could be extended with things like:
4. A runner that assumes the watermark for all keys is at -\infty until all
data is processed and then jumps to \infty. This would be similar to a
batch runner, for instance.

How watermarks are tracked may even depend on the pipeline -- if there is
no shuffling of data between keys it may be easier to support a per-key
watermark.
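The phone example in Aljoscha's message below (one disconnected phone holding back everyone) can be made concrete: with a single global watermark, the minimum over all keys governs, whereas per-key watermarks let fast keys advance independently. A plain-Java sketch with hypothetical names (one possible approximation, not a runner implementation):

```java
import java.util.HashMap;
import java.util.Map;

/** Tracks a watermark per key; the global watermark is the min over keys. */
final class PerKeyWatermarks {
  private final Map<String, Long> byKey = new HashMap<>();

  /** Advance the key's watermark; watermarks only move forward. */
  void observe(String key, long eventTimeMillis) {
    byKey.merge(key, eventTimeMillis, Math::max);
  }

  long keyWatermark(String key) {
    return byKey.getOrDefault(key, Long.MIN_VALUE);
  }

  /** A single slow key drags this down for all keys. */
  long globalWatermark() {
    return byKey.values().stream().mapToLong(Long::longValue)
        .min().orElse(Long.MIN_VALUE);
  }
}
```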

On Tue, Feb 28, 2017 at 1:50 PM Kenneth Knowles 
wrote:

> This is a really interesting topic. I think Beam is a good place to have a
> broader cross-runner discussion of this.
>
> I know that you are not the only person skeptical due to
> trickiness/costliness of implementation.
>
> On the other hand, at a semantic level, I think any correct definition will
> allow a per-PCollection watermark to serve as just a "very bad" per-key
> watermark. So I would target a definition whereby any of these is
> acceptable:
>
> 1. a runner with no awareness of per-key watermarks
> 2. a runner that chooses only sometimes to use them (there might be some
> user-facing cost/latency tradeoff a la triggers here)
> 3. a runner that chooses a coarse approximation for tracking key lineage
>
> Can you come up with an example where this is impossible?
>
> Developing advanced model features whether or not all runners do (or can)
> support them is exactly the spirit of Beam, to me, so I am really glad you
> brought this up here.
>
> On Mon, Feb 27, 2017 at 5:59 AM, Aljoscha Krettek 
> wrote:
>
> > We recently started a discussion on the Flink ML about this topic: [1]
> >
> > The gist of it is that for some use cases tracking the watermark per-key
> > instead of globally (or rather per partition) can be useful for some
> cases.
> > Think, for example, of tracking some user data off mobile phones where
> the
> > user-id/phone number is the key. For these cases, one slow key, i.e. a
> > disconnected phone, would slow down the watermark.
> >
> > I'm not saying that this is a good idea, I'm actually skeptical because
> the
> > implementation seems quite complicated/costly to me and I have some
> doubts
> > about being able to track the watermark per-key in the sources. It's just
> > that more people seem to be asking about this lately and I would like to
> > have a discussion with the clever Beam people because we claim to be at
> the
> > forefront of parallel data processing APIs. :-)
> >
> > Also note that I'm aware that in the example given above it would be
> > difficult to find out if one key is slow in the first place and therefore
> > hold the watermark for that key.
> >
> > If you're interested, please have a look at the discussion on the Flink
> ML
> > that I linked to above. It's only 4 mails so far and Jamie gives a nicer
> > explanation of the possible use cases than I'm doing here. Note, that my
> > discussion of feasibility and APIs/key lineage should also apply to Beam.
> >
> > What do you think?
> >
> > [1]
> > https://lists.apache.org/thread.html/2b90d5b1d5e2654212cfbbcc6510ef
> > 424bbafc4fadb164bd5aff9216@%3Cdev.flink.apache.org%3E
> >
>


Re: Metrics for Beam IOs.

2017-02-18 Thread Ben Chambers
The question is how much of metrics has to be in the runner and how much
can be shared. So far we share the API the user uses to report metrics -
this is the most important part since it is required for pipelines to be
portable.

The next piece that could be shared is something related to reporting. But,
implementing logically correct metrics requires the execution engine to be
involved, since it depends on how and which bundles are retried. What I'm
not sure about is how much can be shared and/or made available for these
runners vs. how much is tied to the execution engine.

On Sat, Feb 18, 2017, 8:52 AM Jean-Baptiste Onofré  wrote:

> For Spark, I fully agree. My point is more when the execution engine or
> runner doesn't provide anything or we have to provide a generic way of
> harvesting/pushing metrics.
>
> It could at least be a documentation point. Actually, I'm evaluation the
> monitoring capabilities of the different runners.
>
> Regards
> JB
>
> On 02/18/2017 05:47 PM, Amit Sela wrote:
> > That's what I don't understand - why would we want that ?
> > Taking on responsibilities in the "stack" should have a good reason.
> >
> > Someone choosing to run Beam on Spark/Flink/Apex would have to take care
> of
> > installing those clusters, right ? perhaps providing them a resilient
> > underlying FS ? and if he wants, setup monitoring (which even with the
> API
> > proposed he'd have to do).
> >
> > I just don't see why it should be a part of the runner and/or Metrics
> API.
> >
> > On Sat, Feb 18, 2017 at 6:35 PM Jean-Baptiste Onofré 
> > wrote:
> >
> >> Good point.
> >>
> >> In Decanter, it's what I named a "scheduled collector". So, yes, the
> >> adapter will periodically harvest metric to push.
> >>
> >> Regards
> >> JB
> >>
> >> On 02/18/2017 05:30 PM, Amit Sela wrote:
> >>> First issue with "push" metrics plugin - what if the runner's
> underlying
> >>> reporting mechanism is "pull" ? Codahale ScheduledReporter will sample
> >> the
> >>> values every X and send to ...
> >>> So any runner using a "pull-like" would use an adapter ?
> >>>
> >>> On Sat, Feb 18, 2017 at 6:27 PM Jean-Baptiste Onofré 
> >>> wrote:
> >>>
> >>>> Hi Ben,
> >>>>
> >>>> ok it's what I thought. Thanks for the clarification.
> >>>>
> >>>> +1 for the plugin-like "push" API (it's what I have in mind too ;)).
> >>>> I will start a PoC for discussion next week.
> >>>>
> >>>> Regards
> >>>> JB
> >>>>
> >>>> On 02/18/2017 05:17 PM, Ben Chambers wrote:
> >>>>> The runner can already report metrics during pipeline execution so it
> >> is
> >>>>> usable for monitoring.
> >>>>>
> >>>>> The pipeline result can be used to query metrics during pipeline
> >>>> execution,
> >>>>> so a first version of reporting to other systems is to periodically
> >>>>> pull metrics from the runner with that API.
> >>>>>
> >>>>> We may eventually want to provide a plugin-like API to get the runner
> >> to
> >>>>> push metrics more directly to other metrics stores. This layer needs
> >> some
> >>>>> thought since it has to handle the complexity of attempted/committed
> >>>>> metrics to be consistent with the model.
> >>>>>
> >>>>>
> >>>>>
> >>>>> On Sat, Feb 18, 2017, 5:44 AM Jean-Baptiste Onofré 
> >>>> wrote:
> >>>>>
> >>>>> Hi Amit,
> >>>>>
> >>>>> before Beam, I didn't mind about portability ;) So I used the Spark
> >>>>> approach.
> >>>>>
> >>>>> But, now, as a Beam user, I would expect a generic way to deal with
> >>>>> metric whatever the runner would be.
> >>>>>
> >>>>> Today, you are right: I'm using the solution provided by the
> execution
> >>>>> engine. That's the current approach and it works fine. And it's up to
> >> me
> >>>>> to leverage it (for instance, Accumulators) with my own system.
> >>>>>
> >>>>> My thought is more to provide a generic way. I

Re: Metrics for Beam IOs.

2017-02-18 Thread Ben Chambers
The runner can already report metrics during pipeline execution so it is
usable for monitoring.

The pipeline result can be used to query metrics during pipeline execution,
so a first version of reporting to other systems is to periodically pull
metrics from the runner with that API.

We may eventually want to provide a plugin-like API to get the runner to
push metrics more directly to other metrics stores. This layer needs some
thought since it has to handle the complexity of attempted/committed
metrics to be consistent with the model.
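Ben's "periodically pull via the pipeline result" suggestion, and Amit's later pull-vs-push question, amount to an adapter: wrap a pull-style metrics query in a step that pushes a snapshot to a sink on each polling cycle. A plain-Java sketch with hypothetical names (the supplier stands in for something like querying a PipelineResult; not a Beam API):

```java
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Adapts a pull-style metrics source to a push-style sink. */
final class MetricsPushAdapter {
  private final Supplier<Map<String, Long>> pullSource; // e.g. query the runner
  private final Consumer<Map<String, Long>> pushSink;   // e.g. a monitoring backend

  MetricsPushAdapter(Supplier<Map<String, Long>> pullSource,
                     Consumer<Map<String, Long>> pushSink) {
    this.pullSource = pullSource;
    this.pushSink = pushSink;
  }

  /** One polling cycle: pull a snapshot and push it. A scheduler would call this every X. */
  void poll() {
    pushSink.accept(pullSource.get());
  }
}
```

This is also the shape of the "scheduled collector" JB mentions for Decanter: a pull-based backend only needs such an adapter to feed a push-based sink.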



On Sat, Feb 18, 2017, 5:44 AM Jean-Baptiste Onofré  wrote:

Hi Amit,

before Beam, I didn't mind about portability ;) So I used the Spark
approach.

But, now, as a Beam user, I would expect a generic way to deal with
metrics, whatever the runner would be.

Today, you are right: I'm using the solution provided by the execution
engine. That's the current approach and it works fine. And it's up to me
to leverage it (for instance, Accumulators) with my own system.

My thought is more to provide a generic way. It's only a discussion for
now ;)

Regards
JB

On 02/18/2017 02:38 PM, Amit Sela wrote:
> On Sat, Feb 18, 2017 at 10:16 AM Jean-Baptiste Onofré 
> wrote:
>
>> Hi Amit,
>>
>> my point is: how do we provide metric today to end user and how can they
>> use it to monitor a running pipeline ?
>>
>> Clearly the runner is involved, but, it should behave the same way for
>> all runners. Let me take an example.
>> On my ecosystem, I'm using both Flink and Spark with Beam, some
>> pipelines on each. I would like to get the metrics for all pipelines to
>> my monitoring backend. If I can "poll" from the execution engine metric
>> backend to my system that's acceptable, but it's extra work.
>> Having a generic metric reporting layer would allow us to have a more
>> common way. If the user doesn't provide any reporting sink, then we use
>> the execution backend metric layer. If provided, we use the reporting
sink.
>>
> How did you do it before Beam? I think that for Spark you reported its native
> metrics via the Codahale Reporter, and Accumulators were visible in the UI, and
> the Spark runner took it a step forward to make it all visible via
> Codahale. Assuming Flink does something similar, it all belongs to runner
> setup/configuration.
>
>>
>> About your question: you are right, it's possible to update a collector
>> or appender without impacting anything else.
>>
>> Regards
>> JB
>>
>> On 02/17/2017 10:38 PM, Amit Sela wrote:
>>> @JB I think what you're suggesting is that Beam should provide a
"Metrics
>>> Reporting" API as well, and I used to think like you, but the more I
>>> thought of that the more I tend to disagree now.
>>>
>>> The SDK is for users to author pipelines, so Metrics are for
user-defined
>>> metrics (in contrast to runner metrics).
>>>
>>> The Runner API is supposed to help different backends to integrate with
>>> Beam to allow users to execute those pipeline on their favourite
>> backend. I
>>> believe the Runner API has to provide restrictions/demands that are just
>>> enough so a runner could execute a Beam pipeline as best it can, and I'm
>>> afraid that this would demand runner authors to do work that is
>> unnecessary.
>>> This is also sort of "crossing the line" into the runner's domain and
>>> "telling it how to do" instead of what, and I don't think we want that.
>>>
>>> I do believe however that runner's should integrate the Metrics into
>> their
>>> own metrics reporting system - but that's for the runner author to
>> decide.
>>> Stas did this for the Spark runner because Spark doesn't report back
>>> user-defined Accumulators (Spark's Aggregators) to its Metrics system.
>>>
>>> On a curious note though, did you use an OSGi service per event-type ?
so
>>> you can upgrade specific event-handlers without taking down the entire
>>> reporter ? but that's really unrelated to this thread :-) .
>>>
>>>
>>>
>>> On Fri, Feb 17, 2017 at 8:36 PM Ben Chambers
>> 
>>> wrote:
>>>
> >>>> I don't think it is possible for there to be a general mechanism for
>>>> pushing metrics out during the execution of a pipeline. The Metrics API
>>>> suggests that metrics should be reported as values across all attempts
>> and
>>>> values across only successful attempts. The latter requires runner
>>>> involvement to 

Re: Metrics for Beam IOs.

2017-02-17 Thread Ben Chambers
I don't think it is possible for there to be a general mechanism for
pushing metrics out during the execution of a pipeline. The Metrics API
suggests that metrics should be reported as values across all attempts and
values across only successful attempts. The latter requires runner
involvement to ensure that a given metric value is atomically incremented
(or checkpointed) when the bundle it was reported in is committed.

Aviem has already implemented Metrics support for the Spark runner. I am
working on support for the Dataflow runner.
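The attempted/committed distinction can be illustrated with a toy model. This is only a sketch of the semantics Ben describes (deltas become committed atomically when the bundle commits), not how any runner actually implements it; the `BundleCounter` class is invented:

```java
import java.util.concurrent.atomic.AtomicLong;

/** Toy model of a metric cell with attempted vs. committed values. */
public class BundleCounter {
  private final AtomicLong attempted = new AtomicLong();
  private final AtomicLong committed = new AtomicLong();
  private long pending; // deltas reported by the in-flight bundle

  /** Called from user code while processing a bundle. */
  public void inc() {
    pending++;
    attempted.incrementAndGet(); // attempted counts every attempt, retries included
  }

  /** The runner commits the bundle: its deltas become committed atomically. */
  public void commitBundle() {
    committed.addAndGet(pending);
    pending = 0;
  }

  /** The bundle failed; its deltas are dropped before the bundle is retried. */
  public void failBundle() {
    pending = 0;
  }

  public long attempted() { return attempted.get(); }
  public long committed() { return committed.get(); }
}
```

With one failed attempt and one successful retry of a bundle that increments twice, the attempted value is 4 while the committed value is 2 — exactly the divergence that requires runner involvement.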

On Fri, Feb 17, 2017 at 7:50 AM Jean-Baptiste Onofré 
wrote:

Hi guys,

As I'm back from vacation, I'm back on this topic ;)

It's a great discussion, and regarding the Metric IO coverage, I think it's
good.

However, there's a point that we discussed very quickly in the thread and I
think it's an important one (maybe more important than the provided
metrics, actually, in terms of roadmap ;)).

Assuming we have pipelines, PTransforms, IOs, ... using the Metric API,
how do we expose the metrics for the end-users ?

A first approach would be to bind a JMX MBean server by the pipeline and
expose the metrics via MBeans. I don't think it's a good idea for the
following reasons:
1. It's not easy to know where the pipeline is actually executed, and
so, not easy to find the MBean server URI.
2. For the same reason, we can have port binding error.
3. While it could work for unbounded/streaming pipelines (as they are
always "running"), it's not really applicable for bounded/batch
pipelines as their lifetime is "limited" ;)

So, I think the "push" approach is better: during the execution, a
pipeline "internally" collects and pushes the metric to a backend.
The "push" could be a kind of sink. For instance, the metric "records" can
be sent to a Kafka topic, or directly to Elasticsearch or whatever.
The metric backend can deal with alerting, reporting, etc.

Basically, we have to define two things:
1. The "appender" where the metrics have to be sent (and the
corresponding configuration to connect, like Kafka or Elasticsearch
location)
2. The format of the metric data (for instance, json format).
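Those two pieces — an appender interface and a record format — might look roughly like this. The `MetricAppender` interface and the JSON layout are invented for illustration; they are not Decanter's actual format nor a proposed Beam API:

```java
/** Hypothetical pluggable sink for metric records (e.g. Kafka, Elasticsearch). */
interface MetricAppender {
  void append(String jsonRecord);
}

/** One possible shape for a metric record; field set and JSON layout are invented. */
public class MetricRecord {
  final String pipeline;
  final String step;
  final String name;
  final long value;
  final long timestampMillis;

  MetricRecord(String pipeline, String step, String name, long value, long timestampMillis) {
    this.pipeline = pipeline;
    this.step = step;
    this.name = name;
    this.value = value;
    this.timestampMillis = timestampMillis;
  }

  /** Serializes the record to JSON, ready to hand to whatever appender is configured. */
  String toJson() {
    return String.format(
        "{\"pipeline\":\"%s\",\"step\":\"%s\",\"name\":\"%s\",\"value\":%d,\"ts\":%d}",
        pipeline, step, name, value, timestampMillis);
  }
}
```

A Kafka appender would publish each record to a topic; an Elasticsearch appender would index it — the pipeline only ever sees the `MetricAppender` interface configured via pipeline options.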

In Apache Karaf, I created something similar named Decanter:

http://blog.nanthrax.net/2015/07/monitoring-and-alerting-with-apache-karaf-decanter/

http://karaf.apache.org/manual/decanter/latest-1/

Decanter provides collectors that harvest the metrics (like JMX MBean
attributes, log messages, ...). Basically, for Beam, it would be
directly the Metric API used by pipeline parts.
Then, the metric records are sent to a dispatcher, which sends them to
an appender. The appenders store or send the metric records to a
backend (Elasticsearch, Cassandra, Kafka, JMS, Redis, ...).

I think it would make sense to provide the configuration and Metric
"appender" via the pipeline options.
As it's not really runner specific, it could be part of the metric API
(or SPI in that case).

WDYT ?

Regards
JB

On 02/15/2017 09:22 AM, Stas Levin wrote:
> +1 to making the IO metrics (e.g. producers, consumers) available as part
> of the Beam pipeline metrics tree for debugging and visibility.
>
> As it has already been mentioned, many IO clients have a metrics mechanism
> in place, so in these cases I think it could be beneficial to mirror their
> metrics under the relevant subtree of the Beam metrics tree.
>
> On Wed, Feb 15, 2017 at 12:04 AM Amit Sela  wrote:
>
>> I think this is a great discussion and I'd like to relate to some of the
>> points raised here, and raise some of my own.
>>
>> First of all I think we should be careful here not to cross boundaries.
IOs
>> naturally have many metrics, and Beam should avoid "taking over" those.
IO
>> metrics should focus on what's relevant to the Pipeline: input/output
rate,
>> backlog (for UnboundedSources, which exists in bytes but for monitoring
>> purposes we might want to consider #messages).
>>
>> I don't agree that we should not invest in doing this in Sources/Sinks
and
>> going directly to SplittableDoFn because the IO API is familiar and
known,
>> and as long as we keep it should be treated as a first class citizen.
>>
>> As for enable/disable - if IOs consider focusing on pipeline-related
>> metrics I think we should be fine, though this could also change between
>> runners as well.
>>
>> Finally, considering "split-metrics" is interesting because on one hand
it
>> affects the pipeline directly (unbalanced partitions in Kafka that may
>> cause backlog) but this is that fine-line of responsibilities (Kafka
>> monitoring would probably be able to tell you that partitions are not
>> balanced).
>>
>> My 2 cents, cheers!
>>

Re: Metrics for Beam IOs.

2017-02-14 Thread Ben Chambers
On Tue, Feb 14, 2017 at 9:07 AM Stephen Sisk 
wrote:

> hi!
>
> (ben just sent his mail and he covered some similar topics to me, but I'll
> keep my comments intact since they are slightly different)
>
> * I think there are a lot of metrics that should be exposed for all
> transforms - everything from JB's list (number of splits, throughput,
> reading/writing rate, etc.) also apply to
> splittableDoFns.
>

Many of the metrics that should be exposed for all transforms are likely
best exposed by the runner or some other common layer, rather than being
added to each transform. But things like number of elements, estimated size
of elements, etc. all make sense for every transform.


> * I also think there are data source specific metrics that a given IO will
> want to expose (ie, things like kafka backlog for a topic.) No one on this
> thread has specifically addressed this, but Beam Sources & Sinks do not
> presently have the ability to report metrics even if a given IO writer
> wanted to - depending on the timeline for SplittableDoFn and the move to
> that infrastructure, I don't think we need that support in Sources/Sinks,
> but I do think we should make sure SplittableDoFn has the necessary
> support.
>

Two parts -- we may want to introduce something like a Gauge here that lets
the metric system ask the source/sink for the latest metrics. This allows
the runner to gather metrics at a rate that makes sense without impacting
performance.
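A gauge of this kind can be sketched as a callback registry that the metrics system samples on demand, at whatever rate the runner chooses. All names here are hypothetical — this is the idea, not the Metrics API:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.LongSupplier;

/** Toy registry where gauges are read on demand, at whatever rate the runner chooses. */
public class GaugeRegistry {
  private final Map<String, LongSupplier> gauges = new LinkedHashMap<>();

  /** A source/sink registers a callback, e.g. () -> current Kafka backlog. */
  public void register(String name, LongSupplier gauge) {
    gauges.put(name, gauge);
  }

  /** The runner samples all gauges only when it decides to report. */
  public Map<String, Long> sample() {
    Map<String, Long> out = new LinkedHashMap<>();
    gauges.forEach((name, g) -> out.put(name, g.getAsLong()));
    return out;
  }
}
```

Because the source only supplies a callback, reporting frequency is entirely under the metrics system's control, which is what keeps the performance impact bounded.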

But, the existing Metrics API should work within a source or sink --
anything that is called within a step should work.


> * I think there are ways to do many metrics such that they are not too
> expensive to calculate all the time.  (ie, reporting per bundle rather than
> per item) I think we should ask whether we want/need are metrics that are
> expensive to calculate before going to the effort of adding enable/disable.
>

+1 -- hence why I'd like to look at reporting the metrics with no
configuration.


> * I disagree with ben about showing the amount of splitting - I think
> especially with IOs it's useful to understand/diagnose reading problems
> since that's one potential source of problems, especially given that the
> user can write transforms that split now in SplittableDoFn. But I look
> forward to discussing that further
>

I think many of the splitting metrics fall into things the runner should
report. I think if we pick the right ones so they're useful, it likely doesn't
hurt to gather them, but here again it may be useful to talk about specific
problems.

I still think these likely won't make sense for all users -- if I'm a new
user just trying to get a source/sink working, I'm not sure what "splitting
metrics" would be useful to me. But if the source can detect that it is
having trouble splitting and raise a message like "you're using compressed
text files which can't be parallelized beyond the number of files" that is
much more actionable.


> +1 on talking about specific examples
>
> S
>
> On Tue, Feb 14, 2017 at 8:29 AM Jean-Baptiste Onofré 
> wrote:
>
> > Hi Aviem
> >
> > Agree with your comments, it's pretty close to my previous ones.
> >
> > Regards
> > JB
> >
> > On Feb 14, 2017, 12:04, at 12:04, Aviem Zur  wrote:
> > >Hi Ismaël,
> > >
> > >You've raised some great points.
> > >Please see my comments inline.
> > >
> > >On Tue, Feb 14, 2017 at 3:37 PM Ismaël Mejía  wrote:
> > >
> > >> ​Hello,
> > >>
> > >> The new metrics API allows us to integrate some basic metrics into
> > >the Beam
> > >> IOs. I have been following some discussions about this on JIRAs/PRs,
> > >and I
> > >> think it is important to discuss the subject here so we can have more
> > >> awareness and obtain ideas from the community.
> > >>
> > >> First I want to thank Ben for his work on the metrics API, and Aviem
> > >for
> > >> his ongoing work on metrics for IOs, e.g. KafkaIO) that made me aware
> > >of
> > >> this subject.
> > >>
> > >> There are some basic ideas to discuss e.g.
> > >>
> > >> - What are the responsibilities of Beam IOs in terms of Metrics
> > >> (considering the fact that the actual IOs, server + client, usually
> > >provide
> > >> their own)?
> > >>
> > >
> > >While it is true that many IOs provide their own metrics, I think that
> > >Beam
> > >should expose IO metrics because:
> > >
> > >1. Metrics which help understanding performance of a pipeline which
> > >uses
> > >   an IO may not be covered by the IO.
> > >2. Users may not be able to setup integrations with the IO's metrics to
> > >view them effectively (And correlate them to a specific Beam pipeline),
> > >but
> > >   still want to investigate their pipeline's performance.
> > >
> > >
> > >> - What metrics are relevant to the pipeline (or some particular IOs)?
> > >Kafka
> > >> backlog for one could point that a pipeline is behind ingestion rate.
> > >
> > >
> > >I think it depends on the IO, but there is probably overlap in some of
> > >the
> > >metrics so a guideline might be written f

Re: Metrics for Beam IOs.

2017-02-14 Thread Ben Chambers
Thanks for starting this conversation Ismael! I too have been thinking
we'll need some general approach to metrics for IO in the near future.

Two general thoughts:

1. Before making the metrics configurable, I think it would be worthwhile
to see if we can find the right set of metrics that provide useful
information about IO without affecting performance and have these always
on. Monitoring information like this is often useful when a pipeline is
behaving unexpectedly, and predicting when that will happen and turning on
the metrics is problematic.

2. I think focusing on metrics about source splitting and such is the wrong
level from a user perspective. A user shouldn't need to understand how
sources split and what that means. Instead, we should report higher-level
metrics such as how many bytes of input have been processed, how many bytes
remain (if that is known), etc.

Ideally, metrics about splitting can be reported by the runner in a general
manner. If they're useful for developing the source maybe that would be the
configuration (indicating that you're developing a source and want these
more detailed metrics).

Maybe it would help to pick one or two IOs that you're looking at and talk
about proposed metrics? That might focus the discussion on what metrics
make sense to users and how expensive they might be to report?

On Tue, Feb 14, 2017 at 8:29 AM Jean-Baptiste Onofré 
wrote:

> Hi Aviem
>
> Agree with your comments, it's pretty close to my previous ones.
>
> Regards
> JB
>
> On Feb 14, 2017, 12:04, at 12:04, Aviem Zur  wrote:
> >Hi Ismaël,
> >
> >You've raised some great points.
> >Please see my comments inline.
> >
> >On Tue, Feb 14, 2017 at 3:37 PM Ismaël Mejía  wrote:
> >
> >> ​Hello,
> >>
> >> The new metrics API allows us to integrate some basic metrics into
> >the Beam
> >> IOs. I have been following some discussions about this on JIRAs/PRs,
> >and I
> >> think it is important to discuss the subject here so we can have more
> >> awareness and obtain ideas from the community.
> >>
> >> First I want to thank Ben for his work on the metrics API, and Aviem
> >for
> >> his ongoing work on metrics for IOs, e.g. KafkaIO) that made me aware
> >of
> >> this subject.
> >>
> >> There are some basic ideas to discuss e.g.
> >>
> >> - What are the responsibilities of Beam IOs in terms of Metrics
> >> (considering the fact that the actual IOs, server + client, usually
> >provide
> >> their own)?
> >>
> >
> >While it is true that many IOs provide their own metrics, I think that
> >Beam
> >should expose IO metrics because:
> >
> >1. Metrics which help understanding performance of a pipeline which
> >uses
> >   an IO may not be covered by the IO.
> >2. Users may not be able to setup integrations with the IO's metrics to
> >view them effectively (And correlate them to a specific Beam pipeline),
> >but
> >   still want to investigate their pipeline's performance.
> >
> >
> >> - What metrics are relevant to the pipeline (or some particular IOs)?
> >Kafka
> >> backlog for one could point that a pipeline is behind ingestion rate.
> >
> >
> >I think it depends on the IO, but there is probably overlap in some of
> >the
> >metrics so a guideline might be written for this.
> >I listed what I thought should be reported for KafkaIO in the following
> >JIRA: https://issues.apache.org/jira/browse/BEAM-1398
> >Feel free to add more metrics you think are important to report.
> >
> >
> >>
> >>
> >- Should metrics be calculated on IOs by default or no?
> >> - If metrics are defined by default does it make sense to allow users
> >to
> >> disable them?
> >>
> >
> >IIUC, your concern is that metrics will add overhead to the pipeline,
> >and
> >pipelines which are highly sensitive to this will be hampered?
> >In any case I think that yes, metrics calculation should be
> >configurable
> >(Enable/disable).
> >In Spark runner, for example the Metrics sink feature (not the metrics
> >calculation itself, but sinks to send them to) is configurable in the
> >pipeline options.
> >
> >
> >> Well these are just some questions around the subject so we can
> >create a
> >> common set of practices to include metrics in the IOs and eventually
> >> improve the transform guide with this. What do you think about this?
> >Do you
> >> have other questions/ideas?
> >>
> >> Thanks,
> >> Ismaël
> >>
>


Re: Should you always have a separate PTransform class for a new transform?

2017-02-07 Thread Ben Chambers
Going back to the beginning, why not "Combine.perKey(Count.fn())" or some
such?

We have a lot of boilerplate already to support "Count.perKey()" and that
will only become worse when we try to have a Count.PerKey class that has
the functionality of Combine.PerKey, why not just get rid of the
boilerplate?

Pithy examples aside, there seems to be value in less implementation
complexity and making it clear that Count is just a CombineFn.

On Tue, Feb 7, 2017, 7:49 PM Kenneth Knowles  wrote:

> I am +0.7 on this idea. My rationale is contained in this thread, but I
> thought I would paraphrase it anyhow:
>
>
> "You automatically get all the features of Combine" / "If you add a feature
> to Combine you have to update all wrappers"
>
> 0. I have been in some of the mentioned historical discussions and was
> swayed enough to not bother trying to change things, but I don't actually
> think the abstraction breakage is worth the benefits. I think the status
> quo is hanging on because it is the status quo.
>
> 1. Any Combine wrapper should also be providing its CombineFn, for use with
> state. So the user already has a good way to get the same result without
> the wrapper, e.g. Combine.globally(Mean.of()). There are reasons to provide
> a factory method, such as abstracting away tweaking of defaults, but I
> don't know of any cases of this happening.
>
> 2. Most interesting transforms will not just be Combine wrappers - we have
> a lot of them because we try to write pithy examples so I bet this issue is
> a bigger deal to us than to our users. I don't have data to back this up;
> maybe users do write a bunch of Combine wrappers, but IMO they should spend
> their time elsewhere. I certainly would. So the burden of people updating
> their Combine wrappers is not compelling to me.
>
> 2a. Consider also Count.perElement(). It really is a glorified combine, but
> none of the supposed benefits of automatic propagation of tuning knobs can
> accrue to it, because it is not _just_ a Combine.
>
> 3. Those transforms that really are just Combine wrappers will not require
> update when there is a new feature added to Combine. A new performance
> tuning knob, perhaps, but as Eugene points out, new features like side
> inputs or access to pipeline options aren't automatically applicable even
> if your transform is a glorified CombineFn.
>
> 3a. And since the argument is only about universally applicable builder
> methods on Combine, how many more do you imagine we will be adding?
>
> 3b. You can use type hackery to ensure you don't forget a knob, along the
> lines of `CountPerElement implements HasACombineInsideIt`.
> This would actually be a step towards enabling e.g. hot key counting in
> Count.PerElement
>
> 4. If a transform that is just a wrapper needs to evolve into more, then
> you have to make a backwards incompatible change because you broke your
> abstractions, and you incur all the manual programming cost anyhow.
>
>
> If I were to write a style guide, I might summarize as:
>
> (a) If you are writing an interesting CombineFn, give it some public
> factory methods.
>
> (b) If you are writing a transform that _by definition, forever and always_
> is a wrapper on Combine, stop after (a)
>
> (c) If you are writing a transform that has conceivably multiple
> implementation choices, you need an abstraction boundary to protect against
> the current decision.
>
> (d) Only return a class more specific than PTransform if your transform has
> more-specific methods to call.
>
>
> All that said, I'm not sure how much influence this particular collection
> of guidelines will have, hence I don't feel a full +1. (except (a) which is
> very important).
>
> Kenn
>
> On Tue, Feb 7, 2017 at 5:59 PM, Eugene Kirpichov <
> kirpic...@google.com.invalid> wrote:
>
> > There's 2 points here
> >
> > 1. If Count.globally() is implemented via Combine.globally(), then should
> > Count.globally() return a Combine.Globally, or should it wrap it into a
> new
> > class Count.Globally? (that's what I'm wondering in this thread)
> >
> > I think the "least visibility" argument here would lead us to saying we
> > should wrap into a new class, because returning a Combine.Globally leaks
> > the implementation detail that Count is always implemented via Combine.
> >
> > 2. If Thumbs.twiddle() transforms Foo to Bar, should it return a
> > PTransform, or should it return Thumbs.Twiddle (which extends
> > PTransform)? This is where your "least visibility" argument
> from
> > that thread applies more - we're exposing a more specific type - but I
> > think by exposing this type we are not imposing any additional
> obligations
> > on ourselves and not leaking any details: saying "twiddle() returns a
> > Twiddle" is as good as saying nothing.
> >
> > In case of Combine, the ability to additionally configure it for the user
> > is a strong argument. But I think we've been through a similar situation
> > and made a different decision: bounded reads from unbounded s

Re: [BEAM-135] Utilities for "batching" elements in a DoFn

2017-01-26 Thread Ben Chambers
Here's an example API that would make this part of a DoFn. The idea here is
that it would still be run as `ParDo.of(new MyBatchedDoFn())`, but the
runner (and DoFnRunner) could see that it has asked for batches, so rather
than calling `processElement` on every input `I`, it assembles a
`Collection<I>` and then calls the method.

Possible API making this part of DoFn (with a fixed size):

public class MyBatchedDoFn extends DoFn<I, O> {
  @ProcessBatch(size = 50)
  public void processBatch(ProcessContext c) {
    Collection<I> batchContents = c.element();
    ...
  }
}

Possible API making this part of DoFn (with dynamic size):

public class MyBatchedDoFn extends DoFn<I, O> {
  @ProcessBatch
  public boolean processBatch(ProcessContext c) {
    Collection<I> batchContents = c.element();
    if (batchContents.size() < 50) {
      return false; // batch not yet processed
    }
    ...
    return true;
  }
}

On Thu, Jan 26, 2017 at 4:16 PM Robert Bradshaw 
wrote:

> On Thu, Jan 26, 2017 at 3:42 PM, Eugene Kirpichov
>  wrote:
> > I agree that wrapping the DoFn is probably not the way to go, because the
> > DoFn may be quite tricky due to all the reflective features: e.g. how do
> > you automatically "batch" a DoFn that uses state and timers? What about a
> > DoFn that uses a BoundedWindow parameter? What about a splittable DoFn?
> > What about future reflective features? The class for invoking DoFn's,
> > DoFnInvokers, is absent from the SDK (and present in runners-core) for a
> > good reason.
> >
> > I'd rather leave the intricacies of invoking DoFn's to runners, and say
> > that you can't wrap DoFn's, period - "adapter", "decorator" and other
> > design patterns just don't apply to DoFn's.
>
> As a simple example, given a DoFn<I, O> it's perfectly natural to want
> to "wrap" this as a DoFn<KV<K, I>, KV<K, O>>. State, side inputs,
> windows, etc. would just be passed through.
>
> The fact that this is complicated, with reflection and flexible
> signatures and byte generation, is a property of the SDK (to provide a
> flexible DoFn API). I agree that it's nice to hide this complexity
> from the user, and it discourages this kind of composability.
>
> I would say that it's nice to let the "batching fn" have side inputs,
> setup/teardown, etc. Pretty much everything the current DoFn has,
> though of course using certain properties (e.g. state and timers, or
> windows) would restrict bundles to be contained within a single
> key/window/whatever.
>
> > The two options for batching are:
> > - A transform that takes elements and produces batches, like Robert said
> > - A simple Beam-agnostic library that takes Java objects and produces
> > batches of Java objects, with an API that makes it convenient to use in a
> > typical batching DoFn
>
> I don't think a Beam-agnostic library could correctly handle details
> like windowing and timestamps.
>
>
> On Thu, Jan 26, 2017 at 3:53 PM, Ben Chambers
>  wrote:
> > The third option for batching:
> >
> > - Functionality within the DoFn and DoFnRunner built as part of the SDK.
> >
> > I haven't thought through Batching, but at least for the
> > IntraBundleParallelization use case this actually does make sense to
> expose
> > as a part of the model. Knowing that a DoFn supports parallelization, a
> > runner may want to control how much parallelization is allowed, and the
> > DoFn also needs to make sure to wait on all those threads (and make sure
> > they're properly setup for logging/metrics/etc. associated with the
> current
> > step).
> >
> > There may be good reasons to make this a property of a DoFn that the
> runner
> > can inspect, and support. For instance, if a DoFn wants to process
> batches
> > of 50, it may be possible to factor that into how input is split/bundled.
>
> That's an interesting idea. I think this could also be done via the Fn
> API by recognizing the URN of "batching DoFn."
>


Re: [BEAM-135] Utilities for "batching" elements in a DoFn

2017-01-26 Thread Ben Chambers
The third option for batching:

- Functionality within the DoFn and DoFnRunner built as part of the SDK.

I haven't thought through Batching, but at least for the
IntraBundleParallelization use case this actually does make sense to expose
as a part of the model. Knowing that a DoFn supports parallelization, a
runner may want to control how much parallelization is allowed, and the
DoFn also needs to make sure to wait on all those threads (and make sure
they're properly setup for logging/metrics/etc. associated with the current
step).

There may be good reasons to make this a property of a DoFn that the runner
can inspect, and support. For instance, if a DoFn wants to process batches
of 50, it may be possible to factor that into how input is split/bundled.

On Thu, Jan 26, 2017 at 3:49 PM Kenneth Knowles 
wrote:

> On Thu, Jan 26, 2017 at 3:42 PM, Eugene Kirpichov <
> kirpic...@google.com.invalid> wrote:
>
> > The class for invoking DoFn's,
> > DoFnInvokers, is absent from the SDK (and present in runners-core) for a
> > good reason.
> >
>
> This would be true if it weren't for that pesky DoFnTester :-)
>
> And even if we solve that problem, in the future it will be in the SDK's Fn
> Harness.
>


Re: [BEAM-135] Utilities for "batching" elements in a DoFn

2017-01-26 Thread Ben Chambers
I think that wrapping the DoFn is tricky -- we backed out
IntraBundleParallelization because it did that, and it has weird
interactions with both the reflective DoFn and windowing. We could maybe
make some kind of "DoFnDelegatingDoFn" that could act as a base class and
get some of that right, but...

One question I have is whether this batching should be "make batches of N
and if you need to wait for the Nth element do so" or "make batches of at
most N but don't wait too long if you don't get to N". In the former case,
we'll need to do something to buffer elements between bundles -- whether
this is using State or a GroupByKey, etc. In the latter case, the buffering
can happen entirely within a bundle -- if you get to the end of the bundle
and only have 5 elements, even if 5 < N, process that as a batch (rather
than shifting it somewhere else).
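The second option — batches of at most N, flushed when the bundle ends — needs no cross-bundle state, which is what makes it simpler. A minimal sketch of that buffering logic (the `BundleBatcher` helper is invented for illustration, not one of the APIs under discussion, and it ignores windowing and timestamps entirely):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Buffers elements within a single bundle and emits batches of at most maxSize. */
public class BundleBatcher<T> {
  private final int maxSize;
  private final Consumer<List<T>> processBatch;
  private List<T> buffer = new ArrayList<>();

  public BundleBatcher(int maxSize, Consumer<List<T>> processBatch) {
    this.maxSize = maxSize;
    this.processBatch = processBatch;
  }

  /** Called per element; flushes whenever a full batch accumulates. */
  public void processElement(T element) {
    buffer.add(element);
    if (buffer.size() >= maxSize) {
      flush();
    }
  }

  /** Called at finishBundle: a partial batch (size < maxSize) is still processed. */
  public void finishBundle() {
    if (!buffer.isEmpty()) {
      flush();
    }
  }

  private void flush() {
    processBatch.accept(buffer);
    buffer = new ArrayList<>();
  }
}
```

Feeding 7 elements with maxSize 3 yields batches of sizes 3, 3, and 1 — the trailing short batch being processed at the end of the bundle rather than carried over.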

On Thu, Jan 26, 2017 at 3:01 PM Robert Bradshaw 
wrote:

> On Thu, Jan 26, 2017 at 12:48 PM, Eugene Kirpichov
>  wrote:
> > I don't think we should make batching a core feature of the Beam
> > programming model (by adding it to DoFn as this code snippet implies).
> I'm
> > reasonably sure there are less invasive ways of implementing it.
>
> +1, either as a PTransform<PCollection<I>, PCollection<Iterable<I>>> or
> a DoFn that wraps/delegates to a DoFn<Iterable<I>, O>.
>
> > On Thu, Jan 26, 2017 at 12:22 PM Jean-Baptiste Onofré 
> > wrote:
> >
> >> Agree, I'm curious as well.
> >>
> >> I guess it would be something like:
> >>
> >> .apply(ParDo.of(new DoFn<I, O>() {
> >>
> >> @Override
> >> public long batchSize() {
> >>   return 1000;
> >> }
> >>
> >> @ProcessElement
> >> public void processElement(ProcessContext context) {
> >>   ...
> >> }
> >> }));
> >>
> >> If batchSize (overridden by the user) returns a positive long, then the DoFn
> >> batch with this size.
> >>
> >> Regards
> >> JB
> >>
> >> On 01/26/2017 05:38 PM, Eugene Kirpichov wrote:
> >> > Hi Etienne,
> >> >
> >> > Could you post some snippets of how your transform is to be used in a
> >> > pipeline? I think that would make it easier to discuss on this thread
> and
> >> > could save a lot of churn if the discussion ends up leading to a
> >> different
> >> > API.
> >> >
> >> > On Thu, Jan 26, 2017 at 8:29 AM Etienne Chauchot  >
> >> > wrote:
> >> >
> >> >> Wonderful !
> >> >>
> >> >> Thanks Kenn !
> >> >>
> >> >> Etienne
> >> >>
> >> >>
> >> >> Le 26/01/2017 à 15:34, Kenneth Knowles a écrit :
> >> >>> Hi Etienne,
> >> >>>
> >> >>> I was drafting a proposal about @OnWindowExpiration when this email
> >> >>> arrived. I thought I would try to quickly unblock you by responding
> >> with
> >> >> a
> >> >>> TL;DR: you can achieve your goals with state & timers as they
> currently
> >> >>> exist. You'll set a timer for
> >> window.maxTimestamp().plus(allowedLateness)
> >> >>> precisely - when this timer fires, you are guaranteed that the input
> >> >>> watermark has exceeded this point (so all new data is droppable)
> while
> >> >> the
> >> >>> output timestamp is held to this point (so you can safely output
> into
> >> the
> >> >>> window).
> >> >>>
> >> >>> @OnWindowExpiration is (1) a convenience to save you from needing a
> >> >> handle
> >> >>> on the allowed lateness (not a problem in your case) and (2)
> actually
> >> >>> meaningful and potentially less expensive to implement in the
> absence
> >> of
> >> >>> state (this is why it needs a design discussion at all, really).
> >> >>>
> >> >>> Caveat: these APIs are new and not supported in every runner and
> >> >> windowing
> >> >>> configuration.
> >> >>>
> >> >>> Kenn
> >> >>>
> >> >>> On Thu, Jan 26, 2017 at 1:48 AM, Etienne Chauchot <
> echauc...@gmail.com
> >> >
> >> >>> wrote:
> >> >>>
> >> >>>> Hi,
> >> >>>>
> >> >>>> I have started to implement this ticket. F

Re: Committed vs. attempted metrics results

2017-01-26 Thread Ben Chambers
I think relaxing the query to not be an exact match is reasonable. I'm
wondering if it should be substring or regex. Either one preserves the
existing behavior of, when passed a full step path, returning only the
metrics for that specific step, while adding the ability to query with
only an approximate step name.

A) Util or not seems fine to me. If it seems likely to be reusable, let's do
so, preferably in a separate PR or commit.

B) Yes, the direct runner should match whatever semantics we choose.

On Thu, Jan 26, 2017, 5:30 AM Aviem Zur  wrote:

> Ben - yes, there is still some ambiguity regarding the querying of the
> metrics results.
>
> You've discussed in this thread the notion that metrics step names should
> be converted to unique names when aggregating metrics, so that each step
> will aggregate its own metrics, and not join with other steps by mistake.
> The way you suggested to query this is that when querying by step, all
> steps which contain the query string as a substring will show up in the
> results.
>
> This sounds fine, however this is not how the direct runner is implemented
> right now. It is an exact match.
>
> In my PR I copied the MetricsResults filtering methods directly from the
> direct runner and suggested that since I copied them verbatim perhaps they
> should be pulled up to a more central module, and then all runners could
> use them.
>
> So, my remaining questions are:
> A) Should we create a util for filtering MetricsResults based on a query,
> using the suggested filtering implementation in this thread (Substring
> match) ?
> B) Should direct runner be changed to use such a util, and conform with the
> results filtering suggested.
>
> On Tue, Jan 24, 2017 at 2:03 AM Ben Chambers  >
> wrote:
>
> > For the short term, it seems like staying with the existing Query API and
> > allowing runners to throw exceptions if a user issues a query that is
> not
> > supported is reasonable. It shouldn't affect the ability to run a Beam
> > pipeline on other runners, since the Query API is only exposed *after*
> the
> > pipeline is run.
> >
> > For the longer term, it seems like the Query API could merit some more
> > thought, especially if people have good use cases for accessing the value
> > of metrics programmatically from the same program that ran the original
> > pipeline.
> >
> > Aviem -- is there anything specific that needs to be discussed/nailed
> down
> > to help with your PR?
> >
> > On Thu, Jan 19, 2017 at 3:57 PM Ben Chambers 
> wrote:
> >
> > > On Thu, Jan 19, 2017 at 3:28 PM Amit Sela 
> wrote:
> > >
> > > On Fri, Jan 20, 2017 at 1:18 AM Ben Chambers
> >  > > >
> > > wrote:
> > >
> > > > Part of the problem here is that whether attempted or committed is
> > "what
> > > > matters" depends on the metric. If you're counting the number of RPCs
> > to
> > > an
> > > > external service, you may want all the attempts (to predict your
> bill).
> > > If
> > > > you're tracking how long those RPCs took, you may want it just to be
> > the
> > > > committed (eg., what is the best-case time to execute your pipeline).
> > > This
> > > > is essentially Luke's case of wanting one or the other.
> > > >
> > > This sounds like Metrics should have a user-defined guarantee-level..
> > which
> > > might make more sense - Metrics.counter().attempted()/committed() -
> > though
> > > this might prove more challenging for runners to implement.
> > >
> > >
> > > We went away from that because of cases like Luke's where a user might
> > > want to compare the two. Or, not even realize there is a difference
> > > up-front, so declaring ahead of time is difficult. If both are
> available
> > > and can be looked at, if they're the same -- no problems. If they're
> > > different, then it provides a good reason to investigate and figure out
> > the
> > > difference.
> > >
> > >
> > > >
> > > > Regarding the step names -- the metrics are reported using the full
> > step
> > > > name, which is also made unique during the Graph API. So "My
> > > > Composite/ParDo(MyDoFn)" or "My Composite/ParDo(MyDoFn)2" if there
> are
> > > > multiple instances within the same composite. Specifically -- the
> names
> > > are
> > > > made unique prior to recording metrics, so there are no double
> counts.
> > > >
> &

Re: Committed vs. attempted metrics results

2017-01-23 Thread Ben Chambers
For the short term, it seems like staying with the existing Query API and
allowing runners to throw exceptions if a user issues a query that is not
supported is reasonable. It shouldn't affect the ability to run a Beam
pipeline on other runners, since the Query API is only exposed *after* the
pipeline is run.

For the longer term, it seems like the Query API could merit some more
thought, especially if people have good use cases for accessing the value
of metrics programmatically from the same program that ran the original
pipeline.

Aviem -- is there anything specific that needs to be discussed/nailed down
to help with your PR?

On Thu, Jan 19, 2017 at 3:57 PM Ben Chambers  wrote:

> On Thu, Jan 19, 2017 at 3:28 PM Amit Sela  wrote:
>
> On Fri, Jan 20, 2017 at 1:18 AM Ben Chambers  >
> wrote:
>
> > Part of the problem here is that whether attempted or committed is "what
> > matters" depends on the metric. If you're counting the number of RPCs to
> an
> > external service, you may want all the attempts (to predict your bill).
> If
> > you're tracking how long those RPCs took, you may want it just to be the
> > committed (eg., what is the best-case time to execute your pipeline).
> This
> > is essentially Luke's case of wanting one or the other.
> >
> This sounds like Metrics should have a user-defined guarantee-level.. which
> might make more sense - Metrics.counter().attempted()/committed() - though
> this might prove more challenging for runners to implement.
>
>
> We went away from that because of cases like Luke's where a user might
> want to compare the two. Or, not even realize there is a difference
> up-front, so declaring ahead of time is difficult. If both are available
> and can be looked at, if they're the same -- no problems. If they're
> different, then it provides a good reason to investigate and figure out the
> difference.
>
>
> >
> > Regarding the step names -- the metrics are reported using the full step
> > name, which is also made unique during the Graph API. So "My
> > Composite/ParDo(MyDoFn)" or "My Composite/ParDo(MyDoFn)2" if there are
> > multiple instances within the same composite. Specifically -- the names
> are
> > made unique prior to recording metrics, so there are no double counts.
> >
> But how would the user know that ? I'm afraid this could be confusing as a
> user-facing query API, and I think most users would simply name metrics
> differently.
>
>
> The query API can support querying based on a sub-string of the full name,
> and return metrics from all steps that match. That would allow the user to
> query metrics without knowing that. Having unique names for steps is
> important and useful for many other things (logging, associating time spent
> executing, etc.).
>
>
> >
> > On Thu, Jan 19, 2017 at 1:57 PM Amit Sela  wrote:
> >
> > > I think Luke's example is interesting, but I wonder how common it
> > is/would
> > > be ? I'd expect failures to happen but not in a rate that would be so
> > > dramatic that it'd be interesting to follow applicatively (I'd expect
> the
> > > runner/cluster to properly monitor up time of processes/nodes
> > separately).
> > > And even if it is useful, I can't think of other use cases.
> > >
> > > I thought the idea was to "declare" the Metrics guarantee level in the
> > > query API, but the more I think about it the more I tend to let it go
> for
> > > the following reasons:
> > >
> > >- Setting aside Luke's example, I think users would prefer the best
> > >guarantee a runner can provide. And on that note, I'd expect a
> > > "getMetrics"
> > >API and not have to figure-out guarantees.
> > >- Programmatic querying would "break"
> (UnsupportedOperationException)
> > >portability if a program that was running with a runner that
> supports
> > >committed() would try to execute on a runner that only supports
> > > attempted()
> > >- I know that portability is for the Pipeline and this is
> > post-execution
> > >but still, call it 25% portability issue ;-) .
> > >- According to the Capability Matrix, all runners fail to provide
> > >"commit" guarantee for Aggregators. I can only speak for Spark
> saying
> > > that
> > >supporting the Metrics API relies on the same underlying mechanism
> and
> > > so
> > >nothing will change. I wonder about other runners, anyone plans to
> > &g

Re: Committed vs. attempted metrics results

2017-01-19 Thread Ben Chambers
On Thu, Jan 19, 2017 at 3:28 PM Amit Sela  wrote:

> On Fri, Jan 20, 2017 at 1:18 AM Ben Chambers  >
> wrote:
>
> > Part of the problem here is that whether attempted or committed is "what
> > matters" depends on the metric. If you're counting the number of RPCs to
> an
> > external service, you may want all the attempts (to predict your bill).
> If
> > you're tracking how long those RPCs took, you may want it just to be the
> > committed (eg., what is the best-case time to execute your pipeline).
> This
> > is essentially Luke's case of wanting one or the other.
> >
> This sounds like Metrics should have a user-defined guarantee-level.. which
> might make more sense - Metrics.counter().attempted()/committed() - though
> this might prove more challenging for runners to implement.
>

We went away from that because of cases like Luke's where a user might want
to compare the two. Or, not even realize there is a difference up-front, so
declaring ahead of time is difficult. If both are available and can be
looked at, if they're the same -- no problems. If they're different, then
it provides a good reason to investigate and figure out the difference.


> >
> > Regarding the step names -- the metrics are reported using the full step
> > name, which is also made unique during the Graph API. So "My
> > Composite/ParDo(MyDoFn)" or "My Composite/ParDo(MyDoFn)2" if there are
> > multiple instances within the same composite. Specifically -- the names
> are
> > made unique prior to recording metrics, so there are no double counts.
> >
> But how would the user know that ? I'm afraid this could be confusing as a
> user-facing query API, and I think most users would simply name metrics
> differently.
>

The query API can support querying based on a sub-string of the full name,
and return metrics from all steps that match. That would allow the user to
query metrics without knowing that. Having unique names for steps is
important and useful for many other things (logging, associating time spent
executing, etc.).


> >
> > On Thu, Jan 19, 2017 at 1:57 PM Amit Sela  wrote:
> >
> > > I think Luke's example is interesting, but I wonder how common it
> > is/would
> > > be ? I'd expect failures to happen but not in a rate that would be so
> > > dramatic that it'd be interesting to follow applicatively (I'd expect
> the
> > > runner/cluster to properly monitor up time of processes/nodes
> > separately).
> > > And even if it is useful, I can't think of other use cases.
> > >
> > > I thought the idea was to "declare" the Metrics guarantee level in the
> > > query API, but the more I think about it the more I tend to let it go
> for
> > > the following reasons:
> > >
> > >- Setting aside Luke's example, I think users would prefer the best
> > >guarantee a runner can provide. And on that note, I'd expect a
> > > "getMetrics"
> > >API and not have to figure-out guarantees.
> > >- Programmatic querying would "break"
> (UnsupportedOperationException)
> > >portability if a program that was running with a runner that
> supports
> > >committed() would try to execute on a runner that only supports
> > > attempted()
> > >- I know that portability is for the Pipeline and this is
> > post-execution
> > >but still, call it 25% portability issue ;-) .
> > >- According to the Capability Matrix, all runners fail to provide
> > >"commit" guarantee for Aggregators. I can only speak for Spark
> saying
> > > that
> > >supporting the Metrics API relies on the same underlying mechanism
> and
> > > so
> > >nothing will change. I wonder about other runners, anyone plans to
> > > support
> > >"commit" guarantees for Metrics soon ? having said that, not sure
> this
> > > is a
> > >good reason not to have this as a placeholder.
> > >
> > > Another question for querying Metrics - querying by step could be a bit
> > > tricky since a runner is expected to keep unique naming/ids for steps,
> > but
> > > users are supposed to be aware of this here and I'd suspect users might
> > not
> > > follow and if they use the same ParDo in a couple of places they'll
> query
> > > it and it might be confusing for them to see "double counts" if they
> > didn't
> > > mean for that.
> > >
> > > Amit

Re: Committed vs. attempted metrics results

2017-01-19 Thread Ben Chambers
Part of the problem here is that whether attempted or committed is "what
matters" depends on the metric. If you're counting the number of RPCs to an
external service, you may want all the attempts (to predict your bill). If
you're tracking how long those RPCs took, you may want it just to be the
committed (eg., what is the best-case time to execute your pipeline). This
is essentially Luke's case of wanting one or the other.
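The divergence Ben describes can be made concrete with a toy model (all names here are illustrative, not the Beam API): every processing attempt contributes to the attempted value, but only the attempt that commits with the bundle contributes to the committed value.

```java
import java.util.concurrent.atomic.AtomicLong;

// Toy model of attempted vs. committed counters under bundle retries.
class ToyMetrics {
    final AtomicLong attempted = new AtomicLong();
    final AtomicLong committed = new AtomicLong();

    // Process a bundle that increments the counter once per element,
    // failing `failures` times before the bundle finally commits.
    void processBundle(int elements, int failures) {
        for (int attempt = 0; attempt <= failures; attempt++) {
            attempted.addAndGet(elements);      // reported from the worker on every try
            if (attempt == failures) {
                committed.addAndGet(elements);  // folded in atomically with the commit
            }
        }
    }
}
```

Under this model, counting RPCs gives a bill estimate from `attempted`, while `committed` reflects only the success path, matching the two use cases above.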

Regarding the step names -- the metrics are reported using the full step
name, which is also made unique during the Graph API. So "My
Composite/ParDo(MyDoFn)" or "My Composite/ParDo(MyDoFn)2" if there are
multiple instances within the same composite. Specifically -- the names are
made unique prior to recording metrics, so there are no double counts.
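A minimal sketch of the uniquification described above (the naming scheme follows the "My Composite/ParDo(MyDoFn)2" example in the email; the class and method names are assumptions):

```java
import java.util.*;

// Make full step names unique by appending a counter to repeats,
// so metrics recorded against them can never double-count.
class StepNames {
    private final Map<String, Integer> seen = new HashMap<>();

    String uniquify(String fullName) {
        int n = seen.merge(fullName, 1, Integer::sum);
        return n == 1 ? fullName : fullName + n;
    }
}
```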

On Thu, Jan 19, 2017 at 1:57 PM Amit Sela  wrote:

> I think Luke's example is interesting, but I wonder how common it is/would
> be ? I'd expect failures to happen but not in a rate that would be so
> dramatic that it'd be interesting to follow applicatively (I'd expect the
> runner/cluster to properly monitor up time of processes/nodes separately).
> And even if it is useful, I can't think of other use cases.
>
> I thought the idea was to "declare" the Metrics guarantee level in the
> query API, but the more I think about it the more I tend to let it go for
> the following reasons:
>
>- Setting aside Luke's example, I think users would prefer the best
>guarantee a runner can provide. And on that note, I'd expect a
> "getMetrics"
>API and not have to figure-out guarantees.
>- Programmatic querying would "break" (UnsupportedOperationException)
>portability if a program that was running with a runner that supports
>committed() would try to execute on a runner that only supports
> attempted()
>- I know that portability is for the Pipeline and this is post-execution
>but still, call it 25% portability issue ;-) .
>- According to the Capability Matrix, all runners fail to provide
>"commit" guarantee for Aggregators. I can only speak for Spark saying
> that
>supporting the Metrics API relies on the same underlying mechanism and
> so
>nothing will change. I wonder about other runners, anyone plans to
> support
>"commit" guarantees for Metrics soon ? having said that, not sure this
> is a
>good reason not to have this as a placeholder.
>
> Another question for querying Metrics - querying by step could be a bit
> tricky since a runner is expected to keep unique naming/ids for steps, but
> users are supposed to be aware of this here and I'd suspect users might not
> follow and if they use the same ParDo in a couple of places they'll query
> it and it might be confusing for them to see "double counts" if they didn't
> mean for that.
>
> Amit.
>
> On Thu, Jan 19, 2017 at 7:36 PM Ben Chambers  >
> wrote:
>
> > Thanks for starting the discussion! I'm going to hold off saying what I
> > think and instead just provide some background and additional questions,
> > because I want to see where the discussion goes.
> >
> > When I first suggested the API for querying metrics I was adding it for
> > parity with aggregators. A good first question might be does the pipeline
> > result even need query methods? Runners could add them as necessary based
> > on the levels of querying they support. Goal: make sure Metrics work.
> >
> > The other desire was to make the accuracy clear. One implementation path
> > was reporting metrics directly from the workers while attempting work.
> This
> > can overcount when retrying and may be under the actual attempts if the
> > worker lost connectivity before reporting.
> >
> > Another implementation was something like a side output where the counts
> > are committed as part of each bundle's results, and then aggregated. This
> > committed value is more accurate and represents the value that occurred
> > along the success path of the pipeline.
> >
> > I suspect there are other possible implementations so trying to make an
> API
> > that expresses all of them is difficult. So:
> >
> > 1. Does pipeline result need to support querying (which is useful for
> > programmatic consumption) or are metrics intended only to get values out
> of
> > a pipeline and into some metrics store?
> >
> > 2. How should pipeline results indicate the different kinds of metrics?
> > What if a runner supported multiple kinds (eg, the runner reports both
> > attempted and committed results)? As Luke mentions it may be useful to
> look
> > at both to unders

Re: Committed vs. attempted metrics results

2017-01-19 Thread Ben Chambers
Thanks for starting the discussion! I'm going to hold off saying what I
think and instead just provide some background and additional questions,
because I want to see where the discussion goes.

When I first suggested the API for querying metrics I was adding it for
parity with aggregators. A good first question might be does the pipeline
result even need query methods? Runners could add them as necessary based
on the levels of querying they support.

The other desire was to make the accuracy clear. One implementation path
was reporting metrics directly from the workers while attempting work. This
can overcount when retrying, and may undercount actual attempts if the
worker lost connectivity before reporting.

Another implementation was something like a side output where the counts
are committed as part of each bundle's results, and then aggregated. This
committed value is more accurate and represents the value that occurred
along the success path of the pipeline.

I suspect there are other possible implementations so trying to make an API
that expresses all of them is difficult. So:

1. Does pipeline result need to support querying (which is useful for
programmatic consumption) or are metrics intended only to get values out of
a pipeline and into some metrics store?

2. How should pipeline results indicate the different kinds of metrics?
What if a runner supported multiple kinds (eg, the runner reports both
attempted and committed results)? As Luke mentions it may be useful to look
at both to understand how much retries affected the value.
On Thu, Jan 19, 2017, 1:42 AM Aviem Zur  wrote:

Hi all,

While working on the implementation of metrics API in Spark runner the
question of committed vs. attempted results has come up, sparking (no pun
intended) an interesting conversation.
(See API: MetricResult
<
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricResult.java
>
and
discussion: PR #1750 )

The separation of `attempted` and `committed` metric results seems a bit
unclear.

Seeing that current implementations of aggregators in the different runners
do not guarantee correctness, one could assume that the metrics API
implementations will also follow the same guarantees.

If this is correct, then you could assume that only `attempted()` metrics
results can be fulfilled.
Would it then be better to just have a single method such as `get()` in the
API, and have the guarantees of each runner explained in the capability
matrix / documentation?


Re: [BEAM-135] Utilities for "batching" elements in a DoFn

2017-01-17 Thread Ben Chambers
We should start by understanding the goals. If elements are in different
windows, can they be put in the same batch? If they have different
timestamps, what timestamp should the batch have?

As a composite transform this will likely require a GroupByKey, which may
affect performance. Maybe doing it within a DoFn is better.

Then it could be some annotation or API that informs the runner. Should
batch sizes be fixed in the annotation (element count or size) or should
the user have some method that lets them decide when to process a batch
based on the contents?

Another thing to think about is whether this should be connected to the
ability to run parts of the bundle in parallel. Maybe each batch is an RPC
and you just want to start an async RPC for each batch. Then, in addition to
starting the final RPC in finishBundle, you also need to wait for all the
RPCs to complete.
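The batching pattern under discussion, buffer in processElement, flush at a size threshold, and flush the remainder in finishBundle, can be sketched in plain Java (names are illustrative, not the eventual Beam API):

```java
import java.util.*;
import java.util.function.Consumer;

// Sketch of the per-DoFn batching pattern: the handler receives each
// full batch, e.g. to issue one RPC per batch instead of per element.
class BatchingFn<T> {
    private final int maxBatchSize;
    private final Consumer<List<T>> batchHandler;
    private final List<T> buffer = new ArrayList<>();

    BatchingFn(int maxBatchSize, Consumer<List<T>> batchHandler) {
        this.maxBatchSize = maxBatchSize;
        this.batchHandler = batchHandler;
    }

    void processElement(T element) {
        buffer.add(element);
        if (buffer.size() >= maxBatchSize) flush();
    }

    void finishBundle() {
        if (!buffer.isEmpty()) flush(); // final, possibly partial batch
    }

    private void flush() {
        batchHandler.accept(new ArrayList<>(buffer));
        buffer.clear();
    }
}
```

This sketch sidesteps the window/timestamp questions raised above; a real transform would also have to decide how batches interact with windows and element timestamps.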
On Tue, Jan 17, 2017, 8:48 AM Etienne Chauchot  wrote:

Hi JB,

I meant jira vote but discussion on the ML works also :)

As I understand the need (see stackoverflow links in jira ticket) the
aim is to avoid the user having to code the batching logic in his own
DoFn.processElement() and DoFn.finishBundle() regardless of the bundles.
For example, possible use case is to batch a call to an external service
(for performance).

I was thinking about providing a PTransform that implements the batching
in its own DoFn and that takes user defined functions for customization.

Etienne

Le 17/01/2017 à 17:30, Jean-Baptiste Onofré a écrit :
> Hi
>
> I guess you mean discussion on the mailing list about that, right ?
>
> AFAIR the idea is to provide a utility class to deal with
pooling/batching. However, I'm not sure it's required, as with @StartBundle etc.
in DoFn, batching depends on the end user's "logic".
>
> Regards
> JB
>
> On Jan 17, 2017, 08:26, at 08:26, Etienne Chauchot 
wrote:
>> Hi all,
>>
>> I have started to work on this ticket
>> https://issues.apache.org/jira/browse/BEAM-135
>>
>> As there was no vote since March 18th, is the issue still
>> relevant/needed?
>>
>> Regards,
>>
>> Etienne


Re: @ProcessElement and private methods

2017-01-17 Thread Ben Chambers
We thought about it when this was added. We decided against it because
these are overrides-in-spirit. Letting them be private would be misleading
because they are called from outside the class and should be thought of in
that way.

Also, this seems similar to others in the Java ecosystem: JUnit test
methods are public as well.

What is the motivation for allowing private methods?
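As Stas notes, the restriction is a design choice rather than a technical one: annotation-based discovery can reach private methods via reflection. A small sketch (the annotation and class names are illustrative, not Beam's actual DoFnSignatures machinery):

```java
import java.lang.annotation.*;
import java.lang.reflect.*;

@Retention(RetentionPolicy.RUNTIME)
@interface ProcessElement {}

class MyFn {
    @ProcessElement
    private String process(String input) { return input.toUpperCase(); }
}

class Discovery {
    // Find and invoke the annotated method, whatever its visibility.
    static Object invokeAnnotated(Object target, Object arg) {
        for (Method m : target.getClass().getDeclaredMethods()) {
            if (m.isAnnotationPresent(ProcessElement.class)) {
                try {
                    m.setAccessible(true); // reflection reaches private methods too
                    return m.invoke(target, arg);
                } catch (ReflectiveOperationException e) {
                    throw new RuntimeException(e);
                }
            }
        }
        throw new IllegalStateException("no @ProcessElement method found");
    }
}
```

Ben's counterargument stands independently of this: even though it is feasible, public methods signal that the framework, not the class itself, is the caller.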


On Tue, Jan 17, 2017, 7:33 AM Stas Levin  wrote:

Hi,

At the moment only public methods are eligible to be decorated with
@ProcessElement (DoFnSignatures#findAnnotatedMethod).

Seems that from a technical point of view, it would be quite possible to
consider private methods as well. In fact, it's one of the benefits of
moving towards an annotation based discovery as opposed to an interface
based one.

Do you think being able to decorate non-public methods with @ProcessElement
(and similar annotations) is something we'd like to support?

Regards,
Stas


Re: Testing Metrics

2017-01-03 Thread Ben Chambers
The Metrics API in Beam is proposed to support both committed metrics (only
from the successfully committed bundles) and attempted metrics (from all
attempts at processing bundles). I think any mechanism based on the workers
reporting metrics to a monitoring system will only be able to support
attempted metrics. Reporting committed metrics needs runner support to make
sure that the metrics are reported (and acknowledged) atomically with the
completion of the bundle.

The way I have been thinking this would work is that the runner would be
involved in actually gathering the metrics, and then we could have some
mechanism that periodically retrieved the metrics from the runner (both
committed and attempted) and pushed those into the monitoring systems.

In this model, each runner should test that (1) the mechanism it has of
gathering metrics works and (2) that the metric reporting plugin runs
appropriately. It would not be necessary to test that a specific metric
reporting plugin works with a specific runner, since all the plugins should
be using the same API to get metrics from the runner.

The API that has been built so far supports (1) as well as exposing metrics
from the runner on the PipelineResult object. I'm currently working on
building support for that in the Dataflow runner.
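The polling model described above, a mechanism that periodically retrieves metrics from the runner and pushes them into monitoring systems, can be sketched as follows (all names are illustrative; Beam's eventual reporter API may differ):

```java
import java.util.*;
import java.util.concurrent.*;
import java.util.function.*;

// Sketch: periodically pull a metrics snapshot from the runner and
// push it to a monitoring backend (e.g. Graphite, Ganglia).
class PeriodicReporter implements AutoCloseable {
    private final Supplier<Map<String, Long>> metricsFromRunner;
    private final Consumer<Map<String, Long>> pushToBackend;
    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor();

    PeriodicReporter(Supplier<Map<String, Long>> metricsFromRunner,
                     Consumer<Map<String, Long>> pushToBackend,
                     long periodMillis) {
        this.metricsFromRunner = metricsFromRunner;
        this.pushToBackend = pushToBackend;
        scheduler.scheduleAtFixedRate(this::reportOnce, periodMillis,
                                      periodMillis, TimeUnit.MILLISECONDS);
    }

    // One poll-and-push cycle; the scheduler invokes this on every period.
    void reportOnce() {
        pushToBackend.accept(metricsFromRunner.get());
    }

    @Override public void close() { scheduler.shutdownNow(); }
}
```

Because the reporter talks only to a generic snapshot supplier, this supports the point above: a backend plugin never needs runner-specific testing, only the supplier does.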

On Mon, Jan 2, 2017 at 11:57 PM Amit Sela  wrote:

I think that in the spirit of Codahale/Dropwizard metrics-like API, the
question is do we want to have something like ScheduledReporter
<
http://metrics.dropwizard.io/3.1.0/apidocs/com/codahale/metrics/ScheduledReporter.html
>
as
a contract to collect and report the metrics to different monitoring
systems (e.g., Graphite, Ganglia, etc.).


On Mon, Jan 2, 2017 at 8:07 PM Stas Levin  wrote:

> I see.
>
> Just to make sure I get it right, in (2), by sinks I mean various metrics
> backends (e.g., Graphite). So it boils down to having integration tests as
> part of Beam (runners?) that beyond testing the SDK layer (i.e., asserting
> over pipeline.metrics()) and actually test the specific metrics backend
> (i.e., asserting over inMemoryGraphite.metrics()), right?
>
> On Mon, Jan 2, 2017 at 7:14 PM Davor Bonaci  wrote:
>
> > Sounds like we should do both, right?
> >
> > 1. Test the metrics API without accounting for the various sink types,
> i.e.
> > > against the SDK.
> > >
> >
> > Metrics API is a runner-independent SDK concept. I'd imagine we'd want
to
> > have runner-independent test that interact with the API, outside of any
> > specific transform implementation, execute them on all runners, and
query
> > the results. Goal: make sure Metrics work.
> >
> > 2. Have the sink types, or at least some of them, tested as part of
> > > integration tests, e.g., have an in-memory Graphite server to test
> > Graphite
> > > metrics and so on.
> > >
> >
> > This is valid too -- this is testing *usage* of Metrics API in the given
> > IO. If a source/sink, or a transform in general, is exposing a metric,
> that
> > metric should be tested in its own right as a part of the transform
> > implementation.
> >
>


Re: PCollection to PCollection Conversion

2016-12-29 Thread Ben Chambers
Dan's proposal: move forward with a simple (future-proofed) version of
the ToString transform and Javadoc, then add specific features via follow-up
PRs.

On Thu, Dec 29, 2016 at 3:53 PM Jesse Anderson 
wrote:

> @Ben which idea do you like?
>
> On Thu, Dec 29, 2016 at 3:20 PM Ben Chambers  >
> wrote:
>
> > I like that idea, with the caveat that we should probably come up with a
> > better name. Perhaps "ToString.elements()" and ToString.Elements or
> > something? Calling one the "default" and using "create" for it seems
> > moderately non-future proof.
> >
> > On Thu, Dec 29, 2016 at 3:17 PM Dan Halperin  >
> > wrote:
> >
> > > On Thu, Dec 29, 2016 at 2:10 PM, Jesse Anderson  >
> > > wrote:
> > >
> > > > I agree MapElements isn't hard to use. I think there is a demand for
> > this
> > > > built-in conversion.
> > > >
> > > > My thought on the formatter is that, worst case, we could do runtime
> > type
> > > > checking. It would be ugly and not as performant, but it should work.
> > As
> > > > we've said, we'd point them to MapElements for better code. We'd
> write
> > > the
> > > > JavaDoc accordingly.
> > > >
> > >
> > > I think it will be good to see these proposals in PR form. I would stay
> > far
> > > away from reflection and varargs if possible, but properly-typed bits
> of
> > > code (possibly exposed as SerializableFunctions in ToString?) would
> > > probably make sense.
> > >
> > > In the short-term, I can't find anyone arguing against a
> > ToString.create()
> > > that simply does input.toString().
> > >
> > > To get started, how about we ask Vikas to clean up the PR to be more
> > > future-proof for now? Aka make `ToString` itself not a PTransform,  but
> > > instead ToString.create() returns ToString.Default which is a private
> > class
> > > implementing what ToString is now (PTransform, wrapping
> > > MapElements).
> > >
> > > Then we can send PRs adding new features to that.
> > >
> > > IME and to Ben's point, these will mostly be used in development. Some
> of
> > > > our assumptions will break down when programmers aren't the ones
> using
> > > > Beam. I can see from the user traffic already that not everyone using
> > > Beam
> > > > is a programmer and they'll need classes like this to be productive.
> > >
> > >
> > > > On Thu, Dec 29, 2016 at 1:46 PM Dan Halperin
> >  > > >
> > > > wrote:
> > > >
> > > > On Thu, Dec 29, 2016 at 1:36 PM, Jesse Anderson <
> je...@smokinghand.com
> > >
> > > > wrote:
> > > >
> > > > > I prefer JB's take. I think there should be three overloaded
> methods
> > on
> > > > the
> > > > > class. I like Vikas' name ToString. The methods for a simple
> > conversion
> > > > > should be:
> > > > >
> > > > > ToString.strings() - Outputs the .toString() of the objects in the
> > > > > PCollection
> > > > > ToString.strings(String delimiter) - Outputs the .toString() of
> KVs,
> > > > Lists,
> > > > > etc with the delimiter between every entry
> > > > > ToString.formatted(String format) - Outputs the formatted
> > > > > <
> https://docs.oracle.com/javase/8/docs/api/java/util/Formatter.html>
> > > > > string
> > > > > with the object passed in. For objects made up of different parts
> > like
> > > > KVs,
> > > > > each one is passed in as separate toString() of a varargs.
> > > > >
> > > >
> > > > Riffing a little, with some types:
> > > >
> > > > ToString.of() -- PTransform<T, String> that is equivalent to a ParDo
> > > > that takes in a T and outputs T.toString().
> > > >
> > > > ToString.kv(String delimiter) -- PTransform<KV<K, V>, String> that is
> > > > equivalent to a ParDo that takes in a KV and outputs
> > > > kv.getKey().toString() + delimiter + kv.getValue().toString()
> > > >
> > > > ToString.iterable(String delimiter) -- PTransform<Iterable<T>, String>
> > > > that is equivalent to a ParDo that takes in an Iterable and
> > > > outputs the iterable[0] + de

Re: PCollection to PCollection Conversion

2016-12-29 Thread Ben Chambers
I like that idea, with the caveat that we should probably come up with a
better name. Perhaps "ToString.elements()" and ToString.Elements or
something? Calling one the "default" and using "create" for it seems
moderately non-future proof.
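The three behaviors being debated in this thread can be sketched as plain-Java formatting functions (the method names follow the thread's proposals but are assumptions, not the final Beam API):

```java
import java.util.*;
import java.util.stream.*;

// Sketch of the proposed ToString variants as plain element formatters.
class ToStrings {
    // ToString.of(): the element's own toString()
    static <T> String of(T element) { return element.toString(); }

    // ToString.kv(delimiter): key + delimiter + value
    static String kv(Map.Entry<?, ?> kv, String delimiter) {
        return kv.getKey() + delimiter + kv.getValue();
    }

    // ToString.iterable(delimiter): elements joined by the delimiter
    static String iterable(Iterable<?> elements, String delimiter) {
        return StreamSupport.stream(elements.spliterator(), false)
            .map(Object::toString)
            .collect(Collectors.joining(delimiter));
    }
}
```

As Dan notes later in the thread, anything beyond these simple cases (custom per-part formatters, empty-iterable handling) is already covered by MapElements.via, so keeping the built-in surface small avoids the Pandora's box.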

On Thu, Dec 29, 2016 at 3:17 PM Dan Halperin 
wrote:

> On Thu, Dec 29, 2016 at 2:10 PM, Jesse Anderson 
> wrote:
>
> > I agree MapElements isn't hard to use. I think there is a demand for this
> > built-in conversion.
> >
> > My thought on the formatter is that, worst case, we could do runtime type
> > checking. It would be ugly and not as performant, but it should work. As
> > we've said, we'd point them to MapElements for better code. We'd write
> the
> > JavaDoc accordingly.
> >
>
> I think it will be good to see these proposals in PR form. I would stay far
> away from reflection and varargs if possible, but properly-typed bits of
> code (possibly exposed as SerializableFunctions in ToString?) would
> probably make sense.
>
> In the short-term, I can't find anyone arguing against a ToString.create()
> that simply does input.toString().
>
> To get started, how about we ask Vikas to clean up the PR to be more
> future-proof for now? Aka make `ToString` itself not a PTransform,  but
> instead ToString.create() returns ToString.Default which is a private class
> implementing what ToString is now (PTransform, wrapping
> MapElements).
>
> Then we can send PRs adding new features to that.
>
> IME and to Ben's point, these will mostly be used in development. Some of
> > our assumptions will break down when programmers aren't the ones using
> > Beam. I can see from the user traffic already that not everyone using
> Beam
> > is a programmer and they'll need classes like this to be productive.
>
>
> > On Thu, Dec 29, 2016 at 1:46 PM Dan Halperin  >
> > wrote:
> >
> > On Thu, Dec 29, 2016 at 1:36 PM, Jesse Anderson 
> > wrote:
> >
> > > I prefer JB's take. I think there should be three overloaded methods on
> > the
> > > class. I like Vikas' name ToString. The methods for a simple conversion
> > > should be:
> > >
> > > ToString.strings() - Outputs the .toString() of the objects in the
> > > PCollection
> > > ToString.strings(String delimiter) - Outputs the .toString() of KVs,
> > Lists,
> > > etc with the delimiter between every entry
> > > ToString.formatted(String format) - Outputs the formatted
> > > string
> > > with the object passed in. For objects made up of different parts like
> > KVs,
> > > each one is passed in as separate toString() of a varargs.
> > >
> >
> > Riffing a little, with some types:
> >
> > ToString.of() -- PTransform<T, String> that is equivalent to a ParDo
> > that takes in a T and outputs T.toString().
> >
> > ToString.kv(String delimiter) -- PTransform<KV<K, V>, String> that is
> > equivalent to a ParDo that takes in a KV<K, V> and outputs
> > kv.getKey().toString() + delimiter + kv.getValue().toString()
> >
> > ToString.iterable(String delimiter) -- PTransform<Iterable<T>, String>
> > that is equivalent to a ParDo that takes in an Iterable<T> and outputs
> > iterable[0] + delimiter + iterable[1] + delimiter + ... +
> > delimiter + iterable[N-1]
> >
> > ToString.custom(SerializableFunction<T, String> formatter) ?
> >
> > The last one is just MapElements.via, except you don't need to set the
> > output type.
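The per-element semantics Dan riffs on above can be sketched in plain Java with no Beam dependency; in Beam, each of these functions would be wrapped in a ParDo or MapElements. Method names here are illustrative, not part of any released API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

// Plain-Java sketch of the proposed ToString.kv / ToString.iterable
// string-building semantics described in the thread.
public class ToStringSketch {

  // kv(delimiter): key.toString() + delimiter + value.toString()
  static <K, V> String kvToString(Map.Entry<K, V> kv, String delimiter) {
    return kv.getKey().toString() + delimiter + kv.getValue().toString();
  }

  // iterable(delimiter): every element's toString(), joined by the delimiter
  static <T> String iterableToString(Iterable<T> elements, String delimiter) {
    return StreamSupport.stream(elements.spliterator(), false)
        .map(Object::toString)
        .collect(Collectors.joining(delimiter));
  }

  public static void main(String[] args) {
    Map.Entry<String, Integer> kv = Map.entry("user", 42);
    System.out.println(kvToString(kv, ","));        // user,42

    List<Integer> xs = Arrays.asList(1, 2, 3);
    System.out.println(iterableToString(xs, "|"));  // 1|2|3
  }
}
```

Note that even this tiny sketch surfaces the open question Dan raises next: the iterable case silently produces an empty string for an empty input, which a user might or might not want configurable.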
> >
> > I don't see a way to make the generic .formatted() that you propose that
> > just works with anything "made of different parts".
> >
> > I think adding too many overloads beyond "of" and "custom" is opening
> > up a Pandora's box. The KV one might want to have left and right
> > delimiters, might want to take custom formatters for K and V, etc. The
> > iterable one might want to have a special configuration for an empty
> > iterable. So I'm inclined towards simplicity, with the awareness that
> > MapElements.via is just not that hard to use.
> >
> > Dan
> >
> >
> > >
> > > I think doing these three methods would cover every simple and advanced
> > > "simple conversions." As JB says, we'll need other specific converters
> > for
> > > other formats like XML.
> > >
> > > I'd really like to see this class in the next version of Beam. What
> does
> > > everyone think of the class name, methods name, and method operations
> so
> > we
> > > can have Vikas finish up?
> > >
> > > Thanks,
> > >
> > > Jesse
> > >
> > > On Wed, Dec 28, 2016 at 12:28 PM Jean-Baptiste Onofré  >
> > > wrote:
> > >
> > > > Hi Vikas,
> > > >
> > > > did you take a look on:
> > > >
> > > >
> > > > https://github.com/jbonofre/beam/tree/DATAFORMAT/sdks/
> > > java/extensions/dataformat
> > > >
> > > > You can see KV2String and ToString could be part of this extension.
> > > > I'm also using JAXB for XML and Jackson for JSON
> > > > marshalling/unmarshalling. I'm planning to deal with Avro
> > > (IndexedRecord).
> > > >
> > > > Regards
> > > > JB
> > > >
> > > > On 12/28/2016 08:37 PM, Vikas Kedigehalli wrote:
> > > > > Hi All,

Re: PCollection to PCollection Conversion

2016-12-29 Thread Ben Chambers
+1 to Dan's point that MapElements.via is not that hard to use and going
too far down this path leads to significant complexity.

Pipelines should generally prefer to deal with structured data as much as
possible. As has been discussed on this thread, though, sometimes it is
necessary to convert things to strings (either for writing to a sink or
debugging). In these cases, it seems like having a simple "ToString"
transform is useful.

But if we go beyond that, we open up a can of worms. There are many ways to
convert something to a string, and I don't think we can reasonably identify
all of them. It also makes usability harder, since the user has to dig
through our documentation and find the built-in transform offering the
functionality they want.

If we instead provide the most common case (simple toString transform) and
after that suggest using MapElements.via (maybe even in the Javadoc),
users will be guided to a useful tool (MapElements.via) and then be able
to reuse all the normal Java tools such as String.format that they are
already familiar with.
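Ben's point that MapElements.via leaves users with ordinary Java string tools can be illustrated with a plain function (no Beam dependency; in a pipeline, a lambda like this would go inside MapElements.via, and the method name here is purely illustrative):

```java
import java.util.Locale;

// Custom formatting is just a T -> String function, so users can reuse
// familiar tools like String.format rather than learning new overloads.
public class CustomFormat {
  static String formatAmount(double amount) {
    // Locale.ROOT keeps the decimal separator stable across environments.
    return String.format(Locale.ROOT, "amount=%.2f", amount);
  }

  public static void main(String[] args) {
    System.out.println(formatAmount(3.14159)); // amount=3.14
  }
}
```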

On Thu, Dec 29, 2016 at 1:46 PM Dan Halperin 
wrote:

> On Thu, Dec 29, 2016 at 1:36 PM, Jesse Anderson 
> wrote:
>
> > I prefer JB's take. I think there should be three overloaded methods on
> the
> > class. I like Vikas' name ToString. The methods for a simple conversion
> > should be:
> >
> > ToString.strings() - Outputs the .toString() of the objects in the
> > PCollection
> > ToString.strings(String delimiter) - Outputs the .toString() of KVs,
> Lists,
> > etc with the delimiter between every entry
> > ToString.formatted(String format) - Outputs the formatted
> > string
> > with the object passed in. For objects made up of different parts like
> KVs,
> > each one is passed in as separate toString() of a varargs.
> >
>
> Riffing a little, with some types:
>
> ToString.of() -- PTransform<T, String> that is equivalent to a ParDo
> that takes in a T and outputs T.toString().
>
> ToString.kv(String delimiter) -- PTransform<KV<K, V>, String> that is
> equivalent to a ParDo that takes in a KV<K, V> and outputs
> kv.getKey().toString() + delimiter + kv.getValue().toString()
>
> ToString.iterable(String delimiter) -- PTransform<Iterable<T>, String>
> that is equivalent to a ParDo that takes in an Iterable<T> and outputs
> iterable[0] + delimiter + iterable[1] + delimiter + ... +
> delimiter + iterable[N-1]
>
> ToString.custom(SerializableFunction<T, String> formatter) ?
>
> The last one is just MapElements.via, except you don't need to set the
> output type.
>
> I don't see a way to make the generic .formatted() that you propose that
> just works with anything "made of different parts".
>
> I think adding too many overloads beyond "of" and "custom" is opening
> up a Pandora's box. The KV one might want to have left and right
> delimiters, might want to take custom formatters for K and V, etc. The
> iterable one might want to have a special configuration for an empty
> iterable. So I'm inclined towards simplicity, with the awareness that
> MapElements.via is just not that hard to use.
>
> Dan
>
>
> >
> > I think doing these three methods would cover every simple and advanced
> > "simple conversions." As JB says, we'll need other specific converters
> for
> > other formats like XML.
> >
> > I'd really like to see this class in the next version of Beam. What does
> > everyone think of the class name, methods name, and method operations so
> we
> > can have Vikas finish up?
> >
> > Thanks,
> >
> > Jesse
> >
> > On Wed, Dec 28, 2016 at 12:28 PM Jean-Baptiste Onofré 
> > wrote:
> >
> > > Hi Vikas,
> > >
> > > did you take a look on:
> > >
> > >
> > > https://github.com/jbonofre/beam/tree/DATAFORMAT/sdks/
> > java/extensions/dataformat
> > >
> > > You can see KV2String and ToString could be part of this extension.
> > > I'm also using JAXB for XML and Jackson for JSON
> > > marshalling/unmarshalling. I'm planning to deal with Avro
> > (IndexedRecord).
> > >
> > > Regards
> > > JB
> > >
> > > On 12/28/2016 08:37 PM, Vikas Kedigehalli wrote:
> > > > Hi All,
> > > >
> > > >   Not being aware of the discussion here, I sent out a PR
> > > >  but JB and others
> directed
> > > me to
> > > > this thread. Having converted PCollection<T> to PCollection<String>
> > > > several times, I feel something like a 'ToString' transform is common
> > > > enough to be part of the core. What do you all think?
> > > >
> > > > Also, if someone else is already working on or interested in tackling
> > > this,
> > > > then I am happy to discard the PR.
> > > >
> > > > Regards,
> > > > Vikas
> > > >
> > > > On Tue, Dec 13, 2016 at 1:56 AM, Amit Sela 
> > wrote:
> > > >
> > > >> It seems that there were a lot of good points raised here, and I
> > > >> tend to agree that something as trivial and lean as "ToString"
> > > >> should be a part of core.
> > > >> I'm particularly fond of makeString(prefix, toS

Re: Creating Sum.[*]Fn instances

2016-12-22 Thread Ben Chambers
Don't they need to be visible for use with composed combine and combining
value state?

On Thu, Dec 22, 2016, 9:45 AM Lukasz Cwik  wrote:

> Those are used internally within Sum and it's expected that users instead
> call Sum.integersPerKey, or Sum.doublesPerKey, or Sum.integersGlobally, or
> ...
> The Combine.java example specifically calls out using Sum.SumIntegerFn
> instead of calling Sum.integersPerKey.
>
> I filed https://issues.apache.org/jira/browse/BEAM-1208 to address the
> visibility of Sum.[*]Fn instances.
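For context, Sum.integersPerKey computes, for each key, the sum of all integer values with that key. Those semantics can be sketched in plain Java with no Beam dependency (in Beam this runs as a composite transform over a PCollection of KVs; the sketch below is illustrative only):

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Plain-Java sketch of per-key integer summing: group KV pairs by key,
// then sum the values within each group.
public class PerKeySum {
  static Map<String, Integer> sumPerKey(List<SimpleEntry<String, Integer>> kvs) {
    return kvs.stream().collect(
        Collectors.groupingBy(
            SimpleEntry::getKey,
            Collectors.summingInt(SimpleEntry::getValue)));
  }

  public static void main(String[] args) {
    List<SimpleEntry<String, Integer>> kvs = Arrays.asList(
        new SimpleEntry<>("a", 1),
        new SimpleEntry<>("b", 2),
        new SimpleEntry<>("a", 3));
    // "a" sums to 4, "b" sums to 2
    System.out.println(sumPerKey(kvs));
  }
}
```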
>
> On Thu, Dec 22, 2016 at 3:07 AM, Stas Levin  wrote:
>
> > Hi all,
> >
> > I was wondering if there was a reason Sum.SumDoubleFn, SumIntegerFn and
> > SumLongFn are not using the X.of() or X.from() or other instance creation
> > via static method patterns that are so common in Beam?
> >
> > For example:
> >
> > new Sum.SumLongFn()
> >
> > vs.
> >
> > SumFn.ofLong()
> >
> >
> > Regards,
> > Stas
> >
>